Комментарии 57
Не нашёл в тексте ссылку на исходный код проекта, он (код) закрыт?
Проект закрытый.
Постараемся открыть после перехода на кодовую базу Tarantool.
В в случае, если-бы вы сегодня озадачились аналогичной проблемой, подошло-ли бы вам какое-либо из существующих решений?
Для части задач подходит нынешняя версия RabitMQ
они сильно улучшили производительность в районе 2.8.х

Для остальных не знаю подходящего аналога.
Всего максимум 125 байт на сообщение, что существенно меньше, чем требуется авторам, судя по описанию. Мы же не знаем, для какого реального объема сообщений заявлена метрика «не меньше 50000 rps на вставках».
Сразу скажу — этой системы не видел.

что бросилось в глаза при первом знакомстве:
1. не указан размер сообщений которыми тестировали
2. в тестовой конфигурации сервера указано 12 SAS дисков
«HP StorageWorks 146GB 15K dual port SAS LFF disk drives 512547-B21»

Если загружают все диски, то, указанная цифра в 8 млн кажется маленькой. Опять же, зависит от размера сообщения.

я то же не указал конфигурацию.
В нашем случае эталон это 1 инстанс сервера очередей на одном типовом SATA диске. Размер сообщений равен 4096 байт пользовательских данных.

По другим тестам производительность не выглядит впечатляющей, возможно неправильные тесты (http://integr8consulting.blogspot.ru/2011/02/jms-speed-test-activemq-vs-hornetq.html)
Если я правильно понял вопрос про отложенную доставку:
— если нужно отложить обработку события приходит команда на переустановку времени активации события.
— если нужно отложенное событие снова вернуть в пул готовых к обработке приходит та же команда, устанавливающая время активации в текущее значение или в прошлое.
На момент разработки Apache Kafka еще не существовала
сейчас Kafka выглядит крайне интересно.
Нет смысле переделывать существующую архитектуру ради Kafka
в новом проекте можно использовать Kafka для транспорта важной статистики, транспорт между внутренними хранилищами и т.п.

мне нравится как используется Kafka в хранилище druid (http://druid.io)
В рамках Rabbitmq подтверждаю все сказанное и для текущих версий. Плюс были проблемы стабильности.
— непредсказуемая память
— очень, ОЧЕНЬ, тяжело распределять нагрузку (приложение celery-based), нормальная ситуация, что 2 потребителя загружены под завязку, а еще 8 по 1 задаче в секунду берут (это примерно при 2000-10000 вставок в секунду).
— катастрофическое падение производительности при больших ответах (так получилось тезнически, что в celery-based приложении пришлось отдавать ответы через rabbitmq, вес ответа доходил до 30 Мб, при около 2000 вставок/сек машина с двумя 8-core процами и 16Гб оперативы по LA уходила в 6-10)
Ну вот (
Уже думал, что это следующий шаг для нас после Gearman.
Попробуйте. В моем случае оснонвная проблема была в том, что разрабы не хотели вылечить свой код, и отдавать ответы через redis.

Думаю большая часть моих проблем и была связанно с тем, что результаты передавались через rabbit.

Я для себя писал с участием кролика пару проектов, где не было такой «фичи», у меня на стресс-тесте (яндекс танком) на среднего пошиба виртуалке на DO, все нормально жило при 15000 вставок/сек. На продакшене еще не смотрел, но не думаю, что что-то изменится.

Тут главное изначально подойти с умом к кролику, а не латать дыры. ИМХО.
Да, нагрузка пока маленькая, и этот этап в любом случае нужно пройти.
Опять же, повторюсь, с моей точки зрения — обратите внимание на размер ответов. Я делал, что в ответе идет клюс, по которому я с редиса забираю. Оверхед куда менше, чем задержки с раббитом. Ну и цена работы куда ниже.

Было подозрение, не подтвержденное, что как раз те потребители, которые отдавали большие данные и не брали потом больше задач. И когда загруженные потребители тоже отдавали данные, кролик все равно им пизал новые задачи.
А у кролика вообще заявлена балансировка, может он и не умеет?
Там есть параметры, лучше сами посмотрите.
Я игрался с кластером через haproxy, проблем не обнаружил. Но и в продакшн не сталвил.
Отложенная обработка действий пользователя.

А как решаются проблемы возникающие в результате задержки записи в базу? Ведь как я понял из описания данные от клиента ушли, но в базу еще не записались, т.к. находятся в очереди на запись. Клиент повторно запросил данные и:
* отдались старые данные из базы?
* из какого-то быстрого кэша?
* прямо из очереди?
* показали клиенту на фронте данные которые он оправил? (а если открыл в другом браузере?)
* просто забили?
Не очень момент понятен. Для примера пусть это будет обновление юзером своего профайла.
Никакой магии.
— пользователь отредактировал свой профиль, сообщение ушло в очередь.
— пользователю скажут что все ОК и покажут обновленные данные.
— если пользователь нажал F5 до того как данные дошли до хранилища, он увидит старые данные.

При нормальной работе сервиса события из очереди обрабатываются за несколько миллисекунд, данные окажутся в хранилище раньше чем браузер пользователя отобразит ответ от сервера.
Выгода проявляется во время сбоев, что-то с сетью, часть хранилищ перегружено, или аппаратный сбой, да все что угодно. В таком случае данные из очереди попадут не сразу в хранилище, важно, что они туда обязательно попадут. Неважно, ждет пользователь ответ от сервера, или уже ушел.
А есть что-то подобное постингу топиков? Как в этом случае?
Пользователь запостил сообщение, оно ушло в очередь, но из-за сбоя сети сообщение попало в очередь, но не ушло в базу. Допустим, что и у пользователя произошел сбой и ему пришлось обновить страницу. Своего сообщения он не увидел, ибо в базе его еще нет, поэтому он тут же постит точно такое же (сохранил перед постингом в блокноте, я сам так делаю регулярно). Оно так же попадает в очередь. Я правильно понимаю, что при записи в базу второго сообщение будет отклонено (считаем md5 хэш) если оно дословно повторяет первое, и размещено, если оно примерно такое же?
Описанный случай выходит за рамки ответственности сервера очередей
Это чистой воды бизнес логика обработчика сообщений из очереди.
В каких-то случаях можно по уникальному идентификатору контента догадаться что это то же самое что пытались вставить, а в каких-то банальный MD5
Мы пару лет как разрабатываем свое решение — IronMQ, есть вариант как сервиса так on-premise. Из описанных вами фич разве что еще нет приоритетов сообщений, но выруливаем при помощи разных очередей.
Интересный проект
расстраивает текстовый протокол для взаимодействия с сервером
Также есть частичная реализация протокола beanstalk (хотя там тоже ASCII over TCP)
У вас по ТЗ сразу получился сервер задач, а не сервер очередей сообщений. Ключевое отличие как раз в том, с чего у вас начинается первая картинка — существование такого признака как «время активации». Отсюда и причины некоторых других различий.
А в целом всё круто, респект. Особенно респект за каскадные очереди, мы тоже голову сломали в своё время, пока додумались.
Все верно,
нам нужен был нестандартный сервер сообщений, скорее специализированное хранилище
поверх которого построена ферма для выполнения разнообразных задач
К сожалению, ничего не могу сказать.
Это относительно новая система, на тот момент в Моем Мире не один год существовал свой сервер приложений
Может ли очередь жить в нескольких инстансах сервера (гео распределённость) на случай нештатных ситуаций?
К сожалению, нет.
Очередь может быть пошаржена между несколькими серверами
одно сообщение всегда обрабатывается с конкретного мастера шарда, жить может и в репликах

В планах есть реализация этого функционала, но, это будет нескоро.
а разве производительности тарантула не хватает?
его вполне хорошо можно использовать для очередей

вопрос 2 — кто пишет сервер очередей или перефразирую вопрос:
команда тарантула в разработке как-то участвует ;)
или просто взят их код?
не хватает,
очереди в тарантуле хороши для быстрого старта
по сравнению с нашим решением больше дополнительных данных на каждое сообщение

Разрабатывает команда Моего Мира.
Команда Tarantool не участвует.
По родословной все наоборот. Tarantool родился из фреймворка разработанного в Моем Мире
А запись на диск делается аналогично как в Redis? Я правильно понимаю, что запись выполняется с некоторой периодичностью (таймер? событие?) и если в очередь попало сообщение, но до момента следующей записи сервер упал, то сообщение теряется?
write вызывается на каждую команду.
Мы не отвечаем клиенту пока не получили подтверждение о выполнении write (это еще не гарантия того что данные попали на диск)
Каждые N записываемых записей, вызываем fdatasync, что бы данные все таки попали на диск.

Что у нас получается:
— Если упал софт, но, сервер не перезагружался — все нормально. Не будет потеряна ни одна команда.
— Если отключилось питание на сервере, то, мы можем потерять несколько записей которые находились в файловых кэшах, но, еще не записаны операционной системой на диск (записи после последнего fdatasync)

Кэш в жестких дисках считаем выключенным.
Т.е. так же как в Redis если нет кластера, то потеря данных возможно при падении всего сервера. Значит ли это, что через этот сервер очередей критичные данные (биллинг там) не ходят?
Все верно.
Единственное что мы можем обеспечить, что потеряно будет не больше N записей
Коммерческие серверы очередей рассматривались? Например IBM WebSphere MQ?
Приветствую…

Уже в который раз удивляюсь желанию скрестить «ежиков и ужиков». Тем не менее, идея протащить scheduling в messaging возникает раз за разом :-) Речь идет о, так называемой, отложенной доставке — delayed delivery.

Просто интересно, вы задумывались над тем почему, например, в AMQP этого нет? А, например, TTL у сообщений есть. Почему этого нет в стандарте JMS?

Желание, «с экономить на спичках» и при этом потенциально поиметь геморрой размером даже не с кулак… Я не говорю, что потребности в DD нет — она как раз возникает почти всегда когда пытаются использовать messaging. Но вот способ, которым в подавляющем большинстве случаев пытаются эту потребность удовлетворить, вызывает то смех, то слезы.

Люди… ну объясните, почему вы считаете, что DD это задача «транспорта»?!
Поддерживаю, тут явно 2 системы в одной. Причем и система очередей, и система планировщика явно выиграли бы от разделения.

Причем события, которые нужно выполнить «сейчас», вполне ложатся под идею планировщика. А вот отложенные события в идею очередей – никак.

Рассуждая дальше приходим к выводу, что это не система очередей, а планировщик, использующий проприетарную самописанную систему очередей.

Что вобщем-то довольно полезно.

Хотя сервер очередей можно было бы вынести во внешний протокол и разрешить использование сторонних MQ-серверов через протокол AMQP например.
Здравствуйте,
выше писал что у нас получилась специализированная система, не соответствующая классическому представлению о Message Queue

Почему и зачем — у нас были задачи, которые нужно было решать.
Сделать это существующими решениями с необходимой производительностью не удалось.
Для нас смешение логики оказалось очень удачным.
Сервер очередей одна из немногих активно используемых систем, которая не вызывает постоянного желания что-то доделывать и переделывать.
Наше решение пришлось по вкусу и в других проектах, как оказалось у них то же хватает «странных» задач
И вам — не хворать.

Я-то представляю себе, что примерно у вас получилось :-) Тем не менее, в заголовок вы вынесли именно «Сервер очередей».

>> Почему и зачем — у нас были задачи, которые нужно было решать.
>>Сделать это существующими решениями с необходимой производительностью не удалось.

Сдается, мне — не больно то и хотелось :-) Но, мне действительно интересно, изучали ли предметно тот же AMQP, или просто «по втыкали» в «кролика» и успокоились? Почему ограничились только «кроликом»? Если я все правильно понял про 2009 год, то он чуть менее чем ужасен в плане реализации AMQP на тот момент. Тот же qpid от Апачей на то время — практически эталон.

>> Для нас смешение логики оказалось очень удачным.

Ну дай-то бог :-) Рад за вас, если это действительно так.
От себя могу сказать, что за 10 лет я таких «удачных» решений видел множество. И все они — какие-то раньше, какие-то и после нескольких лет эксплуатации — показывали свое истинное лицо :-)

Что касается вашей системы, то, осмелюсь предположить, что ваши «группировки событий» — на самом деле «костыль со стразиками» :-)

По тексту не видно большинства интересующих нюансов… но, таки попробую полюбопытствовать. Я правильно понял, что сервер у вас не знает о наличии/отсутствии подписчик[а/ов]?
Я-то представляю себе, что примерно у вас получилось :-) Тем не менее, в заголовок вы вынесли именно «Сервер очередей».

Правильно, так как система ближе всего именно к серверу очередей

Сдается, мне — не больно то и хотелось :-) Но, мне действительно интересно, изучали ли предметно тот же AMQP, или просто «по втыкали» в «кролика» и успокоились? Почему ограничились только «кроликом»? Если я все правильно понял про 2009 год, то он чуть менее чем ужасен в плане реализации AMQP на тот момент. Тот же qpid от Апачей на то время — практически эталон.

Пробовали разное, но, больше всего RabbitMQ.
В qpid для сохранения данных на диск используется bdb, о какой производительности можно говорить… В моих экспериментах результаты были хуже RabbitMQ

От себя могу сказать, что за 10 лет я таких «удачных» решений видел множество

Звучит очень знакомо :)
Сейчас отвлекусь от очередей, Вы затронули популярную тему.

Обычно, новые сотрудники первые 3 месяца высказываются аналогично. А потом приходит понимание, зачем приходится писать свои велосипеды.
Мы пытаемся упростить себе жизнь и использовать сторонние разработки, к сожалению, очень редко удается пройти этап тестирования.
В подавляющем большинстве случаев разрекламированные решения не устраивают или по производительности, или по надежности или по критерию стабильности работы. Пробовали привлекать разработчиков сторонних систем, писать патчи — чаще всего не помогает.
И приходится в очередной раз разрабатывать свои собственные решения, менее универсальные, зато оптимизированные под наши задачи.

Это не значит что мы не используем чужие системы — используем, но, не так часто как хотелось бы.

Что касается вашей системы, то, осмелюсь предположить, что ваши «группировки событий» — на самом деле «костыль со стразиками» :-)

Не совсем так. Это своего рода бонус, который позволяет небольшими доработками улучшить производительность в критичных местах. Не будете же Вы говорить, что улучшать локальность данных, для того что бы уменьшить промахи кэша процессора сродни костылю? В данном случае работает тот же принцип.

Я правильно понял, что сервер у вас не знает о наличии/отсутствии подписчик[а/ов]?

Все верно. Вся логика управления подписчиками вынесена в отдельную подсистему. Это отдельный кластер машин со своей внутренней диспетчеризацией и управлением ресурсами выделяемыми для обработки той или иной очереди.

В qpid для сохранения данных на диск используется bdb, о какой производительности можно
говорить… В моих экспериментах результаты были хуже RabbitMQ


Если меня не подводит мой склероз, то в те времена «кролик» для message store использовал исключительно mnesia. А в qpid, опять же, емнип, «испокон веков» persister — вещь настраиваемая. Заменить bdb на что-то более подходящее — не проблема. По крайней мере у нас с этим никаких проблем не возникло в свое время. И отказались мы от него по совсем другой причине.

Звучит очень знакомо :)
Сейчас отвлекусь от очередей, Вы затронули популярную тему.


Я может быть не вполне ясно выразился… но, под «удачными решениями» я имел ввиду исключительно попытки «вшить» scheduling в messaging. А не какие-то абстрактные «велосипеды».

Не совсем так. Это своего рода бонус, который позволяет небольшими доработками улучшить производительность в критичных местах.


Да понятно, что бонус :-) Не было бы бонуса, не было бы и «стразиков» :-) А можно, тогда по подробней за эти группировки рассказать? Я же правильно понимаю, что речь идет о «группировке» сообщений одного и того же продюсера, которые он положил в разные очереди? «Группировку» делает сам продюсер, а не сервер?

Все верно. Вся логика управления подписчиками вынесена в отдельную подсистему.


Хм… но это означает, в том числе и то, что сервер не имеет возможности уведомить подписчика о том, что то сообщение, которое он обрабатывает, повторно активировано. Т.е. что истек timeout и данное сообщение разблокировано.

Каким образом избегаете повторной обработки сообщений, если не секрет?

И вот смотрите… отдельная подсистема «управления подписчиками» у вас есть. Но, что-то помешало вам сделать отдельную (от messaging) подсистему для DD. Хотя, казалось бы :-) Вот в чем сакральный смысл хранить активные и не активные сообщения в одних и тех же очередях? Это же еще и мониторинг системы усложняет.
Заменить bdb на что-то более подходящее — не проблема

Насколько помню, в то время, из альтернатив bdb qpid поддерживал только SQL базы в качестве системы записи данных

под «удачными решениями» я имел ввиду исключительно попытки «вшить» scheduling в messaging

Но, что-то помешало вам сделать отдельную (от messaging) подсистему для DD. Хотя, казалось бы :-) Вот в чем сакральный смысл хранить активные и не активные сообщения в одних и тех же очередях?

Суровая реальность вносит коррективы в идеальную картину мира :)
Потребуется в 2 раза больше дисков, 1 в сервере очередей, чтоб не потерялись события, другой в планировщике, так как в нем большое количество событий могут жить длительное время. Если писать все это на один диск, упадет производительность, на каждое событие придется делать 2 записи на диск вместо одной.
Что делать если клиент захотел внести коррективы в сообщение, уже доставленное планировщику? Можно слать через систему очередей служебные сообщения для планировщика, который будет понимать что это сообщение с мета данными полученного ранее события. Можно обучить клиентов знанию еще и о планировщике и слать изменения напрямую в него. Но, это тоже приводит к нехорошим последствиям. Сообщение по каким-то причинам может быть еще не получено планировщиком из очереди сообщений, а клиент уже прислал изменения. Получается много граничных условий, усложняющих жизнь
Это всего лишь несколько примеров.
Скрестив 2 сущности в одной, мы не очень-то усложнили реализацию сервера очередей, зато добились упрощения архитектуры проекта в целом.

Я же правильно понимаю, что речь идет о «группировке» сообщений одного и того же продюсера, которые он положил в разные очереди? «Группировку» делает сам продюсер, а не сервер?

Речь идет о группировке событий в рамках одной очереди с помощью установки одинакового времени активации. Необязательно должен быть именно один продюсер, обычно группировка делается по внутреннему id данных, например по id пользователя, это позволяет группировать события разным продюсерам.
Сервер про группировку ничего не знает.
Приведу пример:
— Анти спам решил что пользователь X потенциальный спамер (неважно почему)
— Мы хотим проверить последние Y действий пользователя попадающих в ленту к друзьям (загрузка фото, высказывания и т.п.)
— Продюсер получает список ID этих Y событий (из какого-то хранилища)
— Для каждого ID из этих Y порождает событие в очередь и устанавливает одинаковое время активации
В чем польза:
Обработчики событий будут ходить в дисковое хранилище, в котором лежит вся активность пользователей и получение каждого события, если его нет в кэше хранилища — это поход на диск.
Если же события активируются в близкий период времени, то, поход на диск если и будет, то только один. При чтении данных с диска в большинстве наших хранилищ делается prefetch, в кэше на какое-то время оседают и другие события пользователя, а не только те, что были запрошены select'ом из конкретного обработчика.
Таким образом мы уменьшаем суммарное время обработки всех Y событий пользователя X.
И все это на распределенной ферме подписчиков, без каких либо ухищрений в виде синхронизаций на уровне обработчиков.

Хм… но это означает, в том числе и то, что сервер не имеет возможности уведомить подписчика о том, что то сообщение, которое он обрабатывает, повторно активировано. Т.е. что истек timeout и данное сообщение разблокировано.

Каким образом избегаете повторной обработки сообщений, если не секрет?

Верно, сервер никого ни о чем не уведомляет.
Рецепт простой, проблем не доставляет.
Время реактивации сообщений в большинстве очередей выставляется в 1 — 2 часа.
Ни одно сообщение не может так долго обрабатываться. Если за это время оно не было обработано, значит что-то случилось или с обработчиком, или с хранилищем в которое ходит обработчик.
Если среднее время обработки сообщений начинает расти, или в очереди копятся заблокированные события — срабатывает мониторинг и разработчики бизнес логики идут искать ошибки.
Насколько помню, в то время, из альтернатив bdb qpid поддерживал только SQL базы в качестве системы записи данных


Сдается мне — плохо помните :-) Емнип, SQL persister был написан сторонними ребятами специально для Windows релиза, т.к. там были сложности с работой libbdbstore. Но, это даже не важно… важно то, что весь persistence слой в qpid — by design — это plug-in. В отличие от «кролика», где — на тот момент — использование dets и mnesia было «прибито гвоздями».

Так что, реализовать свой message persister для qpid — абсолютно не проблема. По крайней мере, у нас это проблем не вызвало.

Суровая реальность вносит коррективы в идеальную картину мира :)


Я как-то слабо начал Вас понимать. Вы хотите меня убедить в том, что в текущей реализации изменение состояния сообщения (active/passive) не требует дисковых операций?! Или в чем?

Рассуждение о том, что «в 2 раза больше дисков» я, с вашего позволения, спишу на выходные :-) Мне сложно представить причину зачем это может потребоваться в реальности. Но — не спорю — реализовать можно и так, что нужно будет и в 3 раза больше дисков :-)

Что делать если клиент захотел внести коррективы в сообщение, уже доставленное планировщику?… и т.д. по скипано мной


Мне совершенно не понятно, зачем продюсеру с «планировщиком» общаться асинхронно? С сервером очередей же взаимодействие синхронное. Я правильно понял? Зачем — при наличии «планировщика» — продюсеру вообще знать о сервере очередей?

Более того, если «планировщик» реализован как некий фасад сервера очередей, вполне возможно обеспечить и zero-copy сообщений.

Речь идет о группировке событий в рамках одной очереди с помощью установки одинакового времени активации. Необязательно должен быть именно один продюсер...


Хм… а можно тогда пример, с разными продюсерами? А то как-то не понятно, каким образом этим _разным_ продюсерам удается промеж собой договорится об одинаковой группировке? Плюс, из примера не ясно, в чем же именно заключается _группировка_? Только в том, что у этих событий одинаковое время активации? Т.е. все события в одной очереди с временем активации X — это одна и та же группа?

По вашему примеру вопрос: почему в описанном случае продюсер не может сам прочитать эти Y событий, и послать уже данные этих событий, а не их ID? В этом, случае, имхо, гораздо больше гарантий что пробоя кэша при чтении не случится.

Верно, сервер никого ни о чем не уведомляет.
Рецепт простой, проблем не доставляет.
Время реактивации сообщений в большинстве очередей выставляется в 1 — 2 часа.
Ни одно сообщение не может так долго обрабатываться.


Извините, но тут я вообще перестаю что-либо понимать. Понятно, что два часа на обработку сообщения — это много. Но сдается мне, если обработчик забрал у сервера 1000 сообщений, и у всех у них «время реактивации» выставлено пусть даже в 2 часа, то у обработчика на самом деле есть только 7.2 секунды на обработку одного сообщения. Иначе, последнее сообщение — будет повторно активировано.

7 секунд — это, конечно, тоже не мало… но и с выставленным «временем реактивации» у сообщения тоже имеет мало общего. Или обработчик, как-то дополнительно уведомляет сервер о начале обработки конкретного сообщения?
Если не сложно, то могли бы привести пример каскадных очередей в ваших проектах?
Можете ли указать, какие вопросы (проблемы) очередей разрешались эвристиками (их семантика), а какие строгим математическим подходом и как это обосновывалось?
Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Информация

Дата основания
Местоположение
Россия
Сайт
team.mail.ru
Численность
5 001–10 000 человек
Дата регистрации

Блог на Хабре