DOC.PROTOTYPES.RU

Равномерное распределение с помощью dblink

Проксирующая база данных

Внутренние процедуры

Дополнительные dblink процедуры

Как я писал в переводе документации dblink лучше сделать свои функции на основе dblink для того что бы сразу типизировать возвращаемые данные:

SQL код (1)
CREATE OR REPLACE FUNCTION "public"."_dblink_users" (text, text, boolean)
RETURNS SETOF "public"."users" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_albums" (text, text, boolean)
RETURNS SETOF "public"."albums" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_fotos" (text, text, boolean)
RETURNS SETOF "public"."fotos" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_fotos_tags" (text, text, boolean)
RETURNS SETOF "public"."fotos_tags" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_albums_tags" (text, text, boolean)
RETURNS SETOF "public"."albums_tags" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_fotos_votes" (text, text, boolean)
RETURNS SETOF "public"."fotos_votes" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_index_tags" (text, text, boolean)
RETURNS SETOF "public"."index_tags" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_index_last_foto" (text, text, boolean)
RETURNS SETOF "public"."index_last_foto" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_index_top_foto" (text, text, boolean)
RETURNS SETOF "public"."index_top_foto" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

CREATE OR REPLACE FUNCTION "public"."_dblink_index_rating_foto" (text, text, boolean)
RETURNS SETOF "public"."index_rating_foto" AS
'$libdir/dblink', 'dblink_record'
LANGUAGE 'c';

Я продублировал dblink процедуры только для набора входящих параметров (text,text,boolean), так как не именованых соединений у нас не будет, и хочется в ручную контролировать возвращаемые ошибки.

Конфигурация и распределение

Основным критерием распределения является 4 последних символа md5 хеша от логина или от тега, соответсвенно, мы должны указать названия соединений, сервера к ним относящиеся и диапазоны хешей. Для этого будем использовать типизированный массив конфигурации, и соответсвующую хранимую процедуру:

SQL код (2)
-- Создаем составной тип для конфига
CREATE TYPE "public"."config_storage_type" AS (
    "alias"         TEXT,
    "server"        TEXT,
    "md5_start"     CHAR(4),
    "md5_end"       CHAR(4)
);

-- Процедура конфига:
CREATE OR REPLACE FUNCTION "public"."storage_config" () RETURNS "public"."config_storage_type"[] AS
$body$
DECLARE
    config      config_storage_type[];
    item        config_storage_type;
BEGIN
-- storage_01
    item.connect    := 'storage_01';
    item.server     := 'storage_01';
    item.md5_start  := '0000';
    item.md5_end    := '9FFF';
    config := array_append(config, item);
-- storage_02
    item.connect    := 'storage_02';
    item.server     := 'storage_02';
    item.md5_start  := 'A000';
    item.md5_end    := 'FFFF';
    config := array_append(config, item);

    RETURN config;
END;
$body$
LANGUAGE 'plpgsql';

Соответсвенно, для того что бы получить нужное соединение сделаем процедуру:

SQL код (3)
CREATE OR REPLACE FUNCTION "public"."storage_select_by_hash"
    (md5hash text, fail_on_error boolean)
    RETURNS text AS
$body$
DECLARE
    config          config_storage_type[];
    config_row      config_storage_type;
    connectings     TEXT[];
    connect         TEXT                    := 'ERR';
    counter         INTEGER;
BEGIN
-- Выбираем конфиг
    SELECT storage_config() INTO config;
-- Выбираем активные соединения
    SELECT dblink_get_connections() INTO connectings;
-- Проходим в цикле по конфигу
    FOR config_row IN SELECT * FROM unnest(config) LOOP
-- Если md5hash в диапазоне, то это соединение нам и нужно
        IF md5hash BETWEEN config_row.md5_start AND config_row.md5_end THEN
            connect := config_row.connect;
-- Проверяем активно ли наше соединение и если нет, то подключаемся
            IF connect = ANY (connectings) THEN ELSE
                PERFORM dblink_connect(config_row.connect, config_row.server);
            END IF;
            EXIT;
        END IF;
    END LOOP;
    RETURN connect;
-- Не будем умирать в случае ошибки, а выдадим сообщение.
EXCEPTION
WHEN sqlclient_unable_to_establish_sqlconnection THEN
    IF fail_on_error = TRUE THEN
        RAISE EXCEPTION 'Can not connect to server %, connect %', config_row.server, config_row.connect;
    ELSE
        RAISE NOTICE 'Can not connect to server %, connect %', config_row.server, config_row.connect;
        RETURN 'ERR';
    END IF;
END;
$body$
LANGUAGE 'plpgsql';

Но еще нам нужно уметь получать md5 ключ из каких-либо данных, логин или тег:

SQL код (4)
CREATE OR REPLACE FUNCTION "public"."get_md5hash" (income text) RETURNS text AS
$body$
DECLARE
    md5hash     TEXT;
BEGIN
-- Получаем md5
    SELECT UPPER(md5(income)) INTO md5hash;
-- Берем 4 последних символа
    SELECT substr(md5hash, length(md5hash) - 3, 4) INTO md5hash;
    RETURN md5hash;
END;
$body$
LANGUAGE 'plpgsql';

Кстати, md5 берем не с начала а с конца, потому как в таком случае минимальное значение MD5 будет "1000", а не "0000".

И как результат - процедура определяющая хранилице относительно логина, тега:

SQL код (5)
CREATE FUNCTION "public"."storage_select_by_text" (income text) RETURNS text AS
$body$
DECLARE
    connect     TEXT;
    md5hash     TEXT;
BEGIN
    SELECT get_md5hash(income) INTO md5hash;
    SELECT storage_select_by_hash(md5hash, TRUE) INTO connect;
    RETURN connect;
END;
$body$
LANGUAGE 'plpgsql';

Да, и процедуру, для получения имени соединения к индексной базе, для того, что бы можно было открывать соединения "на лету";

SQL код (6)
CREATE OR REPLACE FUNCTION "public"."index_db_select"
    (fail_on_error boolean)
    RETURNS text AS
$body$
DECLARE
    connect         TEXT    := 'index_db';
    connectings     TEXT[];
BEGIN
-- Выбираем активные соединения
    SELECT dblink_get_connections() INTO connectings;
    IF connect = ANY (connectings) THEN ELSE
        PERFORM dblink_connect(connect, 'index_db');
    END IF;
    RETURN connect;
EXCEPTION
WHEN sqlclient_unable_to_establish_sqlconnection THEN
    IF fail_on_error = TRUE THEN
        RAISE EXCEPTION 'Can not connect to server index_db, connect index_db';
    ELSE
        RAISE NOTICE 'Can not connect to server index_db, connect index_db';
        RETURN 'ERR';
    END IF;
END;
$body$
LANGUAGE 'plpgsql';

Вот так, теперь все готово для формирования запросов и команд.

Сергей Томулевич aka Phoinix (01.12.2009 г.)
Copyright © 2011 Сергей Томулевич