Открыть список
Как стать автором
Обновить

Прозрачный переход PgQ -> RabbitMQ

PostgreSQL


Дорогой хабрачитатель, я хочу поделиться опытом по поводу прозрачного для приложения перехода с очереди PgQ на amqp. Возможно это покажется велосипедом, возможно какие-то мысли пригодятся. Статья предполагает знакомство с основами PgQ и rabbitmq.

Предпосылки


Так исторически сложилось, что в нашем проекте очень активно используется PgQ. При всех своих недостатках, у PgQ есть неоспоримое преимущество — транзакционность с базой данных, чем активно пользовался наш код. То есть, можно быть уверенным, что и в очередь событие попадет, и база обновится. Или ни того, ни другого не случится. И это преимущество должно быть перенесено на новый движок очередей.

Подробно описывать причины ухода с PgQ я тут не буду, это тема для другой статьи. Остановлюсь только конкретно на самом переходе.

Ход размышлений, pg_amqp


Гугление наводит на расширение для PostgreSQL — pg_amqp. Оно предоставляет хранимые процедуры в PostgreSQL для отправки в amqp. Расширение отлично работает на уровне логики приложения: откатив транзакцию в PostgreSQL — данные в amqp не попадут. А если закоммитим — попадут.
BEGIN;
INSERT INTO some_table (...) VALUES (...);
SELECT amqp.publish(broker_id, 'amqp.direct', 'foo', 'bar');
ROLLBACK; //Данные и в базу не вставились, и в amqp не отправились

На самом деле расширение не гарантирует то что сообщение попадет в amqp. Внутри всего лишь последовательный коммит транзакции сначала в PostgreSQL, а потом в amqp. И если пропадет соединение с amqp между двумя коммитами — сообщение пропадет. Несмотря на то что вероятность такого события маленькая, потерянные пакеты будут. А учитывая, что мы работаем с реальными деньгами и торговыми счетами, это недопустимо.

Для тех, кому потеря 0.01% пакетов допустима — остаток статьи можно не читать. Просто используйте pg_amqp, если хотите уйти с PgQ на amqp.

Начинаем строить велосипед


* Далее вместо абстрактного amqp будет конкретный rabbitmq.

Но ведь у нас остается PostgreSQL, внутри которого транзакции полноценные. И мы можем транзакционно вставлять все пакеты в какую-то таблицу, а потом как-то досылать в amqp то что туда не дошло.

Сказано — сделано.

Вся работа с PgQ у нас в приложении делалось с помощью одной хранимой процедуры, которую я мог свободно менять.

Я создал таблицу
amqp.message(
    id bigint default nextval('amqp_message_id_sequence') primary key,
    pid bigint,
    queue varchar(128),
    message text
)

и тригер, который при вставке в таблицу отправлял эти данные в amqp. А хранимку вставки в pgq заменил на вставку данных в эту таблицу. Overhead при этом только в отправке данных в amqp, так как в PgQ тоже при каждом событии происходит вставка в таблицу. Зачем нужен pid объясню позже.

Теперь сообщения есть и в таблице, и у получателя из rabbitmq. В таблицу сообщения пишутся гарантированно в рамках транзакции PostgreSQL, а в amqp отправляются почти все сообщения, с помощью pg_amqp. Но как понять какие сообщения пришли, а какие нет? И как держать эту таблицу во вменяемых размерах (желательно десятки или сотни строк), чтобы не потерять производительность?

Тут на помощь приходит сам rabbitmq. Ведь он умеет сообщения дублировать в несколько очередей



Так давайте одну очередь отдадим нашему бизнес-коду, а вторую будем использовать для подтверждения получения пакета?
Сказано — сделано. Создаем exchange, 2 очереди и досыльщик, который просто удаляет из таблицы amqp.message полученное сообщение.

В итоге, есть таблица, в которой хранятся только те сообщения, которые «в пути». Размер таблицы всегда небольшой, так как сообщения сразу же после вставки удаляются. Размер таблицы можно поставить на мониторинг. А бизнес-код нашего приложения теперь работает только с rabbitmq и ничего не знает о магии под капотом.

Вот как выглядит итоговая схема работы




Но теперь появляется важный вопрос: а как понять, что какой-то пакет не пришел? Ведь строка в таблице amqp.message ещё не гарантирует, что сообщение пропало — оно может быть просто «в пути». Нам нужно быть в этом уверенным, чтобы досылать пакет, иначе мы можем создать дубль пакета, и кому-то зачислим 200$ вместо 100$ :) И в то же время определить, что пакет не пришел и дослать его нужно максимально быстро, чтобы минимально нарушать порядок следования пакетов в очереди.

Вот тут начинается основное шаманство


Все наши пакеты пронумерованы по возрастанию, но ведь система многопоточна, и пакеты не обязаны приходить в rabbitmq в том порядке, в котором они лежат в таблице. А вот в рамках одного процесса, который отправляет сообщения в amqp, они должны быть строго упорядочены. PostgreSQL предоставляет возможность посмотреть pid текущего процесса (pg_backend_pid()). И мы можем ожидать, что в рамках одного pg_backend_pid() пакеты будут строго упорядочены по возрастанию (напоминаю, что id пакета мы генерируем с помощью nextval). А следовательно, при получении пакета с id N, все пакеты от этого же pg_backend_pid с id ниже N являются не доставленными и их нужно дослать.

Итого, нам нужно сделать досыльщик очереди, который делает всего 2 вещи:
  • Слушает очередь «Очередь досыльщика»
  • Проверяет, нет ли в таблице amqp.message сообщений с текущим pid и id меньшим чем текущий. Если есть — досылает их (досылает опять таки транзакционно, через таблицу amqp.message)
  • Удаляет из таблицы amqp.message сообщения по id


Профит! У нас все сообщения доходят до адресата, и при этом мы полностью избавились от PgQ. Код основного приложения практически не изменился.

Накладные расходы на все:
  • Системная таблица amqp.message, в которую мы вставляем каждое сообщение, а потом удаляем
  • Досыльщик, который удаляет строки из amqp.message и досылает сообщения


Обращу внимание на то, что логика досыльщика совершенно устойчива к падениям. Его в любой момент можно убить, он снова запустится и продолжит работу без каких-либо проблем.

Система не учитывает случай, когда процесс postgres отправил пакет в amqp, который не дошел, и больше пакеты не отправляет. Буду благодарен, если кто-то подскажет, как автоматически обработать эту ситуацию. Сейчас это решается просто мониторингом, но ни одного такого события пока не было. Вообще факт досылки сообщения — очень редкое событие. У нас используется pgbouncer, что уменьшает множество различных pg_backend_pid.
Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Хотите ли вы заменить PgQ на что-то другое?
8.87% да 11
15.32% нет 19
75.81% Не использую PgQ 94
Проголосовали 124 пользователя. Воздержались 59 пользователей.
Теги:rabbitmqamqppostgresPgQ
Хабы: PostgreSQL
Всего голосов 16: ↑16 и ↓0 +16
Просмотры14.7K

Комментарии 36

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Похожие публикации

Backend разработчик (PHP7, Symfony 5, PostgreSQL)
от 4 000 $TextMagicОмскМожно удаленно
Senior PHP разработчик (Symfony, микросервисы)
от 150 000 до 220 000 ₽ДВИЖМожно удаленно
Senior Backend Developer PHP
от 220 000 ₽HelastelМожно удаленно
DevOps инженер Linux (можно удаленно, Москва)
от 150 000 ₽Баланс-ПлатформаМоскваМожно удаленно
Разработчик Python
от 80 000 до 160 000 ₽Баланс-ПлатформаМоскваМожно удаленно

Лучшие публикации за сутки