Очереди сообщений в PostgreSQL с использованием PgQ

PostgreSQLПрограммированиеSQL
Из песочницы


Очереди сообщений используются для выполнения: отложенных операций, взаимодействия сервисов между собой, «batch processing» и т.д. Для организации подобных очередей существуют специализированные решения, такие как: RabbitMQ, ActiveMQ, ZeroMQ и тд, но часто бывает, что в них нет большой необходимости, а их установка и поддержка причинит больше боли и страданий, чем принесет пользы. Допустим, у вас есть сервис, при регистрации в котором пользователю отправляется email для подтверждения, и, если вы используете Postgres, то вам повезло — в Postgres, почти из коробки, есть расширение PgQ, которое сделает всю грязную работу за вас.

В этой статье я расскажу об организации очередей сообщений (задач) в PostgreSQL с использованием расширения PgQ. Эта статья будет полезна, если вы еще не использовали PgQ или используете самописные очереди поверх Postgres.

Зачем вообще нужен PgQ, если можно просто создать табличку и записывать туда задачи? Казалось бы, можно, но вам придется учесть паралельный доступ к задачам, возможные ошибки (что будет, если процесс обрабатывающий задачу, упадет?), а также производительность (PgQ очень быстрый, а самописные решения, как правило, нет, особенно если транзакция в базе не закрывается во время всего выполнения задачи), но самое главное, почему на мой взгляд надо использовать PgQ, это то, что PgQ уже написан и работает, а самописное решение еще надо написать (UPD: про то, почему не стоит использовать самописные очереди, можно почитать, например, тут).
(UPD: т.к. PgQ работает поверх Postgres, все прелести транзакций можно использовать и в нем)

Но у PgQ есть один огромный минус — отсутствие документации, этот недостаток я и пытаюсь компенсировать этой статьей.

Устройство


PgQ состоит из частей (как минимум 2-х): 1 — расширение pgq для postgres, 2 — демон pgqd (об их установке чуть позже).

Все взаимодействие с очередью осуществляется с помощью функций внутри Postgres.

Например, чтобы создать очередь, надо выполнить

select * from pgq.create_queue({имя очереди} text);

После того как очередь создана, в нее можно добавлять сообщения

select * from pgq.insert_event({имя очереди} text, {тип события} text, {информация о событии} text);

Теперь надо научиться получать записанные сообщения. Для этого существует такая сущность, как «consumer» (я буду писать консьюмер), который получает не сообщения (события), а «бачи» (batch). Бач — это группа подряд идущих сообщений, бачи создаются с помощью pgqd. Периодически (параметр «ticker_period» в конфигурационном файле) pgqd берет все накопленные сообщения и записывает в новый бач. Важно, если pgqd не работает, то новые бачи не создаются, а значит, консьюмерам нечего читать, также, если pgqd долго не работал, а потом включился, то он создаст один большой бач из сообщений, накопленных за это время, поэтому pgqd не следует просто так отключать.

Регистрация консьюмера (Важно! консьюмер будет получать события, записанные только после его регистрации, поэтому следует вначале создать консьюмер, а только потом уже писать события):

select * from pgq.register_consumer({имя очереди} text, {имя консьюмера} text);

(аналогично pgq.unregister_consumer)
Каждый консьюмер получит абсолютно все события, произошедшие после его создания (даже уже обработанные другим консьюмером), это значит, что скорее всего вам нужен всего один консьюмер на одну очередь. Далее я расскажу, как при этом разделить нагрузку на несколько серверов.

Для получения бача нужно вначале узнать его ID:

select * from pgq.next_batch({имя очереди} text, {имя консьюмера} text);

Функция может вернуть NULL, если консьюмер обработал все бачи. В этом случае нужно просто подождать, пока pgqd создаст новый бач.

При этом, пока бач не обработан, эта функция будет возвращать одно и то же значение.
Получить все события в баче можно с помощью:

select * from pgq.get_batch_events({id бача} bigint);

(Бач может быть пустым.)

Если при обработке какого-то из них возникла ошибка, то можно попробовать обработать это событие позже:

select * from pgq.event_retry({id бача} bigint, {id события} bigint, {количество секунд до ретрая} integer);

Чтобы сообщить об окончании бача и получить возможность приступить к новому, используется

select * from pgq.finish_batch({id бача} bigint);

Разумеется, это не все функции в расширении, рекомендую почитать pgq.github.io/extension/pgq/files/external-sql.html и github.com/pgq/pgq/tree/master/functions (в каждом файле содержится дефиниция и описание соответствующей функции).

Распределение нагрузки


Для того чтобы обрабатывать события одновременно несколькими обработчиками, существует расширение pgq_coop, которое работает повех pgq и добавляет новую сущность «субконсьюмер» (sub consumer), который будет получать все события с момента регистрации родительского консьюмера, разумеется, кроме уже обработанных.

select * from pgq_coop.register_subconsumer({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text);

select * from pgq_coop.next_batch({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text);

select * from pgq_coop.next_batch({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text, {если другой субконсьюмер был неактивен в течение этого интервала, текущий может забрать у него бач} interval);

select * from pgq_coop.finish_batch({id бача} bigint);

Прочитать про все функции можно здесь.

Установка


Расширение pgq и демон pgqd входят в репозитории PGDG и в большинстве дитрибутивов устанавливаются очень просто, например, в Debian:

sudo apt install postgresql-XX-pgq3 pgqd (XX — номер версии).

pgqd — это небольшая программка, узнать про использование которой можно с помощью pgqd --help, не забудьте добавить ее в автозапуск (sudo systemctl enable pgqd.service, а конфиг по умолчанию — /etc/pgqd.ini).

Чтобы начать использовать PgQ, в базе надо просто подключить расширение:

create extension if not exists pgq;


C pgq_coop все немного сложнее, его нет в репозитории, но собрать его из исходников не составляет труда (пример для Debian):

sudo apt install postgresql-server-dev-XX
git clone https://github.com/pgq/pgq-coop.git
cd pgq-coop
sudo make install

и подключить расширение с помощью

create extension if not exists pgq_coop;

Полезные ссылки


Документация pgq
Функции pgq
Функции pgq_coop
Исходный код pgqd
github аккаунт со всеми связанными проектами
Wiki postgres-а
Теги:postgrespostgresqlpgqmessage queueочередиочереди сообщенийочереди задачочередь задачочередь сообщений
Хабы: PostgreSQL Программирование SQL
+17
9,7k 107
Комментарии 34

Похожие публикации

MS SQL Server Developer
10 марта 202135 000 ₽OTUS
PostgreSQL
15 апреля 202180 000 ₽OTUS
Введение в SQL
19 апреля 202117 100 ₽Luxoft Training
Профессия Project Manager
20 января 202198 000 ₽Нетология

Лучшие публикации за сутки