Как стать автором
Обновить

Умная работа с RabbitMQ в NestJS

Время на прочтение4 мин
Количество просмотров13K
При разработке финансовых систем стандартные механизмы обработки выполненных задач в большинстве реализаций протокола AMQP не всегда подходят. В какой-то момент мы столкнулись с такой проблемой, но обо все по порядку.

Стандартная реализация RabbitMQ в NestJS дает возможность легко получать сообщения в декорируемые функции:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

Подробнее о том, как это работает, описано тут.
Также работу с очередями в Nest неплохо осветили в этой статье на Хабре.

Казалось бы, что еще может быть нужно. Однако в текущей реализации есть ряд недостатков.

Проблема 1


Для того, чтобы отправить результат ack в канал нужно вручную вытащить channel из контекста RmqContext и послать ему сигнал.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

Решение


Использовать такой же паттерн, который использован при работе с http хендлерами путем возвращения результата выполнения прямо из функции как обычный объект, Promise или Observable.

Проблема 2


Если необходимо послать результат в отдельную очередь, то в каждом контроллере, который использует Rabbit вам нужно знать ее название. Иными словами нет возможности сконфигурировать это в 1 месте при инициализации модулей и использовать неявно.

Решение


На этапе конфигурации модуля указать очереди для успешных результатов выполнения операций и для ошибок. Мы решили результаты успешных операций отправлять в одну очередь, а результаты ошибочных — в другую. Пользуясь решением проблемы 1, если возвращаемое значение, Promise или Observable выполнилось успешно, то результат посылается в очередь успешных операций, если нет, то сообщение отклоняется (reject) и попадает в очередь с ошибками, RabbitMQ позволяет легко это сделать с помощью опций x-dead-letter-exchange и x-dead-letter-routing-key при создании очереди.

Проблема 3


Как пользователю библиотеки, разработчику необходимо знать детали протокола AMQP для получения id очередного сообщения, понимать что такое ack и когда его вызывать и т.д.

Решение


Добавить декоратор для получения id сообщения. Вместо ack возвращать результат выполнения из функции-хендлера.

Проблема 4


Самая, пожалуй, главная проблема: доставка сообщения обработчику более одного раза. Когда дело касается финансовых операций это очень важный момент, ведь может возникнуть ситуация, когда деньги уже были отправлены, а операция упала на последнем шаге — при записи в базу данных или отправке acknowledgement брокеру сообщений. Одно из очевидных решений — при получении сообщения консьюмером, перед началом обработки сообщения, записывать ID сообщения сгенерированный продюсером в БД, если такового там еще нет, если же есть, то отвергнуть сообщение. Но в протоколе AMQP предусмотрен флаг redelivered идентифицирующий доставлялось ли это сообщение когда-либо другим клиентам, который мы можем использовать для обнаружения повторно доставленных сообщений и их отправки в очередь с ошибками. В текущей реализации в Nest нет возможности не доставлять такие сообщения.

Решение


Не доставлять это сообщение до обработчика, а логировать ошибку на этапе получения сообщения от драйвера. Конечно, это поведение можно сделать конфигурируемым на этапе декорирования метода, чтобы явно указать, что мы все равно хотим получать сообщения для этого типа action.

Для решения всех вышеперечисленных проблем была написана своя реализация работы с протоколом. Как выглядит инициализация:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

Тут мы указываем названия очередей для доставки сообщений, результатов и ошибок, а также отдельные очереди нечувствительные к redelivery.

На уровне контроллера же работа максимально похожа на работу с http

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

Результат выполнения задачи попадет в результирующую очередь как только выполниться этот Observable. Параметр декорируемый @AMQPRequest соответствует полю correlationId протокола. Это уникальный индентификатор доставленного сообщения.

Параметр @AMQPParam соответствуют самому телу сообщения. Если это JSON, то сообщение прилетит в функцию уже преобразованным к объекту. Если это простой тип, то сообщение отправляется as is.

Следующее сообщение попадет в dead letter:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

Что в планах


Добавить Type Reflection для AMQPParam, чтобы тело сообщения преобразовывалось к передаваемому классу. Сейчас это просто каст к типу.

Весь код и инструкции по установке доступны на GitHub.

Любые правки и замечания приветствуются.
Теги:
Хабы:
Всего голосов 10: ↑10 и ↓0+10
Комментарии0

Публикации

Истории

Работа

Ближайшие события