В нашей предыдущей статье мы создали очередь задач. Она предполагает что одна задача в виде сообщения доставляются одному получателю. В этой статье мы сделаем кое-что другое, мы будем отсылать одно сообщения сразу нескольким получателям. Мы создадим систему логгирования которая будет состоять из двух программ: первая будет отправлять сообщения, а вторая получать и выводить их. В нашей системе, все запущенные получатели будут получать сообщение отосланное отправителем.
Оглавление всех частей:
Часть 1
Часть 2
Часть 3
Часть 4
Часть 5
В предыдущих статьях мы отправляли и получали сообщение из очередей, теперь пришло время представить полную модель переправки сообщений в RabbitMQ.
Давайте быстро пройдёмся по тому что мы рассматривали в предыдущих статьях:
Ключевая идея RabbitMQ (а точнее AMQP) состоит в том, что отправитель(Producer) никогда не отправляет сообщение напрямую в очередь. Более того он даже не знает попало ли оно в очередь. Вместо этого отправитель отсылает сообщения обменникам (Exchange). Обменник очень простая вещь, он делает две вещи: получает сообщение от отправителей и перенаправляет их в очередь. Обменники бывают разных типов: одни направляют сообщения в определённую очередь (тип direct), другие направляют одно и то же сообщения сразу в несколько очередей (тип fanout), третьи перенаправляют сообщения в очереди опираясь на определённые, заданные правила перенаправления (тип topic).
(изображение взято с официального сайта RabbitMQ)
Давайте рассмотрим обменник типа fanout. Для того чтобы его создать напишем следующий код:
Fanout обменник работает по очень простой схеме, он просто отсылает получаемые сообщение всем привязанным к нему очередям.
Чтобы перечислить все обменники существующие на сервере вызовите rabbitmqctl
В этом списке будут amq.* обменники и стандартный безымянный (пустая строка) обменник. Не обращайте на них внимания они нам пока не нужны.
В прошлой статье мы ничего не знали об обменниках, но тем не менее были способны отправлять сообщения. Так происходило потому что если в MonsterMQ вы не указываете явно обменник как третий аргумент метода Producer::publish() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отсылает сообщения в очереди, имена которых переданы как второй аргумент методу Producer::publish().
Теперь мы можем отсылать сообщения нашему новому обменнику, указав его имя как третий аргумент метода Producer::publish():
Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока воркеры, которые их объявили, активны. Сделать это можно следующим образом:
Теперь когда воркер, объявивший очередь, завершит сессию и отключится, она автоматически удалится. Если завершат работу оба воркера, удалятся обе очереди.
Мы уже создали обменник (exchange) и очередь (queue). Теперь нам осталось сказать обменнику, чтобы он отсылал полученные сообщения в наши очереди. Нам нужно связать (bind) обменник и очереди. Для этого напишем следующий код:
Отныне наш обменник logs связан с нашими очередями и будет переправлять полученные сообщения в них.
Вы можете перечислить все существующие связи следующей командой на linux
Наш скрипт отправителя претерпел не так много изменений по сравнению с предыдущим уроком. Главное отличие в том, что сейчас он отсылает сообщения, предварительно объявленному обменнику, а не стандартному (доступному из коробки). Вот код send.php
Стоит упомянуть, что послав сообщение обменнику, не связанному с какой-либо очередью, сообщение потеряется. Но сейчас это нас устраивает, так как мы ещё не запускали наших получателей.
Вот код нашего первого воркера worker-1.php
Вот код второго воркера worker-2.php
Если вы хотите сохранить сообщения отправленные первому воркеру в лог-файл просто вызовите следующую команду в консоли:
Чтобы запустить обоих воркеров и выводить сообщения в окна терминалов вызовите следующие команды, каждое в своём окне:
А чтобы отсылать сообщения вызовите следующую команду:
В следующем уроке мы разберём как получать только определённое подмножество сообщений из всех отправляемых.
Оглавление всех частей:
Часть 1
Часть 2
Часть 3
Часть 4
Часть 5
Exchanges(обменники)
В предыдущих статьях мы отправляли и получали сообщение из очередей, теперь пришло время представить полную модель переправки сообщений в RabbitMQ.
Давайте быстро пройдёмся по тому что мы рассматривали в предыдущих статьях:
- Producer — приложение которое посылает сообщения
- Queue — буфер который хранит сообщения
- Consumer — приложение которое получает и обрабатывает сообщения
Ключевая идея RabbitMQ (а точнее AMQP) состоит в том, что отправитель(Producer) никогда не отправляет сообщение напрямую в очередь. Более того он даже не знает попало ли оно в очередь. Вместо этого отправитель отсылает сообщения обменникам (Exchange). Обменник очень простая вещь, он делает две вещи: получает сообщение от отправителей и перенаправляет их в очередь. Обменники бывают разных типов: одни направляют сообщения в определённую очередь (тип direct), другие направляют одно и то же сообщения сразу в несколько очередей (тип fanout), третьи перенаправляют сообщения в очереди опираясь на определённые, заданные правила перенаправления (тип topic).
(изображение взято с официального сайта RabbitMQ)
Давайте рассмотрим обменник типа fanout. Для того чтобы его создать напишем следующий код:
$producer->newFanoutExchange('logs');
Fanout обменник работает по очень простой схеме, он просто отсылает получаемые сообщение всем привязанным к нему очередям.
Чтобы перечислить все обменники существующие на сервере вызовите rabbitmqctl
sudo rabbitmqctl list_exchanges
В этом списке будут amq.* обменники и стандартный безымянный (пустая строка) обменник. Не обращайте на них внимания они нам пока не нужны.
В прошлой статье мы ничего не знали об обменниках, но тем не менее были способны отправлять сообщения. Так происходило потому что если в MonsterMQ вы не указываете явно обменник как третий аргумент метода Producer::publish() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отсылает сообщения в очереди, имена которых переданы как второй аргумент методу Producer::publish().
Теперь мы можем отсылать сообщения нашему новому обменнику, указав его имя как третий аргумент метода Producer::publish():
$producer->publish($message, '', 'logs');
Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока воркеры, которые их объявили, активны. Сделать это можно следующим образом:
//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();
//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();
Теперь когда воркер, объявивший очередь, завершит сессию и отключится, она автоматически удалится. Если завершат работу оба воркера, удалятся обе очереди.
Связываем очередь с обменником
Мы уже создали обменник (exchange) и очередь (queue). Теперь нам осталось сказать обменнику, чтобы он отсылал полученные сообщения в наши очереди. Нам нужно связать (bind) обменник и очереди. Для этого напишем следующий код:
//Worker 1
$producer->queue('queue-1')->bind('logs');
//Worker 2
$producer->queue('queue-2')->bind('logs');
Отныне наш обменник logs связан с нашими очередями и будет переправлять полученные сообщения в них.
Вы можете перечислить все существующие связи следующей командой на linux
rabbitmqctl list_bindings
Соединяем код вместе
Наш скрипт отправителя претерпел не так много изменений по сравнению с предыдущим уроком. Главное отличие в том, что сейчас он отсылает сообщения, предварительно объявленному обменнику, а не стандартному (доступному из коробки). Вот код send.php
try {
$producer = \MonsterMQ\Client\Producer();
$producer->connect('127.0.0.1', 5672);
$producer->logIn('guest', 'guest');
$producer->newFanoutExchange('logs');
$message = implode(' ', array_slice($argv, 1));
$message = empty($message) ? 'Hello world!' : $message;
$producer->publish($message, '', 'logs');
echo "\n Sent {$message} \n";
} catch(\Exception $e) {
var_dump($e);
}
Стоит упомянуть, что послав сообщение обменнику, не связанному с какой-либо очередью, сообщение потеряется. Но сейчас это нас устраивает, так как мы ещё не запускали наших получателей.
Вот код нашего первого воркера worker-1.php
try {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-1')->setExclusive()->declare();
$producer->queue('queue-1')->bind('logs');
$consumer->consume('queue-1');
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
Вот код второго воркера worker-2.php
try {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-2')->setExclusive()->declare();
$producer->queue('queue-2')->bind('logs');
$consumer->consume('queue-2');
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
Если вы хотите сохранить сообщения отправленные первому воркеру в лог-файл просто вызовите следующую команду в консоли:
php worker-1.php > logs_from_rabbit.log
Чтобы запустить обоих воркеров и выводить сообщения в окна терминалов вызовите следующие команды, каждое в своём окне:
php worker-1.php
php worker-2.php
А чтобы отсылать сообщения вызовите следующую команду:
php send.php
В следующем уроке мы разберём как получать только определённое подмножество сообщений из всех отправляемых.