Website development
30 September 2009

AMQP-PHP чат

Вот, дошел до первого практического воплощения моих первых двух статей. Далее будут изложены только идеи… Идеи уже воплощенные и идеи, которые воплощаются…

В отличие от других протоколов передачи сообщений (XMPP STOMP или Memcache (MemcacheQ)) AMQP обладает большей гибкостью.

Немного теории или вернемся к основам AMQP. Более подробно описано <a href=«habrahabr.ru/blogs/webdev/62502» title «Практика использования AMQP»>здесь

Публикация осуществляется передачей Сообщения (Message) в индивидуальный Обмен (Exchange)
Подписчик (Consumer) читает сообщения из индивидуальной Очереди (Queue)
С помощью bind осуществляется связь Очереди и Обмена через систему маршрутных ключей (routing key).
Каждая публикация Сообщения в Обмен сопровождается маршрутным ключом, в соответствии с которым оно направляется в соответствующую очередь. Существует три типа Обмена: funout, direct, topic. Нам нужен последний, так как он позволяет распределять сообщения по очередям в соответствии с патернами ключей.

Для реализации чата, для каждой чат-руум необходим свой Обмен. Пусть у нас будет открыто три чат-руум: php, mysql & hiload с одноименными названиями обменов.

Если вернуться к API PHP AMQP, то при инициализации чата с тематикой РНР мы должны объявить обмен:
// exchange declare exchange.php
$rabbit = new Rabbit(); // connection
$rabbit->exchange('php', "topic"); // declade exchange

Тип обмена должен быть топик (topic), так как предполагается использовать маршрутизацию по паттерну ключа (что это такое расшифруем дальше).

Когда Пользователь входит в чат (регистрация сессии пользователя), то он является Подписчиком и Публицистом одновременно, мы же должны видеть свои сообщения, не так ли?
По этому необходимо объявить очередь входящих сообщений. Для каждого участника чата должна быть своя очередь, если мы одновременно участвуем в нескольких чат-комнатах, то должно быть по очереди на каждую чат-комнату.

В принципе, если бы WEB позволял держать постоянное долговременное соединение, то было бы достаточно одной очереди на каждый чат. Это реально при создании отдельного чат-клиента, типа как ICQ или маил-агент: открыли соединение-передали запрос-ждем сколько угодно долго события — получили оповещение — передали ответ — закрыли соединение. Но при организации обмена поверх HTTP предполагается асинхронность обмена: открыли соединение-передали запрос-получили ответ, возможно даже пустой -закрыли соединение — через некоторый период вновь опрашиваем сервер.

При регистрации в чате — объявляем очередь:
// login.php
$rabbit = new Rabbit(); // connection
$rabbit->queue('php.fanat', AMQP_DURABLE); // declade queue

AMQP_DURABLE — флаг, который указывает, что сообщения при перезагрузки AMQP-брокера должны быть сохранены в БД

Если г-н fanat хочет одновременно поучаствовать и в чате mysql, то необходимо будет объявить новую очередь:
$rabbit->queue('mysql.fanat', AMQP_DURABLE);

Следующий шаг — это необходимо привязать очередь «php.fanat» к обмену «php»:
$rabbit-> bind( 'php', 'php.fanat', 'fanat' ); // binding exchange to queue where routing_key

тоже делается на клиентской стороне каждого из участников.

Пусть в чате присутствуют трое участников: fanat, fisher, fixxxer. Для публикации сообщения, например от участника, fanat необходимо выполнить код: // sendMessage.php
$rabbit = new Rabbit(); // connection

$msg = "сегодня провел тестирование шаблонизаторов для новой ЦМС";
$rabbit->publish('php','*',$msg); // публичная публикация

данная инструкция осуществляет публикацию в обмен 'php' с ключом '*'. Так как у нас тип обмена 'topic', то возможна
публикация сообщений по паттерну. Например, публикация в обмен 'php' с ключом 'fanat' направит сообщение в очередь 'php.fanat', а публикация сообщения в обмен 'mysql' с ключом 'fisher', направит сообщение в очередь 'mysql.fisher' (чат по mysql). Это так называемы прямой (direct) тип обмена. Но, нам надо отправить сообщение всем участникам чата, вот тут то и будет использован паттерн: если вместо имени участника указать '*', то это обозначает отправить сообщение во все подписанные (связанные по bind) очереди. Для организации приватного чата, например: fisher — fixxer, как раз можно осуществлять публикацию по прямому ключу:
// sendPrivateMessage.php
$rabbit = new Rabbit(); // connection

$msg = "я тут решил в блице сделать небольшой патчик, для реализации наследования шаблонов...";
$rabbit->publish('php','fisher',$msg); // приватная публикация

самое интересное, что кроме изменения ключа — больше ничего делать не надо, все за тебя сделает AMQP-брокер.
Технически, публикация осуществляется отправлением AJAX POST запроса с WEB страницы чата на url: sendMessage.php Приватность можно определять дополнительным параметром.

Если бы у нас был не WEB клиент, то как я упоминал выше — был бы и подход иной.
По этому, классическая подписка на Очередь в этом случае не подходит. Мы должны организовать постоянный асинхронный опрос очередей. В общем, принцип любого чата — это постоянный опрос сервера на наличие новых сообщений и при их присутствии — их отображение. Здесь, тоже ничего нового не придумано. С WEB страницы чата, мы по AJAX делаем запрос на скрипт опрооса очередей: define("MSGCOUNT", 5);
$rabbit = new Rabbit(); // connection
$countMessage = $rabbit->queue('php.fisher'); //получаем кол-во сообщений в очореди

$msg = array();
if (!$countMessage)
return json_encode( $msg ); //возвращаем пустой массив

for ($i=0;$i< MSGCOUNT) {
$res = $rabbit->getQueueItem('php');
if ($res['count'],0) break;
$msg[] = $res['msg'];
}
return json_encode( array( 'msg' => $msg ,
'count'=>$res['count'],
)); //возвращаем массив массив сообщений

данный код считывает MSGCOUNT или все если их меньше и возвращает JSON с полученными из очереди сообщений (ключ msg) и остатком ключ 'count'

Есть несколько открытых вопросов,
Например, мы хотим оставлять в чате короткую историю сообщений, пусть будет последние пять, чтоб быть на танке…
Можно использовать флаг AMQP_NOASK, то сообщение помечается как не прочитанное. Когда мы входим в чат, то считывает все сообщения как прочитанные, за исключением последних пятнадцать. Здесь есть еще поле для обдумывания, например — стоит ли хранить кучу сообщений в очереди. Или организовать временную очередь последних пяти-десяти сообщений для вновь входящих. В общем есть место для экспериментов.

+5
7.7k 35
Comments 19
Top of the day