Равномерное распределение с помощью dblink
Структура кластера
Распределенная DB
Таблица пользователей
В данную таблицу вставим денормализованое поле tags, которое будет списком тегов пользователя с количеством применений.
Структура таблицы:
SQL код (1)
CREATE TYPE "public"."tag_count" AS (
"tag_id" INTEGER ,
"counter" INTEGER
);
CREATE TABLE "public"."users" (
"id" SERIAL ,
"user_hash" CHAR (4) NOT NULL ,
"login" VARCHAR (200) NOT NULL ,
"pass" TEXT NOT NULL ,
"tags" "public"."tag_count" [],
CONSTRAINT "users_pkey" PRIMARY KEY ("user_hash", "id")
) WITHOUT OIDS ;
CREATE UNIQUE INDEX "users_idx" ON "public"."users"
USING btree ("login");
Хочу обратить внимание на PRIMARY KEY он у нас составной, так как на разных серверах свои последовательности, а использовать одну последовательность на проксирующем сервере не совсем хочется, так как их тоже может быть несколько. Поэтому берем за правило, что для распределенных данных ключ всегда составной, как хеш хранищища и ID в пределах хранилища. Тем более могут возникнуть ситуации, когда потребуется объединить два хранилища и мы получим пересечение по id.
Поле user_hash будем вычислять на прокси сервере, поэтому никаких дополнительных действий перед вставкой не нужно. А вот при обновлении непрохо было бы заблокировать изменения полей id , user_hash и login , для этого создадим триггер для обновления:
SQL код (2)
CREATE OR REPLACE FUNCTION "public"."users_update_trigger" () RETURNS trigger AS
$body$
BEGIN
NEW .id := OLD .id;
NEW .login := OLD .login;
NEW .user_hash := OLD .user_hash;
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "users_update" BEFORE UPDATE
ON "public"."users" FOR EACH ROW
EXECUTE PROCEDURE "public"."users_update_trigger" ();
Это, конечно накладывает некоторые ограничения, но зато меньше будет проблем с целостностью данных и миграцией.
Таблица альбомов
Структура таблицы:
SQL код (3)
CREATE TABLE "public"."albums" (
"id" SERIAL ,
"user_hash" CHAR (4) NOT NULL ,
"user_id" INTEGER NOT NULL ,
"title" TEXT NOT NULL ,
"count_fotos" INTEGER DEFAULT 0 NOT NULL ,
CONSTRAINT "albums_pkey" PRIMARY KEY ("user_hash", "id"),
CONSTRAINT "albums_fk" FOREIGN KEY ("user_hash", "user_id")
REFERENCES "public"."users" ("user_hash", "id")
MATCH FULL
ON DELETE CASCADE
ON UPDATE CASCADE
NOT DEFERRABLE
) WITHOUT OIDS ;
CREATE INDEX "albums_idx" ON "public"."albums"
USING btree ("user_id");
Хочу обратить внимание опять на PRIMARY KEY , что поле user_hash в составном ключе, если для таблицы users это не имело особого значения, то в последующих таблицах это важно. Из этой таблицы мы будем выбирать данные как по первичному ключу так и по user_id + user_hash , что бы получить альбомы пользователя, так вот, что бы не расширять индекс по user_id добавив в него user_hash , user_hash проще сделать первым в первичном ключе, все равно по одному id выбирать никогда не будем. Тогда при запросах по user_id + user_hash будет использоваться индекс по user_id и первичный ключ, это конечно не столь оптимально, но зато позволит съэкономить память.
Еще сразу навешиваем внешний ключ к таблице users .
И триггер повесим на обновление.
SQL код (4)
CREATE OR REPLACE FUNCTION "public"."albums_update_trigger" () RETURNS trigger AS
$body$
BEGIN
NEW .id := OLD .id;
NEW .user_hash := OLD .user_hash;
NEW .user_id := OLD .user_id;
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "albums_update" BEFORE UPDATE
ON "public"."albums" FOR EACH ROW
EXECUTE PROCEDURE "public"."albums_update_trigger" ();
С теми же последствиями по ограничению. И пользователя у альбома мы менять не можем.
Таблица фото
Структура таблицы:
SQL код (5)
CREATE TABLE "public"."fotos" (
"id" SERIAL ,
"user_hash" CHAR (4) NOT NULL ,
"album_id" INTEGER NOT NULL ,
"user_id" INTEGER NOT NULL ,
"title" TEXT NOT NULL ,
"filename" VARCHAR (200) NOT NULL ,
"rating" NUMERIC (5,4) NOT NULL DEFAULT 0,
"votes_num" INTEGER NOT NULL DEFAULT 0,
"votes_sum" INTEGER NOT NULL DEFAULT 0,
CONSTRAINT "fotos_pkey" PRIMARY KEY ("user_hash", "id"),
CONSTRAINT "fotos_fk" FOREIGN KEY ("user_hash", "album_id")
REFERENCES "public"."albums" ("user_hash", "id")
MATCH FULL
ON DELETE CASCADE
ON UPDATE CASCADE
NOT DEFERRABLE
) WITHOUT OIDS ;
CREATE INDEX "fotos_idx" ON "public"."fotos"
USING btree ("album_id");
CREATE INDEX "fotos_idx1" ON "public"."fotos"
USING btree ("user_id");
Хотя по большому счету user_id не особо нужен, но пусть будет, по крайней мере при смене альбома, мы не сможем сменить пользователя фото, даже если альбом совсем левый, для чего cделаем триггер на обновление.
Кроме всего прочего, у нас может обновится рейтинг, причем, мы еще должны его обновить на индексной базе данных, для чего нам понадобится dblink и данные о внешнем индексном сервере:
SQL код (6)
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
CREATE SERVER index_db FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '127.0.0.1' , dbname 'db_index' );
CREATE USER MAPPING FOR postgres SERVER index_db OPTIONS (user 'postgres' , password 'pass' );
CREATE OR REPLACE FUNCTION "public"."index_db_select " () RETURNS text AS
$body$
DECLARE
connect TEXT := 'index_db' ;
connectings TEXT [];
BEGIN
SELECT dblink_get_connections () INTO connectings ;
IF connect = ANY (connectings ) THEN ELSE
PERFORM dblink_connect (connect , 'index_db' );
END IF ;
RETURN connect ;
EXCEPTION
WHEN sqlclient_unable_to_establish_sqlconnection THEN
RAISE NOTICE 'Can not connect to server index_db, connect index_db' ;
RETURN 'ERR' ;
END ;
$body$
LANGUAGE 'plpgsql' ;
Вот так, теперь можно из хранилища обращаться напрямую к индексной базе. Кроме рейтинга еще обновляем счетчик альбома.
SQL код (7)
CREATE OR REPLACE FUNCTION "public"."fotos_update_trigger" () RETURNS trigger AS
$body$
DECLARE
query TEXT ;
index_connect TEXT ;
BEGIN
IF NEW .album_id <> OLD .album_id THEN
UPDATE albums
SET count_fotos = count_fotos - 1
WHERE id = OLD .album_id AND user_hash = OLD .user_hash;
UPDATE albums
SET count_fotos = count_fotos + 1
WHERE id = NEW .album_id AND user_hash = NEW .user_hash;
END IF ;
IF NEW .votes_sum <> OLD .votes_sum OR
(OLD .votes_sum IS NULL AND NEW .votes_sum IS NOT NULL )
THEN
IF NEW .votes_num IS NULL THEN
NEW .votes_num := OLD .votes_num;
END IF ;
IF NEW .votes_num <= 0 OR NEW .votes_sum <= 0 THEN
NEW .rating := 0;
ELSE
NEW .rating := NEW .votes_sum::numeric (12,4) / NEW .votes_num;
END IF ;
SELECT index_db_select () INTO index_connect ;
IF index_connect = 'ERR' OR index_connect IS NULL THEN
RAISE EXCEPTION 'No index_db connect' ;
END IF ;
query := 'SELECT index_foto_rating_command(
(''UPDATE'', ' ||
NEW .id || ',' ||
quote_literal(NEW .user_hash) || ',' ||
quote_literal(NEW .rating) || ',' ||
quote_literal(OLD .rating) || '));' ;
PERFORM (SELECT * FROM dblink (index_connect , query , TRUE ) AS (f BOOLEAN ));
END IF ;
NEW .id := OLD .id;
NEW .user_hash := OLD .user_hash;
NEW .user_id := OLD .user_id;
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_update" BEFORE UPDATE
ON "public"."fotos" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_update_trigger" ();
Так как у нас еще идет обновление счетчиков для альбомов и поля рейтинга, поэтому еще сделаем триггер на вставку и на удаление, что бы обновлять счетчики фото:
SQL код (8)
CREATE OR REPLACE FUNCTION "public"."fotos_insert_trigger" () RETURNS trigger AS
$body$
DECLARE
index_connect TEXT ;
query TEXT ;
BEGIN
UPDATE albums
SET count_fotos = count_fotos + 1
WHERE id = NEW .album_id AND user_hash = NEW .user_hash;
SELECT index_db_select () INTO index_connect ;
IF index_connect = 'ERR' OR index_connect IS NULL THEN
RAISE EXCEPTION 'No index_db connect' ;
END IF ;
query := 'INSERT INTO index_last_foto (foto_id, user_hash)
VALUES (' || NEW .id || ',' ||
quote_literal(NEW .user_hash) || '));' ;
PERFORM dblink_exec (index_connect , query , TRUE );
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_insert" AFTER INSERT
ON "public"."fotos" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_insert_trigger" ();
CREATE OR REPLACE FUNCTION "public"."fotos_delete_trigger" () RETURNS trigger AS
$body$
DECLARE
index_connect TEXT ;
query TEXT ;
BEGIN
UPDATE albums
SET count_fotos = count_fotos - 1
WHERE id = OLD .album_id AND user_hash = OLD .user_hash;
SELECT index_db_select () INTO index_connect ;
IF index_connect = 'ERR' OR index_connect IS NULL THEN
RAISE EXCEPTION 'No index_db connect' ;
END IF ;
query := 'SELECT index_foto_rating_command(
(''DELETE'', ' ||
OLD .id || ',' ||
quote_literal(OLD .user_hash) || ',' ||
0 || ',' ||
quote_literal(OLD .rating) || ');' ;
PERFORM (SELECT * FROM dblink (index_connect , query , TRUE ) AS (f BOOLEAN ));
query := 'DELETE FROM index_last_foto
WHERE foto_id = ' || OLD .id ||
' AND user_hash = ' || quote_literal(OLD .user_hash);
PERFORM dblink_exec (index_connect , query , TRUE );
RETURN OLD ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_delete" BEFORE DELETE
ON "public"."fotos" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_delete_trigger" ();
Теперь количество фотографий для альбома будет считаться автоматически, как рейтинги и прочая в индексной базе.
Таблица связки тег-фото
Кроме таблицы связки тег-фото, сделаем таблицу связки тег-альбом, она нам пригодится в дальнейшем, при выборке данных :
Структура таблицы:
SQL код (9)
CREATE TABLE "public"."fotos_tags" (
"foto_id" INTEGER NOT NULL ,
"album_id" INTEGER NOT NULL ,
"user_id" INTEGER NOT NULL ,
"user_hash" CHAR (4) NOT NULL ,
"tag_id" INTEGER NOT NULL ,
"tag_hash" CHAR (4) NOT NULL ,
CONSTRAINT "fotos_tags_pkey" PRIMARY KEY ("user_hash", "foto_id", "tag_id")
) WITHOUT OIDS ;
CREATE INDEX "fotos_tags_idx" ON "public"."fotos_tags"
USING btree ("album_id");
CREATE INDEX "fotos_tags_idx1" ON "public"."fotos_tags"
USING btree ("user_id");
CREATE INDEX "fotos_tags_idx2" ON "public"."fotos_tags"
USING btree ("tag_id", "foto_id" DESC );
CREATE TABLE "public"."albums_tags" (
"album_id" INTEGER NOT NULL ,
"user_hash" CHAR (4) NOT NULL ,
"tag_id" INTEGER NOT NULL ,
"counter" INTEGER NOT NULL DEFAULT 0,
CONSTRAINT "albums_tags_pkey" PRIMARY KEY ("user_hash", "album_id"),
CONSTRAINT "albums_tags_fk" FOREIGN KEY ("user_hash", "album_id")
REFERENCES "public"."albums" ("user_hash", "id")
MATCH FULL
ON DELETE CASCADE
ON UPDATE CASCADE
NOT DEFERRABLE
) WITHOUT OIDS ;
CREATE INDEX "albums_tags_idx" ON "public"."albums_tags"
USING btree ("counter");
CREATE INDEX "albums_tags_idx1" ON "public"."albums_tags"
USING btree ("tag_id");
По сути поле tag_hash в этой таблице - информационное, но лучше пусть будет, это поможет во время миграции.
Так как данные по пользовательским тегам и тегам альбома у нас денормализованы, то требуется создать триггера на вставку и удаление для таблицы fotos_tags:
SQL код (10)
CREATE OR REPLACE FUNCTION "public"."fotos_tags_insert_trigger" ()
RETURNS trigger AS
$body$
DECLARE
user_row users ;
user_tag tag_count ;
user_tags tag_count [];
user_tag_update_flag BOOLEAN := FALSE ;
BEGIN
SELECT * INTO user_row
FROM users
WHERE id = NEW .user_id AND
user_hash = NEW .user_hash;
IF user_row IS NOT NULL THEN
FOR user_tag IN SELECT * FROM unnest (user_row .tags) LOOP
IF user_tag .tag_id = NEW .tag_id THEN
user_tag .counter := user_tag .counter + 1;
user_tag_update_flag := TRUE ;
END IF ;
user_tags := array_append (user_tags , user_tag );
END LOOP ;
IF user_tag_update_flag = FALSE THEN
user_tag .tag_id := NEW .tag_id;
user_tag .counter := 1;
user_tags := array_append (user_tags , user_tag );
END IF ;
UPDATE users
SET tags = user_tags
WHERE id = NEW .user_id AND
user_hash = NEW .user_hash;
UPDATE albums_tags
SET counter = counter + 1
WHERE album_id = NEW .album_id AND
user_hash = NEW .user_hash AND
tag_id = NEW .tag_id;
IF FOUND = FALSE THEN
INSERT INTO albums_tags
VALUES (NEW .album_id, NEW .user_hash, NEW .tag_id, 1);
END IF ;
END IF ;
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_tags_insert" AFTER INSERT
ON "public"."fotos_tags" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_tags_insert_trigger" ();
CREATE OR REPLACE FUNCTION "public"."fotos_tags_delete_trigger" () RETURNS trigger AS
$body$
DECLARE
user_row users ;
user_tag tag_count ;
user_tags tag_count [];
user_tag_update_flag BOOLEAN := FALSE ;
album_counter INTEGER ;
BEGIN
SELECT * INTO user_row
FROM users
WHERE id = NEW .user_id AND
user_hash = NEW .user_hash;
IF user_row IS NOT NULL THEN
FOR user_tag IN SELECT * FROM unnest (user_row .tags) LOOP
IF user_tag .tag_id = NEW .tag_id THEN
user_tag .counter := user_tag .counter - 1;
user_tag_update_flag := TRUE ;
END IF ;
IF user_tag .counter > 0 THEN
user_tags := array_append (user_tags , user_tag );
END IF ;
END LOOP ;
UPDATE users
SET tags = user_tags
WHERE id = NEW .user_id AND
user_hash = NEW .user_hash;
UPDATE albums_tags
SET counter = counter - 1
WHERE album_id = NEW .album_id AND
user_hash = NEW .user_hash AND
tag_id = NEW .tag_id
RETURNING counter INTO album_counter ;
IF FOUND = TRUE AND album_counter < 1 THEN
DELETE FROM albums_tags
WHERE album_id = NEW .album_id AND
user_hash = NEW .user_hash AND
tag_id = NEW .tag_id;
END IF ;
END IF ;
RETURN OLD ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_tags_delete" AFTER DELETE
ON "public"."fotos_tags" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_tags_delete_trigger" ();
Таблица голосований пользователей
Структура таблицы:
SQL код (11)
CREATE TABLE "public"."fotos_votes" (
"foto_id" INTEGER NOT NULL ,
"foto_hash" CHAR (4) NOT NULL ,
"user_id" INTEGER NOT NULL ,
"user_hash" CHAR (4) NOT NULL ,
"vote" SMALLINT NOT NULL ,
CONSTRAINT "foto_votes_pkey" PRIMARY KEY ("foto_hash", "foto_id", "user_hash", "user_id"),
CONSTRAINT "fotos_votes_fk" FOREIGN KEY ("foto_hash", "foto_id")
REFERENCES "public"."fotos" ("user_hash", "id")
MATCH FULL
ON DELETE CASCADE
ON UPDATE CASCADE
NOT DEFERRABLE
) WITHOUT OIDS ;
Данные о том кто и как голосовал за фото будут располагаться в том же хранилище что и само фото, так как проще хранить целостность данных (для чего создан внешний ключ), а так же не потребуется распределять данные двойной записью как с тегами. Тем более, нам интереснее какие пользователи голосовали за фото, а не за какие фото голосовал пользователь. А еще можно, точнее нужно, навесить триггеры которые обновляет данные о голосах:
Триггер на вставку:
SQL код (12)
CREATE OR REPLACE FUNCTION "public"."fotos_votes_insert_trigger" () RETURNS trigger AS
$body$
BEGIN
UPDATE fotos
SET votes_num = votes_num + 1,
votes_sum = votes_sum + NEW .vote
WHERE user_hash = NEW .foto_hash AND
id = NEW .foto_id;
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_votes_insert" AFTER INSERT
ON "public"."fotos_votes" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_votes_insert_trigger" ();
Триггер на изменение:
SQL код (13)
CREATE FUNCTION "fotos_votes_update_trigger" () RETURNS trigger AS
$body$
BEGIN
NEW .foto_id := OLD .foto_id;
NEW .foto_hash := OLD .foto_hash;
NEW .user_id := OLD .user_id;
NEW .user_hash := OLD .user_hash;
IF NEW .vote <> OLD .vote THEN
UPDATE fotos
SET votes_sum = votes_sum - OLD .vote + NEW .vote
WHERE user_hash = NEW .foto_hash AND
id = NEW .foto_id;
END IF ;
RETURN NEW ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_votes_update" BEFORE UPDATE
ON "public"."fotos_votes" FOR EACH ROW
EXECUTE PROCEDURE "fotos_votes_update_trigger" ();
Триггер на удаление:
SQL код (14)
CREATE OR REPLACE FUNCTION "public"."fotos_votes_delete_trigger" () RETURNS trigger AS
$body$
BEGIN
UPDATE fotos
SET votes_num = votes_num - 1,
votes_sum = votes_sum - OLD .vote
WHERE user_hash = OLD .foto_hash AND
id = OLD .foto_id;
RETURN OLD ;
END ;
$body$
LANGUAGE 'plpgsql' ;
CREATE TRIGGER "fotos_votes_delete" BEFORE DELETE
ON "public"."fotos_votes" FOR EACH ROW
EXECUTE PROCEDURE "public"."fotos_votes_delete_trigger" ();
Рейтинг для фото, как мы помним, пересчитается сам.
Точки входа
Структура таблиц, внешних ключей и триггеров позволяет нам относительно жестко контролировать целостность данных. Но для упрощения вставки голосования можно сделать отдельную хранимую процедуру:
SQL код (15)
CREATE OR REPLACE FUNCTION "public"."votes_users"
(income "public"."fotos_votes" ) RETURNS boolean AS
$body$
BEGIN
UPDATE fotos_votes
SET vote = income .vote
WHERE foto_hash = income .foto_hash AND
foto_id = income .foto_id AND
user_hash = income .user_hash AND
user_id = income .user_id;
IF FOUND = FALSE THEN
INSERT INTO fotos_votes (foto_hash, foto_id, user_hash, user_id, vote)
VALUES (income .foto_hash,
income .foto_id,
income .user_hash,
income .user_id,
income .vote);
UPDATE fotos
SET votes_num = votes_num + 1,
votes_sum = votes_sum + income .vote;
END IF ;
RETURN TRUE ;
END ;
$body$
LANGUAGE 'plpgsql' ;
Теперь достаточно вызвать эту функцию передав в неё данные, а она оже сама определит, требуется ли вставка или обновление.
С распределенным сервером DB все, можно приступать к самому интересному