Равномерное распределение с помощью dblink
Проксирующая база данных
Управление данными
Вставка записи
Для вставки записей будем использовать триггеры на наши таблицы проксирующего сервера, в сами таблицы записи мы вставлять не будем, только во внешние таблицы, посему, переполнения боятся нет смысла.
Но не для всех таблиц нам нужна вставка, так, например, таблицы рейтингов и последние фото обновляются у нас в процессе изменения данных в хранилищах, поэтому на вставку в эти таблицы ставим заглушки.
SQL код (1)
CREATE OR REPLACE FUNCTION "public"."users_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
BEGIN
IF NEW .login IS NULL OR NEW .pass IS NULL THEN
RAISE EXCEPTION 'Can''t insert row users % ' , NEW ;
END IF ;
SELECT get_md5hash (NEW .login) INTO NEW .user_hash;
SELECT storage_select_by_hash (NEW .user_hash, TRUE ) INTO connecting ;
query := 'INSERT INTO users (user_hash, login, pass)
VALUES (' ||
quote_literal (NEW .user_hash) || ',' ||
quote_literal (NEW .login) || ',' ||
quote_literal (NEW .pass) || ')' ;
PERFORM dblink_exec (connecting , query );
RETURN NULL ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "users_insert" BEFORE INSERT
ON "public"."users" FOR EACH ROW
EXECUTE PROCEDURE "public"."users_insert_trigger" ();
CREATE OR REPLACE FUNCTION "public"."albums_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
BEGIN
IF NEW .user_hash IS NULL OR NEW .user_id IS NULL THEN
RAISE EXCEPTION 'Can''t insert row albums % ' , NEW ;
END IF ;
SELECT storage_select_by_hash (NEW .user_hash, TRUE ) INTO connecting ;
query := 'INSERT INTO albums (user_hash, user_id, title)
VALUES (' ||
quote_literal (NEW .user_hash) || ',' ||
NEW .user_id || ',' ||
quote_literal (NEW .title) || ')' ;
PERFORM dblink_exec (connecting , query );
RETURN NULL ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "albums_insert" BEFORE INSERT
ON "public"."albums" FOR EACH ROW
EXECUTE PROCEDURE "public"."albums_insert_trigger" ();
CREATE FUNCTION "fotos_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
BEGIN
IF NEW .user_hash IS NULL OR
NEW .user_id IS NULL OR
NEW .album_id IS NULL THEN
RAISE EXCEPTION 'Can''t insert row foto % ' , NEW ;
END IF ;
SELECT storage_select_by_hash (NEW .user_hash, TRUE ) INTO connecting ;
query := 'INSERT INTO fotos ( user_hash,
album_id,
user_id,
title,
filename )
VALUES (' ||
quote_literal (NEW .user_hash) || ',' ||
NEW .album_id || ',' ||
NEW .user_id || ',' ||
quote_literal (NEW .title) || ',' ||
quote_literal (NEW .filename) || ')' ;
PERFORM dblink_exec (connecting , query );
RETURN NULL ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_insert" BEFORE INSERT
ON "public"."fotos" FOR EACH ROW
EXECUTE PROCEDURE "fotos_insert_trigger" ();
CREATE FUNCTION "fotos_votes_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
BEGIN
IF NEW .foto_id IS NULL OR
NEW .foto_hash IS NULL OR
NEW .user_id IS NULL OR
NEW .user_hash IS NULL OR
NEW .vote IS NULL
THEN
RAISE EXCEPTION 'Can''t insert row fotos_votes % ' , NEW ;
END IF ;
SELECT storage_select_by_hash (NEW .foto_hash, TRUE ) INTO connecting ;
query := 'SELECT votes_users((' ||
NEW .foto_id || ',' ||
quote_literal (NEW .foto_hash) || ',' ||
NEW .user_id || ',' ||
quote_literal (NEW .user_hash) || ',' ||
NEW .vote || '::smallint))' ;
PERFORM (SELECT * FROM dblink (connecting , query ) AS (f BOOLEAN ));
RETURN NULL ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_votes_insert" BEFORE INSERT
ON "public"."fotos_votes" FOR EACH ROW
EXECUTE PROCEDURE "fotos_votes_insert_trigger" ();
CREATE OR REPLACE FUNCTION "public"."fotos_tags_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
connecting TEXT ;
connecting_tag TEXT ;
query TEXT ;
BEGIN
IF NEW .foto_id IS NULL OR
NEW .album_id IS NULL OR
NEW .user_id IS NULL OR
NEW .user_hash IS NULL OR
NEW .tag_id IS NULL OR
NEW .tag_hash IS NULL
THEN
RAISE EXCEPTION 'Can''t insert row fotos_tags % ' , NEW ;
END IF ;
SELECT storage_select_by_hash (NEW .user_hash, TRUE ) INTO connecting ;
query := 'INSERT INTO fotos_tags (
foto_id,
album_id,
user_id,
user_hash,
tag_id,
tag_hash )
VALUES (' ||
NEW .foto_id || ',' ||
NEW .album_id || ',' ||
NEW .user_id || ',' ||
quote_literal (NEW .user_hash) || ',' ||
NEW .tag_id || ',' ||
quote_literal (NEW .tag_hash) || ')' ;
PERFORM dblink_exec (connecting , query );
SELECT storage_select_by_hash (NEW .tag_hash, TRUE ) INTO connecting_tag ;
IF connecting_tag <> connecting THEN
PERFORM dblink_exec (connecting_tag , query );
END IF ;
RETURN NULL ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_tags_insert" BEFORE INSERT
ON "public"."fotos_tags" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_tags_insert_trigger" ();
Как видно, записи у нас реально будут вставляться только для распределенных таблиц. Индексная база же по большей части изменяется на уровне хранилищ, за исключением таблицы тегов, но о ней позднее.
Делаем триггера-заглушки, на всякий случай.
SQL код (2)
CREATE FUNCTION "trigger_rollback_insert" () RETURNS trigger AS
$body$
BEGIN
RETURN NULL ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "index_last_foto_insert" BEFORE INSERT
ON "public"."index_last_foto" FOR EACH ROW
EXECUTE PROCEDURE "public"."trigger_rollback_insert" ();
CREATE TRIGGER "index_rating_foto_insert" BEFORE INSERT
ON "public"."index_rating_foto" FOR EACH ROW
EXECUTE PROCEDURE "public"."trigger_rollback_insert" ();
CREATE TRIGGER "index_tags_insert" BEFORE INSERT
ON "public"."index_tags" FOR EACH ROW
EXECUTE PROCEDURE "public"."trigger_rollback_insert" ();
CREATE TRIGGER "index_top_foto_insert" BEFORE INSERT
ON "public"."index_top_foto" FOR EACH ROW
EXECUTE PROCEDURE "public"."trigger_rollback_insert" ();
И да, если вдруг кто не знал, в PostgreSQL Не обязательно для каждого триггера создавать свою хранимую процедуру, можно использовать существующую.
Остается таблица тегов (index_tags ), по сути нам не интересна вставка данных как таковая, нас интересует только выборка, причем, если тега такого еще у нас нет, то мы все равно должны его вернуть, предварительно вставив. Поэтому сделаем хранимую процедуру для выборки тегов, которая будет сама, по надобности, вставлять записи в таблицу:
SQL код (3)
CREATE OR REPLACE FUNCTION "public"."get_tags_by_content" (income_tags text [])
RETURNS SETOF "public"."index_tags" AS
$body$
DECLARE
tag_row index_tags ;
tag_rows index_tags [];
tag_exists TEXT [];
tmp_tag TEXT ;
tag_hash TEXT ;
query TEXT ;
query_exec TEXT ;
index_connect TEXT ;
income_tmp TEXT [];
BEGIN
query := 'SELECT * FROM index_tags WHERE tag IN (' ;
FOR tmp_tag IN SELECT * FROM unnest (income_tags ) LOOP
IF tmp_tag = ANY (income_tmp ) THEN ELSE
tmp_tag := lower (tmp_tag );
query := query || quote_literal (tmp_tag ) || ',' ;
income_tmp := array_append (income_tmp , tmp_tag );
END IF ;
END LOOP ;
query := query || ''''')' ;
SELECT index_db_select (TRUE ) INTO index_connect ;
FOR tag_row IN
SELECT *
FROM _dblink_index_tags (index_connect , query , TRUE )
LOOP
RETURN NEXT tag_row ;
tag_exists := array_append (tag_exists , tag_row .tag);
END LOOP ;
query := NULL ;
FOR tmp_tag IN SELECT * FROM unnest (income_tags ) LOOP
tmp_tag := lower (tmp_tag );
IF tmp_tag <> ALL (tag_exists ) THEN
SELECT get_md5hash (tmp_tag ) INTO tag_hash ;
query_exec := 'INSERT INTO index_tags (tag, tag_hash)
VALUES (' ||
quote_literal (tmp_tag ) || ',' ||
quote_literal (tag_hash ) || ')' ;
PERFORM dblink_exec (index_connect , query_exec );
query := query || quote_literal (tmp_tag ) || ',' ;
END IF ;
END LOOP ;
IF query IS NOT NULL THEN
query := 'SELECT * FROM index_tags WHERE tag IN (' || query || ''''')' ;
FOR tag_row IN
SELECT *
FROM _dblink_index_tags (index_connect , query , TRUE )
LOOP
RETURN NEXT tag_row ;
END LOOP ;
END IF ;
END ;
$body$
LANGUAGE 'plpgsql' ;
Вот так, теперь мы можем вставлять записи практически в любую таблицу.
Изменение записей
В приложении обычно изменение записей производится на уровне одной строки, всякие списочные и глобальные изменения имеют место быть, но они либо разовые, либо жестко специализированы.
В нашем случае, можно определить следующий список возможных запросов на изменение данных:
Пользователь - таблица users :
Изменение пароля - поле pass , выборка возможна по (id , user_hash ) либо по login ;
Удаление пользователя - выборка возможна по (id , user_hash ) либо по login ;
Альбом - таблица albums :
Изменение названия - поле title , выборка по (id , user_hash );
Удаление альбома - выборка по (id , user_hash );
Фото - таблица fotos :
Изменение названия и альбома - поля album_id и title , выборка по (id , user_hash );
Удаление фото - выборка по (id , user_hash );
Голосование пользователей - таблица fotos_votes :
Удаления голоса нет, обновление тоже не нужно, так как триггер вызывает соответсвующую хранимую процедуру на внешнем сервере, следовательно, для этой таблицы нам будет более чем достаточно, триггера на вставку, значит ничего отдельно делать не будем;
Список тегов для фото - таблица fotos_tags :
добавление тега (списка) к фото - выборка по (список тегов, foto_id , foto_hash );
изменение списка тегов для фото - выборка по (список тегов, foto_id , foto_hash );
удаление из списка тегов для фото - выборка по (список тегов, foto_id , foto_hash );
удаление всего списка тегов для фото - выборка по (foto_id , foto_hash );
Как видно, у каждой операции однотипные фильтры для каждой таблицы (если это не так, то это надо сделать), значит не обязательно делать для каждого действия свою процедуру:
Так же, кроме команд на обновление и удаление я сделаю обработку команд на вставку, это, конечно избыточное решение, так как за вставку у нас отвечают триггера, но тем не менее это один из вариантов решения и его опускать не совсем правильно.
Итак, хранимая процедура для пользователей:
SQL код (4)
CREATE OR REPLACE FUNCTION "public"."users_command"
( command "public"."command_type" ,
id integer ,
hash text ,
login text ,
pass text )
RETURNS boolean AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
user_hash TEXT := hash ;
BEGIN
IF id IS NOT NULL AND user_hash IS NOT NULL THEN
query := ' WHERE id = ' || id || ' AND
user_hash = ' || quote_literal (user_hash );
ELSIF login IS NOT NULL THEN
SELECT get_md5hash (login ) INTO user_hash ;
query := ' WHERE login = ' || quote_literal (login ) || ' AND
user_hash = ' || quote_literal (user_hash );
ELSE
RAISE EXCEPTION 'No arguments for select' ;
END IF ;
SELECT storage_select_by_hash (user_hash , TRUE ) INTO connecting ;
IF command = 'DELETE' THEN
query := 'DELETE FROM users ' || query ;
ELSIF command = 'UPDATE' THEN
IF pass IS NULL THEN RAISE EXCEPTION 'The pass is empty' ; END IF ;
query := 'UPDATE users SET pass = ' || quote_literal (pass ) || query ;
ELSIF command = 'INSERT' THEN
IF pass IS NULL OR login IS NULL THEN
RAISE EXCEPTION 'The pass or login is empty' ;
END IF ;
query := 'INSERT INTO users (user_hash, login, pass)
VALUES (' || quote_literal (user_hash ) || ',' ||
quote_literal (login ) || ',' ||
quote_literal (pass ) || ')' ;
ELSE
RAISE EXCEPTION 'What the command?' ;
END IF ;
PERFORM dblink_exec (connecting , query );
RETURN TRUE ;
END ;
$body$
LANGUAGE 'plpgsql' ;
SELECT "public"."users_command" ('INSERT' , NULL , NULL , 'login' , 'password' );
SELECT "public"."users_command" ('UPDATE' , NULL , NULL , 'login' , 'new password ');
SELECT "public"."users_command" ('UPDATE' , 1, '52DA' , NULL , 'old password' );
SELECT "public"."users_command" ('DELETE' , NULL , NULL , 'login' , NULL );
И да, если не криптуем пароль на уровне базы, знаит делаем это на уровне приложения.
В случае с пользователем, я передаю жесткий список аргументов, хотя можно использовать сериализованную строку, например для альбомов:
SQL код (5)
CREATE OR REPLACE FUNCTION "public"."albums_command"
( command "public"."command_type" ,
album "public"."albums" )
RETURNS boolean AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
BEGIN
IF album .user_hash IS NOT NULL AND album .id IS NOT NULL THEN
query := ' WHERE id = ' || album .id || ' AND
user_hash = ' || quote_literal (album .user_hash);
ELSIF album .user_hash IS NOT NULL AND album .user_id IS NOT NULL THEN
query := ' WHERE user_id = ' || album .user_id || ' AND
user_hash = ' || quote_literal (album .user_hash);
ELSE
RAISE EXCEPTION 'No arguments for select' ;
END IF ;
SELECT storage_select_by_hash (album .user_hash, TRUE ) INTO connecting ;
IF command = 'DELETE' THEN
query := 'DELETE FROM albums ' || query ;
ELSIF command = 'UPDATE' THEN
query := 'UPDATE albums
SET title = ' || quote_literal (album .title) || query ;
ELSIF command = 'INSERT' THEN
query := 'INSERT INTO albums (user_hash, user_id, title)
VALUES (' || quote_literal (album .user_hash) || ',' ||
album .user_id || ',' ||
quote_literal (album .title) || ')' ;
ELSE
RAISE EXCEPTION 'What the command?' ;
END IF ;
PERFORM dblink_exec (connecting , query );
RETURN TRUE ;
END ;
$body$
LANGUAGE 'plpgsql' ;
SELECT "public"."albums_command" ('INSERT' , (NULL ,'52DA' ,1,'Мой альбом' ,NULL ));
SELECT "public"."albums_command" ('UPDATE' , (1,'52DA' ,NULL ,'Новое название' ,NULL ));
SELECT "public"."albums_command" ('DELETE' , (1,'52DA' ,NULL ,NULL ,NULL ));
Хотя приходится передавать еще поле count_fotos , мы его никак изменить не сможем, поэтому передаются несколько избыточные данные.
Это особенно видно для хранимой процедуры изменения данных для таблицы fotos :
SQL код (6)
CREATE OR REPLACE FUNCTION "public"."fotos_command"
( command "public"."command_type" ,
foto "public"."fotos" )
RETURNS boolean AS
$body$
DECLARE
connecting TEXT ;
query TEXT ;
sets TEXT [];
fields_for_set TEXT [] := ARRAY [ 'album_id' , 'title' ];
field TEXT ;
field_value TEXT ;
flag_update BOOLEAN := FALSE ;
BEGIN
IF foto .user_hash IS NOT NULL AND foto .id IS NOT NULL THEN
query := ' WHERE id = ' || foto .id || ' AND
user_hash = ' || quote_literal (foto .user_hash);
ELSIF foto .user_hash IS NOT NULL AND foto .album_id IS NOT NULL THEN
query := ' WHERE album_id = ' || foto .user_id || ' AND
user_hash = ' || quote_literal (foto .user_hash);
ELSE
RAISE EXCEPTION 'No arguments for select' ;
END IF ;
SELECT storage_select_by_hash (foto .user_hash, TRUE ) INTO connecting ;
IF command = 'DELETE' THEN
query := 'DELETE FROM fotos ' || query ;
ELSIF command = 'UPDATE' THEN
query := ' id = id ' || query ;
FOR field IN SELECT * FROM unnest (fields_for_set ) LOOP
EXECUTE 'SELECT $1.' || field INTO field_value USING foto ;
IF field_value IS NOT NULL THEN
query := field || ' = ' || quote_literal (field_value ) ||
',' || query ;
flag_update := TRUE ;
END IF ;
END LOOP ;
IF flag_update = FALSE THEN
RETURN FALSE ;
END IF ;
query := 'UPDATE fotos
SET ' || query ;
ELSIF command = 'INSERT' THEN
IF foto .user_id IS NULL OR
foto .title IS NULL OR
foto .filename IS NULL
THEN
RAISE EXCEPTION 'Can not insert foto: %' , foto ;
END IF ;
query := 'INSERT INTO fotos ( user_hash,
album_id,
user_id,
title,
filename )
VALUES (' || quote_literal (foto .user_hash) || ',' ||
foto .album_id || ',' ||
foto .user_id || ',' ||
quote_literal (foto .title) || ',' ||
quote_literal (foto .filename) || ')' ;
ELSE
RAISE EXCEPTION 'What the command?' ;
END IF ;
PERFORM dblink_exec (connecting , query );
RETURN TRUE ;
END ;
$body$
LANGUAGE 'plpgsql' ;
SELECT "public"."fotos_command" ('INSERT' , (NULL ,'52DA' ,1,1,'Фото1' ,'file1.jpg' ,NULL ,NULL ,NULL ));
SELECT "public"."fotos_command" ('UPDATE' , (1,'52DA' ,NULL ,NULL ,'Фото2' ,NULL ,NULL ,NULL ,NULL ));
SELECT "public"."fotos_command" ('DELETE' , (1,'52DA' ,NULL ,NULL ,'Фото2' ,NULL ,NULL ,NULL ,NULL ));
В этой процедуре, считается что если поле, которое можно обновлять (fields_for_set ), NULL, то его не обновляем.
Теперь самое фееричное - теги
SQL код (7)
CREATE TYPE "public"."tag_id_hash" AS (
"id" INTEGER ,
"hash" CHAR (4)
);
CREATE OR REPLACE FUNCTION "public"."fotos_tags_command"
( command "public"."command_type" ,
foto_id integer ,
user_hash text ,
tags text [] )
RETURNS boolean AS
$body$
DECLARE
foto_connect TEXT ;
tag_connect TEXT ;
tag_connect_now TEXT ;
query TEXT ;
index_tag index_tags ;
foto_tag fotos_tags ;
income_tags tag_id_hash [];
exist_tags tag_id_hash [];
insert_tag TEXT ;
insert_tags TEXT [];
insert_tags_double TEXT [];
delete_tags INTEGER [];
delete_tags_double INTEGER [];
delete_cursor refcursor ;
album_id INTEGER ;
user_id INTEGER ;
tag_id_tmp INTEGER ;
tag_hash_tmp CHAR (4);
BEGIN
IF command IS NULL OR foto_id IS NULL OR user_hash IS NULL
THEN
RAISE EXCEPTION 'Bad arguments command: %, foto_id: %, user_hash: %' ,
command , foto_id , user_hash ;
END IF ;
IF tags IS NOT NULL THEN
FOR index_tag IN SELECT * FROM get_tags_by_content (tags ) LOOP
income_tags := array_append (income_tags , (index_tag .id, index_tag .tag_hash)::tag_id_hash );
END LOOP ;
END IF ;
IF tags IS NOT NULL OR command = 'DELETE' THEN
SELECT storage_select_by_hash (user_hash , TRUE ) INTO foto_connect ;
query := 'SELECT album_id, user_id FROM fotos WHERE id = ' || foto_id ||
' AND user_hash = ' || quote_literal (user_hash ) || ';' ;
SELECT * INTO album_id , user_id
FROM dblink (foto_connect , query , TRUE ) AS (a INTEGER , u INTEGER );
IF album_id IS NULL THEN
RAISE NOTICE 'No foto for foto_id: %, user_hash: %' , foto_id , user_hash ;
RETURN FALSE ;
END IF ;
query := 'SELECT * FROM fotos_tags WHERE foto_id = ' || foto_id ||
' AND user_hash = ' || quote_literal (user_hash ) || ';' ;
FOR foto_tag IN
SELECT * FROM _dblink_fotos_tags (foto_connect , query , TRUE )
LOOP
exist_tags := array_append (exist_tags , (foto_tag .tag_id, foto_tag .tag_hash)::tag_id_hash );
END LOOP ;
END IF ;
IF command = 'INSERT' THEN
IF tags IS NULL THEN
RAISE NOTICE 'No tags for insert' ;
RETURN FALSE ;
END IF ;
insert_tags_double := NULL ;
FOR tag_id_tmp , tag_hash_tmp IN
SELECT * FROM unnest (income_tags ) EXCEPT
SELECT * FROM unnest (exist_tags ) ORDER BY hash
LOOP
insert_tag := '(' || foto_id || ',' ||
album_id || ',' ||
user_id || ',' ||
quote_literal (user_hash ) || ',' ||
tag_id_tmp || ',' ||
quote_literal (tag_hash_tmp ) || ')' ;
insert_tags := array_append (insert_tags , insert_tag );
SELECT storage_select_by_hash (tag_hash_tmp , TRUE ) INTO tag_connect ;
IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
tag_connect_now := tag_connect ;
insert_tags_double := array_append (insert_tags_double , insert_tag );
ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
query := 'INSERT INTO fotos_tags VALUES ' || array_to_string (insert_tags_double , ',' );
PERFORM dblink_exec (tag_connect_now , query , TRUE );
insert_tags_double := NULL ;
tag_connect_now := tag_connect ;
insert_tags_double := array_append (insert_tags_double , insert_tag );
ELSIF tag_connect <> foto_connect THEN
insert_tags_double := array_append (insert_tags_double , insert_tag );
END IF ;
END LOOP ;
query := 'INSERT INTO fotos_tags VALUES ' || array_to_string (insert_tags , ',' );
PERFORM dblink_exec (foto_connect , query , TRUE );
IF tag_connect_now IS NOT NULL AND tag_connect <> foto_connect THEN
query := 'INSERT INTO fotos_tags VALUES ' || array_to_string (insert_tags_double , ',' );
PERFORM dblink_exec (tag_connect_now , query , TRUE );
insert_tags_double := NULL ;
END IF ;
ELSIF command = 'DELETE' THEN
IF tags IS NULL THEN
OPEN delete_cursor FOR
SELECT * FROM unnest (exist_tags ) ORDER BY hash;
ELSE
OPEN delete_cursor FOR
SELECT * FROM unnest (income_tags ) INTERSECT
SELECT * FROM unnest (exist_tags ) ORDER BY hash;
END IF ;
FETCH delete_cursor INTO tag_id_tmp , tag_hash_tmp ;
delete_tags := NULL ;
delete_tags_double := NULL ;
WHILE tag_id_tmp IS NOT NULL LOOP
delete_tags := array_append (delete_tags , tag_id_tmp );
SELECT storage_select_by_hash (tag_hash_tmp , TRUE ) INTO tag_connect ;
IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
tag_connect_now := tag_connect ;
delete_tags_double := array_append (delete_tags_double , tag_id_tmp );
ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
' user_hash = ' || quote_literal (user_hash ) || ' AND ' ||
' tag_id IN (' || array_to_string (delete_tags_double , ',' ) || ');' ;
PERFORM dblink_exec (tag_connect_now , query , TRUE );
delete_tags_double := NULL ;
tag_connect_now := tag_connect ;
delete_tags_double := array_append (delete_tags_double , tag_id_tmp );
ELSIF tag_connect <> foto_connect THEN
delete_tags_double := array_append (delete_tags_double , tag_id_tmp );
END IF ;
FETCH delete_cursor INTO tag_id_tmp , tag_hash_tmp ;
END LOOP ;
query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
' user_hash = ' || quote_literal (user_hash ) || ' AND ' ||
' tag_id IN (' || array_to_string (delete_tags , ',' ) || ');' ;
PERFORM dblink_exec (foto_connect , query , TRUE );
IF tag_connect_now IS NOT NULL AND tag_connect <> foto_connect THEN
query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
' user_hash = ' || quote_literal (user_hash ) || ' AND ' ||
' tag_id IN (' || array_to_string (delete_tags_double , ',' ) || ');' ;
PERFORM dblink_exec (tag_connect_now , query , TRUE );
delete_tags_double := NULL ;
END IF ;
ELSIF command = 'UPDATE' THEN
IF tags IS NULL THEN
RAISE NOTICE 'No tags for update' ;
RETURN FALSE ;
END IF ;
insert_tags := NULL ;
FOR tag_id_tmp , tag_hash_tmp IN
SELECT * FROM unnest (income_tags ) UNION
SELECT * FROM unnest (exist_tags )
LOOP
IF (tag_id_tmp , tag_hash_tmp )::tag_id_hash = ANY (income_tags ) AND
(tag_id_tmp , tag_hash_tmp )::tag_id_hash <> ALL (exist_tags )
THEN
insert_tag := '(' || foto_id || ',' ||
album_id || ',' ||
user_id || ',' ||
quote_literal (user_hash ) || ',' ||
tag_id_tmp || ',' ||
quote_literal (tag_hash_tmp ) || ')' ;
insert_tags := array_append (insert_tags , insert_tag );
SELECT storage_select_by_hash (tag_hash_tmp , TRUE ) INTO tag_connect ;
IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
tag_connect_now := tag_connect ;
insert_tags_double := array_append (insert_tags_double , insert_tag );
ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
query := 'INSERT INTO fotos_tags VALUES ' || array_to_string (insert_tags_double , ',' );
PERFORM dblink_exec (tag_connect_now , query , TRUE );
insert_tags_double := NULL ;
tag_connect_now := tag_connect ;
insert_tags_double := array_append (insert_tags_double , insert_tag );
ELSIF tag_connect <> foto_connect THEN
insert_tags_double := array_append (insert_tags_double , insert_tag );
END IF ;
ELSIF (tag_id_tmp , tag_hash_tmp )::tag_id_hash <> ALL (income_tags ) AND
(tag_id_tmp , tag_hash_tmp )::tag_id_hash = ANY (exist_tags )
THEN
delete_tags := array_append (delete_tags , tag_id_tmp );
SELECT storage_select_by_hash (tag_hash_tmp , TRUE ) INTO tag_connect ;
IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
tag_connect_now := tag_connect ;
delete_tags_double := array_append (delete_tags_double , tag_id_tmp );
ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
' user_hash = ' || quote_literal (user_hash ) || ' AND ' ||
' tag_id IN (' || array_to_string (delete_tags_double , ',' ) || ');' ;
PERFORM dblink_exec (tag_connect_now , query , TRUE );
delete_tags_double := NULL ;
tag_connect_now := tag_connect ;
delete_tags_double := array_append (delete_tags_double , tag_id_tmp );
ELSIF tag_connect <> foto_connect THEN
delete_tags_double := array_append (delete_tags_double , tag_id_tmp );
END IF ;
END IF ;
END LOOP ;
IF insert_tags IS NOT NULL THEN
query := 'INSERT INTO fotos_tags VALUES ' || array_to_string (insert_tags , ',' );
PERFORM dblink_exec (foto_connect , query , TRUE );
END IF ;
IF delete_tags IS NOT NULL THEN
query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
' user_hash = ' || quote_literal (user_hash ) || ' AND ' ||
' tag_id IN (' || array_to_string (delete_tags , ',' ) || ');' ;
PERFORM dblink_exec (foto_connect , query , TRUE );
END IF ;
IF tag_connect_now IS NOT NULL AND
tag_connect <> foto_connect AND
delete_tags_double IS NOT NULL
THEN
query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
' user_hash = ' || quote_literal (user_hash ) || ' AND ' ||
' tag_id IN (' || array_to_string (delete_tags_double , ',' ) || ');' ;
PERFORM dblink_exec (tag_connect_now , query , TRUE );
delete_tags_double := NULL ;
END IF ;
IF tag_connect_now IS NOT NULL AND
tag_connect <> foto_connect AND
insert_tags_double IS NOT NULL
THEN
query := 'INSERT INTO fotos_tags VALUES ' || array_to_string (insert_tags_double , ',' );
PERFORM dblink_exec (tag_connect_now , query , TRUE );
insert_tags_double := NULL ;
END IF ;
ELSE
RAISE EXCEPTION 'What the command?' ;
END IF ;
RETURN TRUE ;
END ;
$body$
LANGUAGE 'plpgsql' ;
SELECT fotos_tags_command ('INSERT' , 1, '52DA' , ARRAY ['a' , 'b' , 'c' ]);
SELECT fotos_tags_command ('UPDATE' , 1, '52DA' , ARRAY ['b' , 'c' , 'd' ]);
SELECT fotos_tags_command ('DELETE' , 1, '52DA' , ARRAY ['b' ]);
SELECT fotos_tags_command ('DELETE' , 1, '52DA' , NULL );
Несколько пояснений по принципу работы. Для каждой из операции определяются пересечения существующего списка тегов и списка тегов пришедшего в запросе, это позволяет минимизировать количество запросов, но усложняет логику. Так же сложности добавляет двойная запись тегов, правда запросы тоже группируются по хранилищам.
Еще видно, UPDATE это практически копия вставки и удаления, поэтому возможно оптимальней сделать раздельные функции вставки и одаления и использовать их.
Теперь мы можем вставлять, изменять и удалять практически любые данные проекта. Остается, по моему мнению, самое простое - выборки данных.