Информация

Дата основания
Местоположение
Россия
Сайт
team.mail.ru
Численность
5 001–10 000 человек
Дата регистрации

Блог на Хабре

Обновить
Комментарии 14
В питоне 3.5 появился сахар для асинхронных штук, так что можно переписать так:
async def worker(tube):
    while True:
        task = await tube.take(.5)
        if not task:
            break
        # process(task.data)
        await task.ack()
И все-таки это не «сахар для асинхронных штук». Для того, чтобы ваш пример работал, надо все имеющиеся корутины превращать в awaitable-objects. Либо с помощью @types.coroutine, либо внедрением __async__ метода для существующих классов.
Код
import asyncio


class Coro:
    def __init__(self, val):
        self.val = val

    @asyncio.coroutine
    def result(self):
        return self.val


async def worker(coro: Coro):
    while True:
        res = await coro.result()
        print(res)
        await asyncio.sleep(res)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    workers = [asyncio.async(worker(c), loop=loop)
               for c in (Coro(1), Coro(1.5), Coro(2.33))]

    loop.run_until_complete(asyncio.wait(workers))


Работает и без ручного превращения. В 3.5 декоратор asyncio.coroutine сам сделает декорируемый объект Awaitable.

Или я не так понял?
Согласен! Можно использовать «async def» и «await», разрабатывал и отлаживал aiotarantool еще под python-3.4.3
Планирую скоро перейти на этот сахар.
Для этого на одном из серверов необходимо выбрать «пачку», в очередь на «соседнем» сервере добавить маленькое сообщение с информацией о последнем id-пользователя из текущей «пачки». При обработке маленького сообщения на втором сервере мы должны получить «новую пачку» начиная с указанного id, сформировать аналогичное сообщение «серверу-соседу», и т.д., пока не переберем всю базу. Текущую «пачку» нужно всегда обрабатывать локально в своей очереди. Таким образом, мы «как бы» переместим код к нашим данным, распараллелим генерацию «пачек» по серверам и не будем гонять данные по сети.

Зачем так все усложнять с это эстафетной палочкой и делать отдельные локальные очереди?
Что произойдет, если один из серверов не сможет достать пачку и передать эстафету дальше? Цепочка может оборваться в любом месте. Что один из серверов упал? Будет отправлена только часть пушей, а маленькое сообщение даже не сможет попасть в очередь?

Я вижу решение этой проблемы в виде каскадной очереди (цифры взяты из головы, должны настраиваться, их нужно подобрать для своего проекта):

Для инициализации отправки нужно либо сделать крон-скрипт, который будет запускаться раз нужный промежуток времени, либо специальную очередь, которая для примера, описанного в статье будет иметь 1 сообщение "подготовить пуши пользователям", после обработки этого сообщения для него (этого сообщения) будет выставляться delay (если нужно отсылать раз в сутки):
+(24 часа - время обработки)

Эта очередь может использоваться и для инициализации и других каскадных очередей. Назовем эту очередь Очередь1.

Когда сообщение «подготовить пуши пользователям» берется в обработку, обработчик извлекает минимальный и максимальный id пользователя, делает, допустим 10000 диапазонов, и создает 10000 сообщений в Очередь2 с сообщением "подготовить пуши пользователям, у которых id от n до m". Когда сообщение "подготовить пуши пользователям, у которых id от n до m" из очереди Очередь2 берется в обработку, обработчик делит диапазон, допустим, еще на 10000 диапазонов и создает еще 10000 сообщений в эту же очередь. Так происходит до тех пор, пока диапазоны не станут достаточно маленькими.

Когда диапазоны становятся достаточно маленькими, то создаются сообщения на непосредственно саму отправку пушей (для iOS в Очередь3 для Android в Очередь4) с сообщением, содержащим id пользователя. Соответственно при обработке очередей Очередь3 и Очередь4 происходит непосредственно отправка пушей. Для iOS и Android сделано разделение затем, что для них нужен как минимум разный коннект, который можно использовать для нескольких сообщений.

Теперь о серверах. Где будет находиться и обрабатываться очередь Очередь1 — не принципиально, но важно мониторить то, что тачка живая и если что создавать и обрабатывать эту очередь на другом сервере. Для очередей Очередь2, Очередь3 и Очередь4 можно использовать хоть все доступные сервера очередей, при этом решать, на какой сервер записать сообщение для очереди Очередь2 можно хоть просто случайным образом, а для Очередь3 и Очередь4 можно, например, использовать остаток отделения user_id на кол-во серверов.

Где обрабатывать Очередь2, не принципиально, а очереди Очередь3 и Очередь4 лучше на разных группах серверов, что бы было проще контролировать нагрузку. Обработку очередей не обязательно делать на тех же серверах, где крутится сервер очередей.
При обработке сообщений сообщения можно брать из доступных серверов по пачками по очереди. Допустим обрабатываем Очередь3, у нас доступно 5 серверов, мы достаем 1000 сообщений из первого, обрабатываем, 1000 из второго, обрабатываем и так по кругу.

Получается что у есть только одна точка отказа — сервер, где крутится Очередь1. С остальным проблем нет — падает очередь на одном из серверов — сообщения с этого сервера временно не обрабатываются, зато сам сервер может использоваться для обработки. Либо на сервере может по каким-то причинам упасть обработчик, но сообщения с этого сервера будут забираться другими серверами, на которых обработчики работают.

Надеюсь понятно объяснил.
Схема с диапазонами вполне рабочая. Да, в Tarantool Queue можно выставить delay и подготовить сообщения заранее.
Min и Max для ключа также можно быстро извлечь из Tarantool.
Кейс с delay мы используем при рассылки пушей по временным зонам для приложения «Гороскопы».
Сообщения генерируются заранее во все очереди, обрабатываются по delay.

Если данные по primary key распределены неравномерно, то возможно деление по диапазонам приведет к неэффективной выборке пачек.
Также нужно подумать как разбивать на диапазоны, если primary key не числовой.
Что будет, если упадет сервер, на котором генерируются самые первые диапазоны в момент их генерации?

По поводу падения сервера. Маленькое сообщение отправляется перед обработкой пачки.
Если «соседний» сервер упал, то сообщение будет отправлено на следующий доступный сервер.
Для сообщений нужно гарантированно выполнить ack.
Также в Tarantool есть WAL и восстановление после аварии из снапшота.

В нашем случае мы не выбираем данные с очередей на соседних серверах намеренно.
Иначе все обработчики должны слушать все очереди в кластере, это увеличит нагрузку на инстансы Tarantool Queue,
а также создаст дополнительный ненужный траффик по сети.

Также нет опроса очередей 1000 с одного сервера, 1000 с другого, и т.д.
Все логические очереди обрабатываются асинхронно на одном сервере локально в один коннект к Tarantool Queue.
Именно это дает большую скорость их обработки.

По поводу «падает очередь», Tarantool Queue не падает, работает надежно!
«Умирает» сервер (такое у нас случалось не раз) — все работает как и работало.
Да, сообщения из очередей на этом сервере, если они там были, будут обработаны только после того, как сервер реанимируют.
Также такая конфигурация очень проста в эксплуатации, все сервера — одинаковые.
В нашем случае мы не выбираем данные с очередей на соседних серверах намеренно.

Согласен, такую оптимизацию сделать можно, и она так же будет работать в каскадной очереди.

По поводу падения сервера. Маленькое сообщение отправляется перед обработкой пачки.
Если «соседний» сервер упал, то сообщение будет отправлено на следующий доступный сервер.

Я вижу здесь следующий тонкий кейс.
Что если сообщение было успешно отправлено на следующий сервер, но не было там обработано из-за того, что сервер упал? Ведь от получения сообщения до его обработки может пройти относительно не маленький промежуток времени.
В этом случае поднятие этого сервера будут ждать все остальные.

В схеме с единственной точкой отказа решить проблему будет гораздо проще. В случае недоступности такого сервера, активировать эту очередь на другом, причем это можно автоматизировать.

По поводу «падает очередь», Tarantool Queue не падает, работает надежно!

Так не бывает. Во первых баги есть везде, возможно с ними вы просто еще не сталкивались. Во вторых здесь может быть элементарно человеческий фактор. Так что совсем исключать такой сценарий я бы не стал.
Что если сообщение было успешно отправлено на следующий сервер, но не было там обработано из-за того, что сервер упал?

Да, будет нужно ждать пока сервер поднимут.
Тогда можно сделать отдельную очередь для таких «эстафетных» сообщений, и в случае падения сервера с такой очередью активировать ее где-то автоматически. Подумаю о такой фиче.
Да, еще один момент хотел спросить.
Как я понимаю рассылка пушей должна начинаться на каком-то сервере. Что если упал этот сервер? Тогда рассылка вообще не начнется, так же как и в случае каскадной очереди.
Как я понимаю решить проблему можно так же, как и в случае с каскадной очередь.
Рассылку новости инициирует редактор через админку, от админки сайта приходит http-запрос, и если HTTP 200 ОК, то начинается рассылка пуш-уведомлений. Также http-запрос может быть автоматически отправлен на другой доступный сервер.
При таком подходе у каскадной очереди не запуститься будет еще меньше вероятность:)
Для будущих поколений напишу, что на 2017 год в статье есть некорректная фраза: «Одновременно с такими рассылками кластер из восьми серверов отправляет около 10 тыс. пуш-уведомлений в секунду».

По нашим замерам, один инстанс tarantool queue в докер контейнере на машине разработчика выдаёт 4 тысячи сообщений в секунду (put + take + ack). Соответственно, потребовалось бы всего 3 ядра для 12 тысяч сообщений в секунду, а не кластер из 8 серверов.
Учитывалась ли отправка в ios/android в замерах?
nekufa 10 тыс. пуш-уведомлений в секунду — это нагрузка с нашего продакшен на момент написания статьи.
Это вовсе не означает, что в нет запаса на рост нагрузки.

А еще не могли бы тогда уточнить как вы измеряли? Сообщения какого размера использовали?
Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.