Pull to refresh

Использование RabbitMQ вместе с MonsterMQ часть 3

Reading time4 min
Views1.8K
В нашей предыдущей статье мы создали очередь задач. Она предполагает что одна задача в виде сообщения доставляются одному получателю. В этой статье мы сделаем кое-что другое, мы будем отсылать одно сообщения сразу нескольким получателям. Мы создадим систему логгирования которая будет состоять из двух программ: первая будет отправлять сообщения, а вторая получать и выводить их. В нашей системе, все запущенные получатели будут получать сообщение отосланное отправителем.

Оглавление всех частей:
Часть 1
Часть 2
Часть 3
Часть 4
Часть 5

Exchanges(обменники)


В предыдущих статьях мы отправляли и получали сообщение из очередей, теперь пришло время представить полную модель переправки сообщений в RabbitMQ.

Давайте быстро пройдёмся по тому что мы рассматривали в предыдущих статьях:

  • Producer — приложение которое посылает сообщения
  • Queue — буфер который хранит сообщения
  • Consumer — приложение которое получает и обрабатывает сообщения

Ключевая идея RabbitMQ (а точнее AMQP) состоит в том, что отправитель(Producer) никогда не отправляет сообщение напрямую в очередь. Более того он даже не знает попало ли оно в очередь. Вместо этого отправитель отсылает сообщения обменникам (Exchange). Обменник очень простая вещь, он делает две вещи: получает сообщение от отправителей и перенаправляет их в очередь. Обменники бывают разных типов: одни направляют сообщения в определённую очередь (тип direct), другие направляют одно и то же сообщения сразу в несколько очередей (тип fanout), третьи перенаправляют сообщения в очереди опираясь на определённые, заданные правила перенаправления (тип topic).

image
(изображение взято с официального сайта RabbitMQ)

Давайте рассмотрим обменник типа fanout. Для того чтобы его создать напишем следующий код:

$producer->newFanoutExchange('logs');

Fanout обменник работает по очень простой схеме, он просто отсылает получаемые сообщение всем привязанным к нему очередям.

Чтобы перечислить все обменники существующие на сервере вызовите rabbitmqctl

sudo rabbitmqctl list_exchanges

В этом списке будут amq.* обменники и стандартный безымянный (пустая строка) обменник. Не обращайте на них внимания они нам пока не нужны.

В прошлой статье мы ничего не знали об обменниках, но тем не менее были способны отправлять сообщения. Так происходило потому что если в MonsterMQ вы не указываете явно обменник как третий аргумент метода Producer::publish() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отсылает сообщения в очереди, имена которых переданы как второй аргумент методу Producer::publish().

Теперь мы можем отсылать сообщения нашему новому обменнику, указав его имя как третий аргумент метода Producer::publish():

$producer->publish($message, '', 'logs');

Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока воркеры, которые их объявили, активны. Сделать это можно следующим образом:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

Теперь когда воркер, объявивший очередь, завершит сессию и отключится, она автоматически удалится. Если завершат работу оба воркера, удалятся обе очереди.

Связываем очередь с обменником


Мы уже создали обменник (exchange) и очередь (queue). Теперь нам осталось сказать обменнику, чтобы он отсылал полученные сообщения в наши очереди. Нам нужно связать (bind) обменник и очереди. Для этого напишем следующий код:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

Отныне наш обменник logs связан с нашими очередями и будет переправлять полученные сообщения в них.

Вы можете перечислить все существующие связи следующей командой на linux

rabbitmqctl list_bindings

Соединяем код вместе


Наш скрипт отправителя претерпел не так много изменений по сравнению с предыдущим уроком. Главное отличие в том, что сейчас он отсылает сообщения, предварительно объявленному обменнику, а не стандартному (доступному из коробки). Вот код send.php

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Стоит упомянуть, что послав сообщение обменнику, не связанному с какой-либо очередью, сообщение потеряется. Но сейчас это нас устраивает, так как мы ещё не запускали наших получателей.

Вот код нашего первого воркера worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Вот код второго воркера worker-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Если вы хотите сохранить сообщения отправленные первому воркеру в лог-файл просто вызовите следующую команду в консоли:

php worker-1.php > logs_from_rabbit.log

Чтобы запустить обоих воркеров и выводить сообщения в окна терминалов вызовите следующие команды, каждое в своём окне:

php worker-1.php

php worker-2.php

А чтобы отсылать сообщения вызовите следующую команду:

php send.php

В следующем уроке мы разберём как получать только определённое подмножество сообщений из всех отправляемых.
Tags:
Hubs:
Total votes 7: ↑6 and ↓1+5
Comments0

Articles