Как стать автором
Обновить

Введение в EagleMQ

Время на прочтение7 мин
Количество просмотров5K
EagleMQ – это новый высокопроизводительный менеджер очередей. Основные решаемые задачи это эффективное распределение сообщений между процессами, межпроцессорная коммуникация и уведомления реального времени.

image



Для решения основных задач имеется 3 главных примитива: очереди (queue), маршруты (route) и каналы (channel).

Очереди
Очереди являются самым главным примитивом. Очереди позволяют хранить сообщения и выдавать их клиенту в том же порядке в котором они пришли (принцип FIFO).

Каждая очередь перед использованием должна быть декларирована. Если очередь не декларирована то при работе с ней не будет доступен весь набор команд. Декларирование позволяет отслеживать использование очереди клиентами и автоматически ее удалять если она никем не используется и при создании очереди был указан флаг AUTODELETE.

Сообщения могут иметь гарантированную доставку. Для применения гарантированной доставки нужно при извлечении сообщения из очереди командой .queue_pop указать timeout возврата отличным от нуля (в милисекундах). Затем при получении сообщения подтвердить доставку командой .queue_confirm (по уникальному идентификатору сообщения). При неподтверждении доставки в указанное время сообщение будет возвращено в очередь как самое старое и выдано следующему клиенту как можно скорее.

Также сообщения могут быть с ограниченым сроком хранения (expiration time) и будут автоматически удалены в случае истечения срока. Срок хранения является атрибутом посылаемого в очередь сообщения.

Очереди имеют поддержку асинхронной доставки поступающих сообщений клиентам. Для использования асинхронной доставки нужно подписаться на очередь командой .queue_subscribe с указанием в флагах режима подписки. Очереди имеют 2 режима подписки: сообщение и уведомление.
В режиме сообщения клиентам посылается событие в котором содержится название очереди, в которую поступило сообщение и само сообщение. Сообщение в очередь в таком случае не попадает т. к. уже послано клиентам. Если очередь создана с флагом ROUND_ROBIN тогда сообщение будет послано только одному клиенту каждый раз, а не всем. Для распределения посылки сообщений клиентам используется алгоритм round-robin.
В режиме уведомления всем клиентам подписанным на очередь с использованием такого режима посылается сообщение в котором содержится только название очереди. Данное событие будет послано всем подписчикам очереди с таким режимом, даже если сообщение было кому-то отправлено и не попало в очередь.
Отписаться от асинхронной доставки сообщений можно командой .queue_unsubscribe.

При создании очереди указывается само название очереди, максимальное количество сообщений которое можно хранить, максимальный размер сообщения и флаги.
Максимальное количество сообщений позволяет прекратить поступление новых сообщений в очередь при достижения лимита.
Максимальный размер сообщения позволяет запретить отправку сообщений с большим размером. Данный параметр не может иметь значение больше чем 2147483647.

Очередь поддерживает 4 флага: AUTODELETE, FORCE_PUSH, ROUND_ROBIN, DURABLE.

Флаг AUTODELETE используется для автоматического удаления очереди если она ни кем не декларирована и не имеет подписчиков.

Флаг FORCE_PUSH позволяет отправлять в очередь сообщения при достижении лимита максимального количества сообщений. Это достигается путем удаления более старого сообщения чтобы сохранить новое.

Флаг ROUND_ROBIN позволяет посылать сообщение только одному подписчику каждый раз(см. описание выше).

Флаг DURABLE указывает что очередь является персистентной и может быть сохранена вместе со всеми сообщениями в хранилище.

Список команд для работы с очередями:

  • .queue_create(name, max_msg, max_msg_size, flags)
  • .queue_declare(name)
  • .queue_exist(name)
  • .queue_list
  • .queue_rename(from, to)
  • .queue_size(name)
  • .queue_push(name, message)
  • .queue_get(name)
  • .queue_pop(name, timeout)
  • .queue_confirm(name, tag)
  • .queue_subscribe(name, flags)
  • .queue_unsubscribe(name)
  • .queue_purge(name)
  • .queue_delete(name)


Маршруты
Маршруты являются впомогательным примитивом для использования очередей. Маршруты позволяют связывать очереди с определенным ключом и организовывать быструю и эффективную доставку сообщений.

Для связывания очереди с ключом используется команда .route_bind. За посылку сообщения в маршрут отвечает команда .route_push.

Если при посылке сообщения в маршрут по ключу было связано несколько очередей, то копирование сообщения не производится (references counter) что позволяет ускорить выполнение команды и снизить потребление памяти сервером.

Маршрут поддерживает 3 флага: AUTODELETE, ROUND_ROBIN, DURABLE.

Флаг AUTODELETE используется для автоматического удаления маршрута если он не имеет связей с очередями по ключу.

Флаг ROUND_ROBIN позволяет при получении сообщения маршрутом посылать сообщение только одной очереди связанной по ключу. Для распределения сообщений по очередям используется алгоритм round-robin.

Флаг DURABLE указывает что маршрут является персистентным и может быть сохранен вместе со всеми ключами в хранилище.

Список команд для работы с маршрутами:
  • .route_create(name, flags)
  • .route_exist(name)
  • .route_list
  • .route_keys(name)
  • .route_rename(from, to)
  • .route_bind(name, queue, key)
  • .route_unbind(name, queue, key)
  • .route_push(name, key, message)
  • .route_delete(name)


Каналы
Каналы это примитив для удобной доставки сообщений клиентам в реальном времени.

Принцип работы заключается в том, что клиент может подписаться на определенный topic командой .channel_subscribe. При отправке сообщения в канал другим клиентом командой .channel_publish на данный topic клиенту который подписался на этот topic будет послано событие с сообщением.

Также можно подписать на группу topic'ов по шаблону командой .channel_psubscribe. Форматом шаблона является glob-style pattern.

Канал поддерживает 3 флага: AUTODELETE, ROUND_ROBIN, DURABLE.

Флаг AUTODELETE используется для автоматического удаления канала если он не имеет подписчиков.

Флаг ROUND_ROBIN позволяет посылать сообщение только одному из множества подписчиков. Для распределения сообщений между подписчиками используется алгоритм round-robin.

Флаг DURABLE указывает что канал является персистентным и может быть сохранен в хранилище.

Список команд для работы с каналами:

  • .channel_create(name, flags)
  • .channel_exist(name)
  • .channel_list
  • .channel_rename(from, to)
  • .channel_publish(name, topic, message)
  • .channel_subscribe(name, topic)
  • .channel_psubscribe(name, pattern)
  • .channel_unsubscribe(name, topic)
  • .channel_punsubscribe(name, pattern)
  • .channel_delete(name)


Пользователи
Каждый клиент EagleMQ должен быть аутентифицирован чтобы иметь права на выполнение команд. Без аутентификации доступна только одна команда — .disconnect.

EagleMQ может иметь множество пользователей и каждый пользователь может иметь свой набор прав.

Пользовательские права имеют большую гибкость в настройке. Можно отключить любую команду (кроме .disconnect) и следовательно любой примитив.

Это может быть полезно во множестве случаев. Например вы используете сервер как очередь задач и у вас имеется множество клиентов которые создают задачи, а также множество клиентов которые их обрабатывают. В таком случае клиентам которые создают задачи рекомендуется разрешить использование только команды .queue_push. Клиентам обработчикам будет достаточно прав только на команду .queue_pop. Это поможет сохранить данные и четко задать клиенту роль в системе.
Также это может быть полезно если вы поставили работать сервер во внешней сети (например публичная real-time рассылка предупреждений о землетресении) и позволяете любому пользователю подключится на прямую (в таком случае пользователю достаточно прав на .channel_subscribe и .channel_psubscribe).

Список команд для работы с пользователями:

  • .user_create(name, password, perm)
  • .user_list
  • .user_rename(from, to)
  • .user_set_perm(name, perm)
  • .user_delete(name)

Всего в EagleMQ на текущий момент имеется 44 команды. Список всех команд есть в документации.

Производительность
EagleMQ имеет достаточно высокую производительность. Строгие комплексные измерения и сравнительные тесты еще не проводились. Это связано во многом с тем, что некоторые подобные системы имеют возможность тонкой настройки под задачу (например RabbitMQ) и мой опыт в этих системах не позволяет оценить объективно.

В отличии от остальных систем EagleMQ почти не нуждается в дополнительном конфигурировании влияющем на производительность. Также очень мала вероятность ошибиться при выборе примитива или использовать его неправильно.

Для тестирования использовалась тестовая утилита benchmark из состава libemq.

Суть тестирования заключается в том, чтобы отправить сообщения в очередь командой .queue_push. Приложение использует 50 потоков. Для соединения с сервером использовались локальные TCP сокеты.

Intel Core(TM) i5-2450M CPU @ 2.50GHz
Clients Total Messages Message Size Time, seconds Req/Sec
50 1 000 000 1000 9.20 108283.71
50 1 000 000 100 8.07 123931.09
50 1 000 000 10 8.05 124300.80
50 100 000 1000 0.95 105596.62
50 100 000 100 0.85 117096.02
50 100 000 10 0.81 123304.56


Intel Core(TM) i5-3470 CPU @ 3.20GHz
Clients Total Messages Message Size Time, seconds Req/Sec
50 1 000 000 1000 5.19 192566.92
50 1 000 000 100 4.47 223613.59
50 1 000 000 10 4.40 227169.47
50 100 000 1000 0.53 189393.94
50 100 000 100 0.46 219298.25
50 100 000 10 0.45 220750.55


Персистентность
Каждый примитив EagleMQ обладает персистентностью. Обычно для поддержки этой функциональности примитив должен быть создан с флагом DURABLE. Также в конфигурационном файле должен быть указан путь к хранилищу и задан интервал сохранения.

Все данные постоянно хранятся в оперативной памяти.

Принцип работы механизма сохранения состоит в клонировании процесса (fork) по заданому интервалу и записи всех необходимых данных в файл. Для сохранения данных по требованию клиента имеется команда .save.

Все сохраняемые данные по возможности сжимаются библиотекой liblzf.

Протокол
EagleMQ имеет свой собственный бинарный протокол.

Бинарные протоколы почти всегда используются в подобных системах (AMQP, ZMQ) и помогают достигнуть высокую производительность при обработке запроса.

Имеются конечно же и минусы:
  1. Бинарный протокол не является human-readable.
  2. Могут быть сложности при реализации клиентских библиотек на скриптовых языках программирования.
  3. Необходимость использовать совместимый клиент т.к. бинарный протокол может измениться.


Протокол EagleMQ может изменяться только в главных (major) релизах.

Roadmap
  1. Улучшение поддержки packet aggregation для возможности посылки нескольких команд за один раз. Сейчас такая поддержка уже имеется, но с потерей пакетов.
  2. Документирование бинарного протокола.
  3. Написание CLI для управления EagleMQ с помощью командной строки.
  4. Доработка и сопровождение сайта www.eaglemq.com.
  5. Поддержка little и big engian архитектур.
  6. Покрытие всей функциональности тестами.


Все желающие могут задать мне свой вопрос в коментариях или по почте. Буду рад ответить.

Репозиторий проекта: GitHub
Теги:
Хабы:
Всего голосов 35: ↑33 и ↓2+31
Комментарии36

Публикации