DOC.PROTOTYPES.RU

Равномерное распределение с помощью dblink

Проксирующая база данных

Управление данными

Вставка записи

Для вставки записей будем использовать триггеры на наши таблицы проксирующего сервера, в сами таблицы записи мы вставлять не будем, только во внешние таблицы, посему, переполнения боятся нет смысла.

Но не для всех таблиц нам нужна вставка, так, например, таблицы рейтингов и последние фото обновляются у нас в процессе изменения данных в хранилищах, поэтому на вставку в эти таблицы ставим заглушки.

SQL код (1)
-- Триггер для таблицы users
CREATE OR REPLACE FUNCTION "public"."users_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
    connecting      TEXT;
    query           TEXT;
BEGIN
-- Проверяем обязательные поля и формируем остальное
    IF NEW.login IS NULL OR NEW.pass IS NULL THEN
        RAISE EXCEPTION 'Can''t insert row users % ', NEW;
    END IF;
-- Устанавливаем user_hash
    SELECT get_md5hash(NEW.login) INTO NEW.user_hash;
-- Выбираем распределенный сервер
    SELECT storage_select_by_hash(NEW.user_hash, TRUE) INTO connecting;
-- Формируем запрос
    query := 'INSERT INTO users (user_hash, login, pass)
                    VALUES (' ||
                                quote_literal(NEW.user_hash) || ',' ||
                                quote_literal(NEW.login) || ',' ||
                                quote_literal(NEW.pass) || ')';
    PERFORM dblink_exec(connecting, query);
-- Возвращаем NULL что бы не вставлять запись в локальную таблицу
    RETURN NULL;
END;
$body$
LANGUAGE 'plpgsql';

CREATE TRIGGER "users_insert" BEFORE INSERT 
    ON "public"."users" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."users_insert_trigger"();
    
-- Триггер для таблицы albums
CREATE OR REPLACE FUNCTION "public"."albums_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
    connecting      TEXT;
    query           TEXT;
BEGIN
-- Проверяем обязательные поля и формируем остальное
    IF NEW.user_hash IS NULL OR NEW.user_id IS NULL THEN
        RAISE EXCEPTION 'Can''t insert row albums % ', NEW;
    END IF;
-- Выбираем распределенный сервер
    SELECT storage_select_by_hash(NEW.user_hash, TRUE) INTO connecting;
-- Формируем запрос
    query := 'INSERT INTO albums (user_hash, user_id, title)
                    VALUES (' ||
                                quote_literal(NEW.user_hash) || ',' ||
                                NEW.user_id || ',' ||
                                quote_literal(NEW.title) || ')';
    PERFORM dblink_exec(connecting, query);
-- Возвращаем NULL что бы не вставлять запись в локальную таблицу
    RETURN NULL;
END;
$body$
LANGUAGE 'plpgsql';

CREATE TRIGGER "albums_insert" BEFORE INSERT 
    ON "public"."albums" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."albums_insert_trigger"();
    
-- Триггер для таблицы fotos
CREATE FUNCTION "fotos_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
    connecting      TEXT;
    query           TEXT;
BEGIN
-- Проверяем обязательные поля и формируем остальное
    IF  NEW.user_hash IS NULL OR 
        NEW.user_id IS NULL OR 
        NEW.album_id IS NULL THEN
        RAISE EXCEPTION 'Can''t insert row foto % ', NEW;
    END IF;
-- Выбираем распределенный сервер
    SELECT storage_select_by_hash(NEW.user_hash, TRUE) INTO connecting;
-- Формируем запрос
    query := 'INSERT INTO fotos (   user_hash, 
                                    album_id, 
                                    user_id,
                                    title,
                                    filename )
                    VALUES (' ||
                                quote_literal(NEW.user_hash) || ',' ||
                                NEW.album_id || ',' ||
                                NEW.user_id || ',' ||
                                quote_literal(NEW.title) || ',' ||
                                quote_literal(NEW.filename) || ')';
    PERFORM dblink_exec(connecting, query);
-- Возвращаем NULL что бы не вставлять запись в локальную таблицу
    RETURN NULL;
END;
$body$
LANGUAGE 'plpgsql';

CREATE TRIGGER "fotos_insert" BEFORE INSERT 
    ON "public"."fotos" FOR EACH ROW 
    EXECUTE PROCEDURE "fotos_insert_trigger"();
    
-- Триггер для таблицы fotos_votes
CREATE FUNCTION "fotos_votes_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
    connecting      TEXT;
    query           TEXT;
BEGIN
-- Проверяем обязательные поля и формируем остальное
    IF  NEW.foto_id IS NULL OR 
        NEW.foto_hash IS NULL OR
        NEW.user_id IS NULL OR
        NEW.user_hash IS NULL OR
        NEW.vote IS NULL
    THEN
        RAISE EXCEPTION 'Can''t insert row fotos_votes % ', NEW;
    END IF;
-- Выбираем распределенный сервер
    SELECT storage_select_by_hash(NEW.foto_hash, TRUE) INTO connecting;
-- Формируем запрос, для голосования у нас есть своя процедура
    query := 'SELECT votes_users((' ||
                                NEW.foto_id || ',' ||
                                quote_literal(NEW.foto_hash) || ',' ||
                                NEW.user_id || ',' ||
                                quote_literal(NEW.user_hash) || ',' ||
                                NEW.vote || '::smallint))';
    PERFORM (SELECT * FROM dblink(connecting, query) AS (f BOOLEAN));
-- Возвращаем NULL что бы не вставлять запись в локальную таблицу
    RETURN NULL;
END;
$body$
LANGUAGE 'plpgsql';

CREATE TRIGGER "fotos_votes_insert" BEFORE INSERT 
    ON "public"."fotos_votes" FOR EACH ROW 
    EXECUTE PROCEDURE "fotos_votes_insert_trigger"();

CREATE OR REPLACE FUNCTION "public"."fotos_tags_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
    connecting      TEXT;
    connecting_tag  TEXT;
    query           TEXT;
BEGIN
-- Проверяем обязательные поля и формируем остальное
    IF  NEW.foto_id IS NULL OR 
        NEW.album_id IS NULL OR
        NEW.user_id IS NULL OR
        NEW.user_hash IS NULL OR
        NEW.tag_id IS NULL OR
        NEW.tag_hash IS NULL
    THEN
        RAISE EXCEPTION 'Can''t insert row fotos_tags % ', NEW;
    END IF;
-- Выбираем распределенный сервер для фото
    SELECT storage_select_by_hash(NEW.user_hash, TRUE) INTO connecting;
-- Формируем запрос
    query := 'INSERT INTO fotos_tags (
                                        foto_id,
                                        album_id,
                                        user_id,
                                        user_hash, 
                                        tag_id,
                                        tag_hash )
                    VALUES (' ||
                                NEW.foto_id || ',' ||
                                NEW.album_id || ',' ||
                                NEW.user_id || ',' ||
                                quote_literal(NEW.user_hash) || ',' ||
                                NEW.tag_id || ',' ||
                                quote_literal(NEW.tag_hash) || ')';
    PERFORM dblink_exec(connecting, query);
-- Выбираем распределенный сервер для тега
    SELECT storage_select_by_hash(NEW.tag_hash, TRUE) INTO connecting_tag;
-- Стоит ли нам делать двойную запись
    IF connecting_tag <> connecting THEN
-- Запись будет двойная
        PERFORM dblink_exec(connecting_tag, query);
-- Здесь еще стоит поставить проверку на правильность отработки сервера, так как
-- Первую вставку мы уже сделали
    END IF;
-- Возвращаем NULL что бы не вставлять запись в локальную таблицу
    RETURN NULL;
END;
$body$
LANGUAGE 'plpgsql';

CREATE TRIGGER "fotos_tags_insert" BEFORE INSERT 
    ON "public"."fotos_tags" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."fotos_tags_insert_trigger"();

Как видно, записи у нас реально будут вставляться только для распределенных таблиц. Индексная база же по большей части изменяется на уровне хранилищ, за исключением таблицы тегов, но о ней позднее.

Делаем триггера-заглушки, на всякий случай.

SQL код (2)
CREATE FUNCTION "trigger_rollback_insert" () RETURNS trigger AS
$body$
BEGIN
    RETURN NULL;
END;
$body$
LANGUAGE 'plpgsql';

CREATE TRIGGER "index_last_foto_insert" BEFORE INSERT 
    ON "public"."index_last_foto" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."trigger_rollback_insert"();

CREATE TRIGGER "index_rating_foto_insert" BEFORE INSERT 
    ON "public"."index_rating_foto" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."trigger_rollback_insert"();

CREATE TRIGGER "index_tags_insert" BEFORE INSERT 
    ON "public"."index_tags" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."trigger_rollback_insert"();

CREATE TRIGGER "index_top_foto_insert" BEFORE INSERT 
    ON "public"."index_top_foto" FOR EACH ROW 
    EXECUTE PROCEDURE "public"."trigger_rollback_insert"();

И да, если вдруг кто не знал, в PostgreSQL Не обязательно для каждого триггера создавать свою хранимую процедуру, можно использовать существующую.

Остается таблица тегов (index_tags), по сути нам не интересна вставка данных как таковая, нас интересует только выборка, причем, если тега такого еще у нас нет, то мы все равно должны его вернуть, предварительно вставив. Поэтому сделаем хранимую процедуру для выборки тегов, которая будет сама, по надобности, вставлять записи в таблицу:

SQL код (3)
CREATE OR REPLACE FUNCTION "public"."get_tags_by_content" (income_tags text[])
    RETURNS SETOF "public"."index_tags" AS
$body$
DECLARE
    tag_row             index_tags;
    tag_rows            index_tags[];
    tag_exists          TEXT[];
    tmp_tag             TEXT;
    tag_hash            TEXT;
    query               TEXT;
    query_exec          TEXT;
    index_connect       TEXT;
    income_tmp          TEXT[];
BEGIN
-- Создаем запрос для выборки всех пришедших тегов
-- Сразу уберем повторы и переведем все теги в нижний регистр
    query := 'SELECT * FROM index_tags WHERE tag IN (';
    FOR tmp_tag IN SELECT * FROM unnest(income_tags) LOOP
        IF tmp_tag = ANY (income_tmp) THEN ELSE
            tmp_tag := lower(tmp_tag);
            query := query || quote_literal(tmp_tag) || ',';
            income_tmp := array_append(income_tmp, tmp_tag);
        END IF;
    END LOOP;
    query := query || ''''')';
-- Получаем соединение к индексной базе
    SELECT index_db_select(TRUE) INTO index_connect;
-- Заполняем выходной массив данными с существующими тегами
    FOR tag_row IN 
        SELECT * 
            FROM _dblink_index_tags(index_connect, query, TRUE)
    LOOP
        RETURN NEXT tag_row;
        tag_exists := array_append(tag_exists, tag_row.tag);
    END LOOP;
-- Определяем все ли теги мы получили и если надо то создаем их и возвращаем
    query := NULL;
    FOR tmp_tag IN SELECT * FROM unnest(income_tags) LOOP
-- Теги всегда строчными буквами будут
        tmp_tag := lower(tmp_tag);
        IF tmp_tag <> ALL (tag_exists) THEN
-- Такого тега нет, создаем
            SELECT get_md5hash(tmp_tag) INTO tag_hash;
            query_exec := 'INSERT INTO index_tags (tag, tag_hash)
                               VALUES (' ||
                                   quote_literal(tmp_tag) || ',' ||
                                   quote_literal(tag_hash) || ')';
            PERFORM dblink_exec(index_connect, query_exec);
            query := query || quote_literal(tmp_tag) || ',';
        END IF;
    END LOOP;
-- Делаем запрос на получение только что вставленных тегов
    IF query IS NOT NULL THEN 
        query := 'SELECT * FROM index_tags WHERE tag IN (' || query || ''''')';
        FOR tag_row IN 
            SELECT * 
                FROM _dblink_index_tags(index_connect, query, TRUE)
        LOOP
            RETURN NEXT tag_row;
        END LOOP;
    END IF;
END;
$body$
LANGUAGE 'plpgsql';

Вот так, теперь мы можем вставлять записи практически в любую таблицу.

Изменение записей

В приложении обычно изменение записей производится на уровне одной строки, всякие списочные и глобальные изменения имеют место быть, но они либо разовые, либо жестко специализированы.

В нашем случае, можно определить следующий список возможных запросов на изменение данных:

Как видно, у каждой операции однотипные фильтры для каждой таблицы (если это не так, то это надо сделать), значит не обязательно делать для каждого действия свою процедуру:

Так же, кроме команд на обновление и удаление я сделаю обработку команд на вставку, это, конечно избыточное решение, так как за вставку у нас отвечают триггера, но тем не менее это один из вариантов решения и его опускать не совсем правильно.

Итак, хранимая процедура для пользователей:

SQL код (4)
-- Действия над пользователем:
CREATE OR REPLACE FUNCTION "public"."users_command"
        (   command     "public"."command_type",
            id          integer,
            hash        text,
            login       text,
            pass        text )
    RETURNS boolean AS
$body$
DECLARE
    connecting          TEXT;
    query               TEXT;
    user_hash           TEXT    := hash;
BEGIN
-- Определимся с условием для запроса
    IF id IS NOT NULL AND user_hash IS NOT NULL THEN
        query := ' WHERE id = ' || id || ' AND 
                         user_hash = ' || quote_literal(user_hash);
    ELSIF login IS NOT NULL THEN
        SELECT get_md5hash(login) INTO user_hash;
        query := ' WHERE login = ' || quote_literal(login) || ' AND 
                         user_hash = ' || quote_literal(user_hash);
    ELSE
        RAISE EXCEPTION 'No arguments for select';
    END IF;
-- Определяемся с хранилищем
    SELECT storage_select_by_hash(user_hash, TRUE) INTO connecting;
-- Определяем что у нас за команда и в соответсвии с ней дописываем запрос
    IF command = 'DELETE' THEN
        query := 'DELETE FROM users ' || query;
    ELSIF command = 'UPDATE' THEN
        IF pass IS NULL THEN RAISE EXCEPTION 'The pass is empty'; END IF;
        query := 'UPDATE users SET pass = ' || quote_literal(pass) || query;
    ELSIF command = 'INSERT' THEN
        IF pass IS NULL OR login IS NULL THEN 
            RAISE EXCEPTION 'The pass or login is empty'; 
        END IF;
        query := 'INSERT INTO users (user_hash, login, pass) 
                   VALUES (' || quote_literal(user_hash) || ',' ||
                                quote_literal(login) || ',' ||
                                quote_literal(pass) || ')';
    ELSE
        RAISE EXCEPTION 'What the command?';
    END IF;
    PERFORM dblink_exec(connecting, query);
    RETURN TRUE;
END;
$body$
LANGUAGE 'plpgsql';

-- Примеры:
SELECT "public"."users_command"('INSERT', NULL, NULL, 'login', 'password');
SELECT "public"."users_command"('UPDATE', NULL, NULL, 'login', 'new password');
SELECT "public"."users_command"('UPDATE', 1, '52DA', NULL, 'old password');
SELECT "public"."users_command"('DELETE', NULL, NULL, 'login', NULL);

И да, если не криптуем пароль на уровне базы, знаит делаем это на уровне приложения.

В случае с пользователем, я передаю жесткий список аргументов, хотя можно использовать сериализованную строку, например для альбомов:

SQL код (5)
-- Действия над альбомом:
CREATE OR REPLACE FUNCTION "public"."albums_command"
        (   command     "public"."command_type",
            album       "public"."albums" )
    RETURNS boolean AS
$body$
DECLARE
    connecting          TEXT;
    query               TEXT;
BEGIN
-- Определимся с условием для запроса
    IF album.user_hash IS NOT NULL AND album.id IS NOT NULL THEN
        query := ' WHERE id = ' || album.id || ' AND 
                    user_hash = ' || quote_literal(album.user_hash);
    ELSIF album.user_hash IS NOT NULL AND album.user_id IS NOT NULL THEN
-- На самом деле, это условие для групповой операции, действие над всеми 
-- альбомами пользователя, задачи пока нет, но пусть будет возможность
        query := ' WHERE user_id = ' || album.user_id || ' AND 
                    user_hash = ' || quote_literal(album.user_hash);
    ELSE    
        RAISE EXCEPTION 'No arguments for select';
    END IF;
-- Определяемся с хранилищем
    SELECT storage_select_by_hash(album.user_hash, TRUE) INTO connecting;
-- Определяем что у нас за команда и в соответсвии с ней дописываем запрос
    IF command = 'DELETE' THEN
        query := 'DELETE FROM albums ' || query;
    ELSIF command = 'UPDATE' THEN
        query := 'UPDATE albums 
                    SET title = ' || quote_literal(album.title) || query;
    ELSIF command = 'INSERT' THEN
        query := 'INSERT INTO albums (user_hash, user_id, title) 
                   VALUES (' || quote_literal(album.user_hash) || ',' ||
                                album.user_id || ',' ||
                                quote_literal(album.title) || ')';
    ELSE
        RAISE EXCEPTION 'What the command?';
    END IF;
    PERFORM dblink_exec(connecting, query);
    RETURN TRUE;
END;
$body$
LANGUAGE 'plpgsql';

-- Примеры:
SELECT "public"."albums_command"('INSERT', (NULL,'52DA',1,'Мой альбом',NULL));
SELECT "public"."albums_command"('UPDATE', (1,'52DA',NULL,'Новое название',NULL));
SELECT "public"."albums_command"('DELETE', (1,'52DA',NULL,NULL,NULL));

Хотя приходится передавать еще поле count_fotos, мы его никак изменить не сможем, поэтому передаются несколько избыточные данные.

Это особенно видно для хранимой процедуры изменения данных для таблицы fotos:

SQL код (6)
CREATE OR REPLACE FUNCTION "public"."fotos_command"
        (   command     "public"."command_type",
            foto        "public"."fotos"    )
    RETURNS boolean AS
$body$
DECLARE
    connecting          TEXT;
    query               TEXT;
    sets                TEXT[];
    fields_for_set      TEXT[]      := ARRAY[ 'album_id', 'title' ];
    field               TEXT;
    field_value         TEXT;
    flag_update         BOOLEAN     := FALSE;
BEGIN
-- Определимся с условием для запроса
    IF foto.user_hash IS NOT NULL AND foto.id IS NOT NULL THEN
        query := ' WHERE id = ' || foto.id || ' AND 
                    user_hash = ' || quote_literal(foto.user_hash);
    ELSIF foto.user_hash IS NOT NULL AND foto.album_id IS NOT NULL THEN
-- Так же, это условие для групповой операции, действие над всеми 
-- картинками альбомама, задачи пока нет, но пусть будет возможность
        query := ' WHERE album_id = ' || foto.user_id || ' AND 
                    user_hash = ' || quote_literal(foto.user_hash);
    ELSE    
        RAISE EXCEPTION 'No arguments for select';
    END IF;
-- Определяемся с хранилищем
    SELECT storage_select_by_hash(foto.user_hash, TRUE) INTO connecting;
-- Определяем что у нас за команда и в соответсвии с ней дописываем запрос
    IF command = 'DELETE' THEN
        query := 'DELETE FROM fotos ' || query;
    ELSIF command = 'UPDATE' THEN
-- Тут сделаем небольшую проверку, что если передаем в поле NULL, 
-- то его не обновляем:
        query := ' id = id ' || query;
        FOR field IN SELECT * FROM unnest(fields_for_set) LOOP
-- Увы имплементить переменную как поле типа постре не может, 
-- поэтому приходится делать небольшой финт ушами:
            EXECUTE 'SELECT $1.' || field INTO field_value USING foto;
            IF field_value IS NOT NULL THEN
                query := field || ' = ' || quote_literal(field_value) || 
                         ',' || query;
                flag_update := TRUE;
            END IF;
        END LOOP;
        IF flag_update = FALSE THEN
-- Собственно, а обновлять нам нечего
            RETURN FALSE;
        END IF;
        query := 'UPDATE fotos 
                    SET ' || query;
    ELSIF command = 'INSERT' THEN
-- Дополнительная проверка обязательных полей
        IF  foto.user_id IS NULL OR
            foto.title IS NULL OR
            foto.filename IS NULL
        THEN
            RAISE EXCEPTION 'Can not insert foto: %', foto;
        END IF;
        query := 'INSERT INTO fotos (   user_hash, 
                                        album_id,
                                        user_id, 
                                        title,
                                        filename ) 
                   VALUES (' || quote_literal(foto.user_hash) || ',' ||
                                foto.album_id || ',' ||
                                foto.user_id || ',' ||
                                quote_literal(foto.title) || ',' ||
                                quote_literal(foto.filename) || ')';
    ELSE
        RAISE EXCEPTION 'What the command?';
    END IF;
    PERFORM dblink_exec(connecting, query);
    RETURN TRUE;
END;
$body$
LANGUAGE 'plpgsql';

-- Ну и примеры:
SELECT "public"."fotos_command"('INSERT', (NULL,'52DA',1,1,'Фото1','file1.jpg',NULL,NULL,NULL));
SELECT "public"."fotos_command"('UPDATE', (1,'52DA',NULL,NULL,'Фото2',NULL,NULL,NULL,NULL));
SELECT "public"."fotos_command"('DELETE', (1,'52DA',NULL,NULL,'Фото2',NULL,NULL,NULL,NULL));

В этой процедуре, считается что если поле, которое можно обновлять (fields_for_set), NULL, то его не обновляем.

Теперь самое фееричное - теги

SQL код (7)
-- Сделаем вспомогательный составной тип
CREATE TYPE "public"."tag_id_hash" AS (
  "id"      INTEGER,
  "hash"    CHAR(4)
);

CREATE OR REPLACE FUNCTION "public"."fotos_tags_command"
        (   command     "public"."command_type",
            foto_id     integer,
            user_hash   text,
            tags        text[]  )
    RETURNS boolean AS
$body$
DECLARE
    foto_connect        TEXT;
    tag_connect         TEXT;
    tag_connect_now     TEXT;
    query               TEXT;
    index_tag           index_tags;
    foto_tag            fotos_tags;
    income_tags         tag_id_hash[];
    exist_tags          tag_id_hash[];
    insert_tag          TEXT;
    insert_tags         TEXT[];
    insert_tags_double  TEXT[];
    delete_tags         INTEGER[];
    delete_tags_double  INTEGER[];
    delete_cursor       refcursor;
    album_id            INTEGER;
    user_id             INTEGER;
    tag_id_tmp          INTEGER;
    tag_hash_tmp        CHAR(4);
BEGIN
-- Проверяем входящие данные
    IF command IS NULL OR foto_id IS NULL OR user_hash IS NULL 
    THEN
        RAISE EXCEPTION 'Bad arguments command: %, foto_id: %, user_hash: %',
            command, foto_id, user_hash;
    END IF;
-- Получаем переданные теги, а так же теги которые у нас есть для фото, 
-- если передан список тегов:
    IF tags IS NOT NULL THEN
-- Список пришедших тегов
        FOR index_tag IN SELECT * FROM get_tags_by_content(tags) LOOP
            income_tags := array_append(income_tags, (index_tag.id, index_tag.tag_hash)::tag_id_hash);
        END LOOP;
    END IF;
    IF tags IS NOT NULL OR command = 'DELETE' THEN
-- Нам понадобится album_id и user_id фото, заодно и проверим, существует ли оно
        SELECT storage_select_by_hash(user_hash, TRUE) INTO foto_connect;
        query := 'SELECT album_id, user_id FROM fotos WHERE id = ' || foto_id || 
                        ' AND user_hash = ' || quote_literal(user_hash) || ';'; 
        SELECT * INTO album_id, user_id
            FROM dblink(foto_connect, query, TRUE) AS (a INTEGER, u INTEGER);
        IF album_id IS NULL THEN
            RAISE NOTICE 'No foto for foto_id: %, user_hash: %', foto_id, user_hash;
            RETURN FALSE;
        END IF;
-- Список существующих тегов фото
        query := 'SELECT * FROM fotos_tags WHERE foto_id = ' || foto_id || 
                        ' AND user_hash = ' || quote_literal(user_hash) || ';';
        FOR foto_tag IN 
            SELECT * FROM _dblink_fotos_tags(foto_connect, query, TRUE)
        LOOP
            exist_tags := array_append(exist_tags, (foto_tag.tag_id, foto_tag.tag_hash)::tag_id_hash);
        END LOOP;
    END IF;
-- Выполняем действия в зависимости от команды:
-- Если втавка тегов
    IF command = 'INSERT' THEN
-- Входящий список не должен быть пуст
        IF tags IS NULL THEN
            RAISE NOTICE 'No tags for insert';
            RETURN FALSE;
        END IF;
-- Вставляем только те теги которых у нас еще нет:
        insert_tags_double := NULL;
        FOR tag_id_tmp, tag_hash_tmp IN 
            SELECT * FROM unnest(income_tags) EXCEPT
            SELECT * FROM unnest(exist_tags) ORDER BY hash
        LOOP
-- Формируем список для основной вставки
            insert_tag := '(' || foto_id || ',' ||
                                 album_id || ',' ||
                                 user_id || ',' ||
                                 quote_literal(user_hash) || ',' ||
                                 tag_id_tmp || ',' ||
                                 quote_literal(tag_hash_tmp) || ')';
            insert_tags := array_append(insert_tags, insert_tag);
-- Этот блок для двойных записей
            SELECT storage_select_by_hash(tag_hash_tmp, TRUE) INTO tag_connect;
            IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
                tag_connect_now := tag_connect;
                insert_tags_double := array_append(insert_tags_double, insert_tag);
            ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
                query := 'INSERT INTO fotos_tags VALUES ' || array_to_string(insert_tags_double, ',');
                PERFORM dblink_exec(tag_connect_now, query, TRUE);
                insert_tags_double := NULL;
                tag_connect_now := tag_connect;
                insert_tags_double := array_append(insert_tags_double, insert_tag);
            ELSIF tag_connect <> foto_connect THEN
                insert_tags_double := array_append(insert_tags_double, insert_tag);
            END IF;
        END LOOP;
        query := 'INSERT INTO fotos_tags VALUES ' || array_to_string(insert_tags, ',');
        PERFORM dblink_exec(foto_connect, query, TRUE);
-- и последний дублирующий
        IF tag_connect_now IS NOT NULL AND tag_connect <> foto_connect THEN
            query := 'INSERT INTO fotos_tags VALUES ' || array_to_string(insert_tags_double, ',');
            PERFORM dblink_exec(tag_connect_now, query, TRUE);
            insert_tags_double := NULL;
        END IF;
-- Если удаление тегов
    ELSIF command = 'DELETE' THEN
-- Варианта два, либо удаляем все либо только часть
        IF tags IS NULL THEN    -- Удаляем все
            OPEN delete_cursor FOR 
                SELECT * FROM unnest(exist_tags) ORDER BY hash;
        ELSE                    -- Или только пересечение
            OPEN delete_cursor FOR 
                SELECT * FROM unnest(income_tags) INTERSECT
                SELECT * FROM unnest(exist_tags) ORDER BY hash;
        END IF;
        FETCH delete_cursor INTO tag_id_tmp, tag_hash_tmp;
        delete_tags := NULL;
        delete_tags_double := NULL;
        WHILE tag_id_tmp IS NOT NULL LOOP
-- Формируем список для основного удаления
            delete_tags := array_append(delete_tags, tag_id_tmp);
-- Этот блок для двойных записей
            SELECT storage_select_by_hash(tag_hash_tmp, TRUE) INTO tag_connect;
            IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
                tag_connect_now := tag_connect;
                delete_tags_double := array_append(delete_tags_double, tag_id_tmp);
            ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
                query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
                            ' user_hash = ' || quote_literal(user_hash) || ' AND ' ||
                            ' tag_id IN (' || array_to_string(delete_tags_double, ',') || ');';
                PERFORM dblink_exec(tag_connect_now, query, TRUE);
                delete_tags_double := NULL;
                tag_connect_now := tag_connect;
                delete_tags_double := array_append(delete_tags_double, tag_id_tmp);
            ELSIF tag_connect <> foto_connect THEN
                delete_tags_double := array_append(delete_tags_double, tag_id_tmp);
            END IF;
            FETCH delete_cursor INTO tag_id_tmp, tag_hash_tmp;
        END LOOP;
        query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
                    ' user_hash = ' || quote_literal(user_hash) || ' AND ' ||
                    ' tag_id IN (' || array_to_string(delete_tags, ',') || ');';
        PERFORM dblink_exec(foto_connect, query, TRUE);
-- и последний дублирующий
        IF tag_connect_now IS NOT NULL AND tag_connect <> foto_connect THEN
            query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
                        ' user_hash = ' || quote_literal(user_hash) || ' AND ' ||
                        ' tag_id IN (' || array_to_string(delete_tags_double, ',') || ');';
            PERFORM dblink_exec(tag_connect_now, query, TRUE);
            delete_tags_double := NULL;
        END IF;
-- Если обновление тегов
    ELSIF command = 'UPDATE' THEN
-- Входящий список не должен быть пуст
        IF tags IS NULL THEN
            RAISE NOTICE 'No tags for update';
            RETURN FALSE;
        END IF;
-- Объединяем список входящих тегов и существующих
        insert_tags := NULL;
        FOR tag_id_tmp, tag_hash_tmp IN 
            SELECT * FROM unnest(income_tags) UNION
            SELECT * FROM unnest(exist_tags) 
        LOOP
            IF  (tag_id_tmp, tag_hash_tmp)::tag_id_hash = ANY (income_tags) AND
                (tag_id_tmp, tag_hash_tmp)::tag_id_hash <> ALL (exist_tags)
            THEN -- этот тег вставлять
-- Формируем список для основной вставки
                insert_tag := '(' || foto_id || ',' ||
                                     album_id || ',' ||
                                     user_id || ',' ||
                                     quote_literal(user_hash) || ',' ||
                                     tag_id_tmp || ',' ||
                                     quote_literal(tag_hash_tmp) || ')';
                insert_tags := array_append(insert_tags, insert_tag);
-- Этот блок для двойных записей
                SELECT storage_select_by_hash(tag_hash_tmp, TRUE) INTO tag_connect;
                IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
                    tag_connect_now := tag_connect;
                    insert_tags_double := array_append(insert_tags_double, insert_tag);
                ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
                    query := 'INSERT INTO fotos_tags VALUES ' || array_to_string(insert_tags_double, ',');
                    PERFORM dblink_exec(tag_connect_now, query, TRUE);
                    insert_tags_double := NULL;
                    tag_connect_now := tag_connect;
                    insert_tags_double := array_append(insert_tags_double, insert_tag);
                ELSIF tag_connect <> foto_connect THEN
                    insert_tags_double := array_append(insert_tags_double, insert_tag);
                END IF;
            ELSIF (tag_id_tmp, tag_hash_tmp)::tag_id_hash <> ALL (income_tags) AND
                (tag_id_tmp, tag_hash_tmp)::tag_id_hash = ANY (exist_tags)
            THEN -- этот тег удалять
-- Формируем список для основного удаления
                delete_tags := array_append(delete_tags, tag_id_tmp);
-- Этот блок для двойных записей
                SELECT storage_select_by_hash(tag_hash_tmp, TRUE) INTO tag_connect;
                IF tag_connect_now IS NULL AND tag_connect <> foto_connect THEN
                    tag_connect_now := tag_connect;
                    delete_tags_double := array_append(delete_tags_double, tag_id_tmp);
                ELSIF tag_connect_now <> tag_connect AND tag_connect <> foto_connect THEN
                    query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
                                ' user_hash = ' || quote_literal(user_hash) || ' AND ' ||
                                ' tag_id IN (' || array_to_string(delete_tags_double, ',') || ');';
                    PERFORM dblink_exec(tag_connect_now, query, TRUE);
                    delete_tags_double := NULL;
                    tag_connect_now := tag_connect;
                    delete_tags_double := array_append(delete_tags_double, tag_id_tmp);
                ELSIF tag_connect <> foto_connect THEN
                    delete_tags_double := array_append(delete_tags_double, tag_id_tmp);
                END IF;
            END IF;
        END LOOP;
        IF insert_tags IS NOT NULL THEN
            query := 'INSERT INTO fotos_tags VALUES ' || array_to_string(insert_tags, ',');
            PERFORM dblink_exec(foto_connect, query, TRUE);
        END IF;
        IF delete_tags IS NOT NULL THEN
            query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
                        ' user_hash = ' || quote_literal(user_hash) || ' AND ' ||
                        ' tag_id IN (' || array_to_string(delete_tags, ',') || ');';
            PERFORM dblink_exec(foto_connect, query, TRUE);
        END IF;
-- и последний дублирующий
        IF  tag_connect_now IS NOT NULL AND 
            tag_connect <> foto_connect AND
            delete_tags_double IS NOT NULL
        THEN
            query := 'DELETE FROM fotos_tags WHERE foto_id = ' || foto_id || ' AND ' ||
                        ' user_hash = ' || quote_literal(user_hash) || ' AND ' ||
                        ' tag_id IN (' || array_to_string(delete_tags_double, ',') || ');';
            PERFORM dblink_exec(tag_connect_now, query, TRUE);
            delete_tags_double := NULL;
        END IF;
-- и последний дублирующий
        IF  tag_connect_now IS NOT NULL AND 
            tag_connect <> foto_connect AND
            insert_tags_double IS NOT NULL
        THEN
            query := 'INSERT INTO fotos_tags VALUES ' || array_to_string(insert_tags_double, ',');
            PERFORM dblink_exec(tag_connect_now, query, TRUE);
            insert_tags_double := NULL;
        END IF;
    ELSE
        RAISE EXCEPTION 'What the command?';
    END IF;
    RETURN TRUE;
END;
$body$
LANGUAGE 'plpgsql';

-- Примеры использования:
SELECT fotos_tags_command('INSERT', 1, '52DA', ARRAY['a', 'b', 'c']);
SELECT fotos_tags_command('UPDATE', 1, '52DA', ARRAY['b', 'c', 'd']);
SELECT fotos_tags_command('DELETE', 1, '52DA', ARRAY['b']);
SELECT fotos_tags_command('DELETE', 1, '52DA', NULL);

Несколько пояснений по принципу работы. Для каждой из операции определяются пересечения существующего списка тегов и списка тегов пришедшего в запросе, это позволяет минимизировать количество запросов, но усложняет логику. Так же сложности добавляет двойная запись тегов, правда запросы тоже группируются по хранилищам.

Еще видно, UPDATE это практически копия вставки и удаления, поэтому возможно оптимальней сделать раздельные функции вставки и одаления и использовать их.

Теперь мы можем вставлять, изменять и удалять практически любые данные проекта. Остается, по моему мнению, самое простое - выборки данных.

Сергей Томулевич aka Phoinix (01.12.2009 г.)
Copyright © 2011 Сергей Томулевич