Pull to refresh

Comments 19

Зачем здесь кафка, если есть нативный для go nats?

Nats из коробки как минимум не персистентный (не берём в рассмотрение Jetstream)

Ну кстати это хороший вопрос, я не имею отношения к статье и ее кейсу, но со своей стороны я бы сказал, что джетстрим по моему опыту показал себя гораздо медленнее и менее надёжным, чем кафка

А где можно посмотреть на такие числа?

На 5-ти мегабайтных пакетах для push и pull c lz4 (но не snoopy, как по умолчанию), да еще в protobuf, Kafka существенно опередит Jetstream. Особенно если много автономных подписчиков-сервисов на один топик. На одиночных сообщениях с одним подписчиком - скорее всего проиграет.

Мне хотелось рассмотреть решения связанные именно с kafka. Ведь kafka довольно часто применяют для решения подобных задач.

Как раз последнее и накладывает ограничения на его применение. Ведь в жизни почти всегда сталкиваемся с гетерогенной средой, как бы нам не хотелось этого избежать. Кроме того наличие Debezium и Confluent - большой плюс. А управление метаданными в protobuf/avro/json через schema registry из коробки - огромный плюс.

А статус лидирующей партиции и "остальных" как то повлияют на балансировку?

Такие эксперименты не продовил. Попробую порассуждать.

Насколько мне известно, producer всегда записывает сообщения в Leader-партицию.

Пусть producer посылает сообщение в партицию 0. Запись в партицию 0 в данный момент недоступна. Пусть запись недоступна по следующей причине. У consumer установлен параметр acks=all, сообщение записалось в Leader-партицию, и не записалось в необходимое количество Follower-партиций (запись подтвердили меньше Follower-партиций, чем установлено в параметре брокера min.insync.replicas). Тогда сообщение, которое producer пытался записать в партицию 0 не будет записана ни в какую другую партицию - producer получит ошибку.

Получается, что при записи сообщение не попадет в неправильную партицию

Теперь посмотрим на чтениие.

По умолчанию consumer читает сообщения из Leader-партиции. Пусть чтение из Leader-партиции невозможно. Такое могло произойти из-за сетевых задержек или перегрузки брокера. Тогда из Follower-партиций назначается новая Leader-партиция, и из нее продолжит читать сообщения consumer. Если новая Leader-партиция будучи Folower-партицией не успела получить все сообщения от прежней Leader-партиции, то consumer не получит часть сообщений. Насколько я знаю, эти сообщения kafka не восстановит.

Тогда отвечу на ваш вопрос так: не повлияют.

Но, повторюсь, собственноручно я это не проверял.

А почему нельзя сделать три партиции по количеству БД и не повесить на них три синка, каждый на свою БД? По умолчанию они будут вливать в БД по 500 собщений суммарным размером не больше мегабайта, но эти параметры можно настраивать. А уже обработка сообщений, если таковая требуется, в том же PostgreSQL в триггерах все равно построчная. Зато обработка 500 сообщений в одной транзакции будет производиться на порядок быстрее, чем обработка этих же сообщений в 500 транзакциях.

Согласен с вами, есть более оптимальные решения.

Хотелось показать, как можно действовать в условиях, когда есть возможность использовать только ограниченное количество партиций. Пример, конечно, сильно упрощенный. Параллельный consumer имеет смысл использовать, когда обработка сообщения заключается в обращении к нескольким внешним системам и применении проверок предметной области, а не только в записи в базу.

Надо было больше внимания в статье уделить ограничениям, тогда, возможно, решения выглядели бы более оправданными. Учту это в будущем.

Спасибо вам за комментарий.

А, насколько целесообразно использовать каналы вместе с Kafka?

Если мы наберём много сообщений из Kafka, в калалы. И наше приложение упадет, то все они потеряются...

Если будем сильно ограничивать, канал, у нас может получится так, что один из каналов будет простаивать.

Целесообразность подхода

На вопрос, когда такой подход целесообразно применить, гораздо лучше меня ответит список сценариев использования библиотеки Parallel Consumer: https://github.com/confluentinc/parallel-consumer/blob/master/README.adoc (пункт 3.4. Scenarios).
В основном, насколько я понимаю, авторы Parallel Consumer предлагают использовать этот подход, когда увеличивать количество партиций не представляется возможными, либо когда увеличение партиций значительно не ускоренит обработку сообщений.

Потеря сообщений после падения приложения

Потери сообщений можно избежать, если фиксировать пакет прочитанных сообщений явно вызовом метода Commit в коде обработчика сообщений только после того, как сообщения обработаны. Если приложение упадет в процессе обработки пачки сообщений, то после перезапуска приложения все незафиксированные сообщения считаются consumer'ом повторно.

Тут еще надо сказать, что segmentio/kafka неявно для клиента библиотеки считывает из kafka сообщения в буферный канал msgs, и именно из msgs извлекается по одному сообщения вызовом метода Fetch. Думаю, что большинство реализаций consumer поступают аналогично - используют буфер считанных сообщений в оперативной памяти приложения.

Менее активное использование одного канала по сравнению с остальными (если я правильно вас понял).

Думаю, что это проблема не подхода с каналами, а больше проблема балансировки нагрузки и распределения сообщения по очередям. Ведь может оказаться, что и одна из партиций kafka будет пуста, потому что алгоритм распределения сообщений не гарантирует равномерного распределения.

Я согласен, что в любом подходе есть плюсы и минусы. Думаю, что нужно находить компромиссные решения с учетом исходных данных и ограничений решаемой задачи.

Код еще не смотрел, вопрос по фразе:

Библиотека segmentio/kafka-go позволяет реализовать интерфейс Balancer, чтобы направить потоки сообщений в соответствующие партиции. 

Балансер здесь подразумевается, что находится в продюсере или является частью кафки?

Балансер находиться в продюсере.

Конфигурация "Две партиции - четыре обработчика"

Не разбирался, что это было. Переставил кафку, теперь не воспроизводится.

Я тоже наблюдал подобные сбои и на других конфигурациях и на этом docker-образе kafka в целом. На постоянной основе не воспроизводится, поэтому разобраться тоже пока не удалось.

Sign up to leave a comment.

Articles