Pull to refresh

Неочевидный RabbitMQ в Yii2 или почему RabbitMQ пишет во все очереди сразу

Reading time 3 min
Views 11K

Хочу поделиться практической проблемой конфигурирования брокера очередей RabbitMQ в Yii2. Предупреждаю читателя, что не обладаю экспертным мнением по работе с данным брокером очередей, однако очень хочется восполнить пробелы документации Yii2 и закрепить результат собственных мучений. Итак, если вы когда либо сталкивались с проблемой, что посылаемые сообщения попадают во все очереди, которые есть на сервере очередей, вы согласны, что это проблема и не понимаете почему так происходит, тогда прошу под кат.

Почему вы можете столкнуться с таким поведением? Например, если вы, как и я раньше не работали с RabbitMQ, но работали например с Gearman. Сам Gearman, простой как железная дорога (позаимствовано у известного и уважаемого в интеренете персонажа). Вы создаете очередь с неким именем, кладете туда данные. Воркер читает из очереди с этим же именем. Всё просто. Теперь настала пора использовать модные технологии, сами не понимая зачем вы выбираете RabbitMQ. Потом радуетесь, что в Yii2 есть готовая абстракция для многих популярных брокеров очередей. Скудная документация подсказывает дефолтную конфигурацию чтобы всё завелось:

return [
    'bootstrap' => [
        'queue', // The component registers its own console commands
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\amqp_interop\Queue::class,
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'queueName' => 'queue',
            'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,

            // or
            'dsn' => 'amqp://guest:guest@localhost:5672/%2F',

            // or, same as above
            'dsn' => 'amqp:',
        ],
    ],
];

Казалось бы всё до безумия просто, вот знакомая нам строка с queueName, копипастим, исправляем, запускаем — работает! Делаем очереди для других компонентов системы. Распараллеливаем чем можем наш php. Commit, Push довольные кладем таску в QA и идем читать хабр (в обеденный перерыв).

Тут на самом интересном нас прерывает QA в чате, и говорит, что происходит что-то странное. Почему-то данные которые пишут воркеры (косьюмеры) дублируются. Что что? Не может такого быть. Идем проверять логи. Видим, что сообщение пишется в очередь, при чем очередь выбрана правильно, hash jobId получен. Не-не, у нас нет никаких ошибок. Пишем довольные QA, проверь ещё раз, не может такого быть, у нас всё хорошо.

Буквально через пол часа нас отвлекают снова, ошибка повторилась и тут же на мыло упало уведомление — задача снова переведена в работу. Ну что, я ж программист, сейчас разберемся. У RabbitMQ, в отличие от того же Gearman есть веб интерфейс, в котором есть много информации о работе сервера. Выглядит сие чудо вот так:



Кидаем ещё пару сообщений в очередь, видим в вебморде, что наши сообщения доходят и обрабатываются воркером. Случайный взгляд замечает, что когда мы кидаем сообщение в очередь, во всех очередях подскакивает график «Message rates».



Ещё сто раз проверяем конфигурацию, перечитываем скудную документациюю Yii. Мы всё сделали правильно. Идем читать документацию на сайт rabbit'a. После пары десятков минут блуждания в темноте натыкаемся на туториал. Сразу после первого абзаца знакомимся с причиной наших непоняток — Exchanges. Повторять документацию не буду, очень коротко.

В RabbitMQ мы не пишем сообщение в очередь, мы пишем его в exchange, этакий прокси, который одним концом принимает наши сообщения, а другим концом общается с очередями на сервере. В его власти принять решения в какую очередь положить наши данные. Занятно, что в документации Yii об этом нет ни строчки. С первого взгляда непонятно как сконфигуровать exchange, ныряем в кишочки и в файле vendor/yiisoft/yii2-queue/src/drivers/amqp_interop/Queue.php:176 находим заветное свойство, которое можно засетить. Тут надо сказать, что драйверов для RabbitMQ много, в моем случае используется enqueue/amqp-lib. Выставляем exchangeName, тестируем, ничего не меняется. Мы ж как настоящий русский инженер, сначала пробуем, а потом идем вдумчиво читать документацию ещё раз. Читаем ещё раз внимательно, потом идем в веб морду rabbit'a и видим следующее:



Несколько наших очередей связаны с одним и тем же exchange. Бинго! Вот она причина, одно «но», я их не связывал. Идем снова в кишочки драйвера, находим метод setupBroker строчку 392

$this->context->bind(new AmqpBind($queue, $topic));

Вот это неуправляемое связывание.

И так, недолго поразмыслив, приходим к выводу, что для каждой очереди должен быть объявлен свой exchange, тогда связь будет верной и у одного exchange будет всего одна связанная очередь. таким образом мы добьемся поведения аналогичного Gearman. Кстати, в документации подробно описано для чего сделан exchange, и как я понял, одна из причин это возможность связать с exchange несколько очередей. Вот только я не придумал, что за кейс такой когда это может понадобиться, ребята пишите в комментариях. И пишите, сталкивались ли вы с описанной выше ситуацией или я всё делаю неправильно?
Tags:
Hubs:
+8
Comments 7
Comments Comments 7

Articles