Как стать автором
Обновить

Комментарии 4

хорошобы такую статью про ksql server, т.к. там вообще ничего не понятно как всё это дело горизонтально масштабируется…
так же при множества ksql server (горизонтальное масштабирование) не ясно как что работает при JOIN допустим стримов если ksql на разных машинах…
в общем масса вопросов как N-KSQL проецируются на 1 или N kafka (кластер)
Спасибо за статью! Пользуюсь RabbitMQ, сильно смущает его производительность. Рассматриваю варианты.

В RabbitMQ можно задать message-ttl и x-dead-letter-exchange, что позволяет перебросить сообщение в другую очередь, если сообщение никто не успел забрать. Как с этим в Kafka?
Спасибо за спасибо:)

В Кафке нет встроенного функционала, который позволяет назначать TTL на отдельные сообщения и перемещать их между очередями. По большей части это связанно с подходом к хранению сообщений, который используется в Кафке. В партиции, где хранятся сообщения, вы не можете менять их местами или удалять по-отдельности. Так же у сообщения в Кафке нет глобального статуса прочитанности, как в AMQP-брокерах. Консьюмеры могут читать сообщения в партиции произвольное количество раз, причем параллельно.

Однако ничего не мешает вам реализовать описанный функционал на стороне клиента. Как вариант, можно записывать в тело сообщения TTL. Когда консьюмер дойдет до сообщения, он сравнит его таймстэмп + TTL с текущим временем, после чего либо штатно обработает, либо пропустит сообщение, попутно записыв его в dead letter queue.
Спасибо за ответ, я примерно тоже самое понял из гугления. К сожалению не сходится, нужно сильно переосмысливать архитектуру в таком случае, т.к. у меня помимо ttl идёт завязка на prefetch count. Ну и приоритетные очереди я тоже использую.

Пока размышляю написать свой сервер очередей на PHP. Звучит подозрительно, но кажется там всё очень просто. Условный RabbitMQ на потоке в 10к сообщений (не persistent!) / сек уже полностью съедает CPU условного сервера, что сильно смущает — там работы-то раз-два и обчёлся.

Моя задача следующая. Есть поток сообщений разного приоритета, которые я должен асинхронно передать поставщику со скоростью не более N сообщений в секунду. Передача идёт в формате запрос-ответ, как только поставщик ответил — сообщение считается переданным. Соответственно есть размер асинхронного окна равный prefetch count.

В случае с RabbitMQ у меня получается очень простой код:
$amqp->prefetch(ASYNC_WINDOW_SIZE);
$amqp->consume('queue', function($message){
    $this->queue[] = $message;
});
setInterval(1 / MSG_PER_SECOND, function(){
    if(!count($this->queue)) return;
    $message = array_shift($this->queue);
    $this->sendMessageToProvider($message)->then(function() use($message){
        $message->ack();
    });
});


Если сообщение просрочилось — оно уйдёт в очередь отправки на резервного поставщика, с точно таким же консьюмером. Соответственно если поток сообщений превышает разрешенную скорость на поставщика — у меня сработает TTL и часть сообщений уйдёт на резерв, почти без задержки (ttl = 1 сек, не критично). Если поставщик прилёг или тормозит с приёмом сообщений — оно так же уйдёт на резерв.

Так же тут важно то, что я могу отправить ACK не по порядку получения (из-за чего мне не подходят очереди на Redis, который у меня используется и работает очень шустро).

PS: Ещё один камень в огород AMQP — либо я не нашёл, либо там нет от сервера подтверждения на отправленный ACK. А подтверждение на publish включается отдельно и по-умолчанию отключено… Мне критично контролировать, что операция выполнена успешна.
Зарегистрируйтесь на Хабре, чтобы оставить комментарий