Comments 28

Ну зачем же вот так-то писать?


            while (true)
            {
                if (cancellationToken.IsCancellationRequested)
                    break;

Почему нельзя проверку в условие цикла вынести?


Зачем создается новый экземпляр MessageBus в методе SubscribeOnTopic и почему ему никто не делает Dispose?


Почему _consumer разрушается как в методе Dispose, так и в методе SubscribeOnTopic?


Зачем SubscribeOnTopic принимает параметр CancellationToken, если вы туда передаете None?


Что такое TimeSpan.FromMilliseconds(10) в методе SubscribeOnTopic, и, если это тайм-аут операции чтения, зачем он вообще нужен? Вы же все равно после истечения тайм-аута сразу же делаете повторную попытку.

Спасибо за конструктивную критику реализации библиотеки, многие моменты не помешало бы исправить.
1) Проверку внести в условие цикла — не помешает, но не критично.
2) Насчет MessageBus — решил сделать так из-за метода, которым мы подписываемся на сообщения и внутри которого у нас цикл. И потом выполнять метод этого обьекта в отдельном потоке.
3) В методе Dispose он может и не разрушится, т.к. там он обозначен как тип Nullable. Но согласен, избыточность.
4) SubscribeOnTopic может принимать CancellationToken по известным причинам, но я просто решил не использовать этот функционал. Еще там есть перегруженный конструктор, через аргумент которого мы можем указать хост, где у нас выполняется Apache Kafka, но его я тоже не использовал
5) Этот параметр в библиотеке Confluent.Kafka отвечает, как вы правильно заметили, за тайм-аут операции чтения из топика, и здесь указано просто дефолтное значение, на котором я не заострял внимание.
Насчет MessageBus — решил сделать так из-за метода, которым мы подписываемся на сообщения и внутри которого у нас цикл. И потом выполнять метод этого обьекта в отдельном потоке.

Но зачем создавать новый объект только ради одного его поля? Код был бы намного менее индусским если бы _consumer был не полем, а локальной переменной.


Этот параметр в библиотеке Confluent.Kafka отвечает, как вы правильно заметили, за тайм-аут операции чтения из топика, и здесь указано просто дефолтное значение, на котором я не заострял внимание.

Этого тайм-аута вообще не должно быть! Вы, фактически, выделили отдельный поток для потребителя — а значит, можете позволить себе ожидание до победного.

Этого тайм-аута вообще не должно быть! Вы, фактически, выделили отдельный поток для потребителя — а значит, можете позволить себе ожидание до победного.

Так и есть, для каждого потребителя я создаю отдельных поток, и этот параметр необходим для того, чтобы определить, как часто мы будем обращаться к Apache Kafka и получать оттуда сообщения. Кроме того, такой вариант с потоками для каждого топика мне показался наиболее удачным в плане понимания того, как взаимодействовать с Apache Kafka

Нет, этот параметр определяет как часто поток будет переставать получать сообщения от брокера.

Да, while (!canceled) { } — тоже чудесный цикл...


Вы вообще осознаете, что с таким подходом ваши распределенные микросервисные приложения будут всухую проигрывать классическим монолитам по производительности и требованиям к железу? Добавить еще пару десятков микросервисов, как того учит микросервисная архитектура — и вот уже для их запуска требуется кластер из десятка серверов даже при отсутствии нагрузки?

Упор был сделан на максимальное упрощение системы для знакомства с концепцией микросервисной архитектуры. Никто и не говорит о промышленной реализации кода со всевозможными проверками и оптимизациями.

А я и не говорю про оптимизации. Я говорю о том, что проект уровня "Hello, world!" не должен подвешивать систему.


Для знакомства с концепцией это особенно важно.

Два вопроса:
Сколько времени реально уходит на цикл запрос-ответ?
Почему kafka женского рода?

Хороший вопрос, если честно, в разговоре я просто привык употреблять Apache Kafka в женском роде. Но, можно было бы ответить, что на официальном сайте написано, что Apache Kafka™ is a distributed streaming platform. А слово «платформа» — женского рода:)
И да, на одну итерацию у меня уходило довольно много времени, что-то порядка 300-500 миллисекунд. Связано, по всей видимости, как с реализацией библиотеки, так и с настройками Apache Kafka

На самом деле, это связано с тем, что у вас шесть потоков висят в активном ожидании, не давая происходить никакой полезной работе.

Почему kafka женского рода?

— А вы Кафку любите?
— Да, особенно грефневую!
:)
Одно но: cейчас если мы, например, запустим 5 штук NameService, то в MainApp придет 5 имен, а не одно. Это из-за настроек Apache Kafka, прописанных в файле server.properties. В рамках туториала я этого намеренно не касаюсь, чтобы не усложнять материал.

У вас получается по коду, что делая запрос из клиента MainApp, мы сразу выходим из цикла и пишем ответ уже в другом потоке. При этом получается, что будь это веб-приложение, нам бы пришлось поддерживать на стороне бэкенда свою очередь тоже (т.е. пользователь нажимает на кнопку "Сгенерировать имя", мы сопоставляем этому запросу некий идентификатор, сохраняем его, а потом клиент через какое-то время достает ответ отдельным запросом к бэкенду, имея на руках этот идентификатор). Это, как известно, имеет свои последствия (в частности, возникает вопрос о масштабируемости такого решения). Но я всегда считал, что Message Broker должен позволять это сделать без всяких самописных очередей. Разве протокол Kafka не позволяет сделать нечто подобное?


var response = await msgBus.SendMessage(topic: gTopicNameCmd, message: gMessage);
Console.WriteLine(response);

Здесь подразумевается, что запрос и ответ имеют одинаковый topic gTopicNameCmd. Следовательно, Kafka сама разруливает кому какой ответ отдать, а мы ждем от нее ответ и никуда не уходим, пока он не прилетит.

Вы правы, некоторая неоднозначность с отправкой/приемом сообщения имеет место быть. Но вот с очередями на стороне сервера — не уверен что правильно вас понял. Apache Kafka — это такой большой черный ящик, и всю механику того, кем и в какой последовательности было отправлено сообщение контролирует тоже Kafka. Нашими идентификаторами принадлежности сообщений в данном случае являются названия топиков.
var response = await msgBus.SendMessage(topic: gTopicNameCmd, message: gMessage);
Console.WriteLine(response);

Я очень хотел реализовать также, когда это все задумывал, потому что это было бы намного проще и понятнее, но не нашел подходящих вариантов реализации клиента для общения с Kafka.

Да, Вы меня не поняли немного. Переформулирую: как вы отдадите случайное имя JS-скрипту, который обратился к вашему бэкенду? Вы либо будете писать в тело каждого сообщения для Kafka какой-то идентификатор, а потом этот идентификатор отдавать клиенту (то что я описал выше). Вариант второй: Вы поставите в паузу текущий поток общения клиента с сервером и будете ждать, пока другой поток не запишет результат в Ваше хранилище (при этом потребность в идентификаторах никуда не уходит, потому что вам нужно как-то различать одинаковые топики). В первом варианте клиент может оставить в покое сокет и сделать еще несколько запросов, дожидаясь результата на своей стороне. Во втором варианте сокет между браузером и приложением же будет висеть, поток для обработки тоже будет в повисшем состоянии, и вскоре сервер поднимет лапки (если мы говорим о более менее нагруженном сайте). Прибавьте к этому горизонтальное расширение бэкенд-серверов и распределение нагрузки между ними, и вы получите какую-то кашу.

Переформулирую: как вы отдадите случайное имя JS-скрипту, который обратился к вашему бэкенду?
В таком случае у нас есть 2 варианта того, как его отдать(оба кстати хорошо описаны здесь): либо мы делаем запросы к каждому из наших NameService, у которых должно быть WebApi, либо обращаемся к некому GatewayApi, который аккумулирует информацию со всех NameService.
Запросы JS скрипта происходят синхронно по HTTP.
Обмен сообщениями между сервисами NameService и GatewayApi происходит асинхронно с помощью Kafka, что также позволяет не привязываться к конкретным сущностям NameService за счет того, что все они подписываются/отправляют сообщения по заранее установленной схеме.
Когда мы делаем синхронный запрос, например, к GatewayApi, то GatewayApi посылает сообщение в Kafka о том, что нужно новое имя. Это сообщение видят все (или некоторые, в зависимости от кол-ва разделов в топике, но не суть) сущности NameService, для которых оно предназначалось, и генерируют имя, которое в конце концов получает GatewayApi. Можно безболезненно ввести сколько угодно таких сущностей NameService для распределения нагрузки, и это будет легко сделать.

Вас спрашивают о деталях реализации GatewayApi — а вы зачем-то начинаете про масштабируемость NameService рассказывать. Вы вообще читать умеете?!


Вы так и не объяснили, каким образом информация дойдет до точки назначения — скрипта на фронтенде.


GatewayApi получил HTTP-запрос и отправил сообщение в кафку. Через какое-то время из кафки пришел ответ. Как связать запрос от клиента с ответом-то?

Я ответил развернуто по нескольким ключевым моментам, о которых был вопрос. О GatewayApi вообще-то изначально в вопросе ни слова, это уже я предложил вариант реализации той задачи, о которой говорит Dobby007, прочитайте его комментарий внимательнее.
Видимо, написал недостаточно ясно: мы делаем синхронный HTTP запрос к GatewayApi и ждем, пока он нам не отдаст имя, которое получит из Kafka.
Видимо, написал недостаточно ясно: мы делаем синхронный HTTP запрос к GatewayApi и ждем, пока он нам не отдаст имя, которое получит из Kafka.

Как он поймет, кому из клиентов его отдавать?

По сессии. Делаются обычные запросы, и в простейшем случае у GatewayApi есть 2 поля для мужского и женского имени, которые обновляются при получении сообщений из кафки. Они же и и отдаются при запросе клиентов

То есть вы в принципе не рассматриваете задачу отдавать разным клиентам — разные имена?


Это надо было написать сразу же.

А в чем в данном случае преимущество общения микросервисов через Kafka (messaging) по сравнению с обычным синхронным request/response + load balancer?
Преимущество заключается, например, в том, что в последней версии Kafka 0.11 появилась новая фича, благодаря которой теперь можно реализовать «exactly-once» семантику доставки сообщений. Но, возвращаясь к вопросу, в конкретно «данном» случае, если говорить о сравнении с синхронным запросом к балансировщику, то у нас синхронным является только отправка сообщения о том, что мы хотим получить имя в Kafka. А ответ мы ждем в отдельном потоке.

Но является ли ожидание ответа в отдельном потоке преимуществом?

Так я про преимущества спрашивал, не про особенности. Мне вот интересно, чем в данном случае оправдано применение сложной распределенной системы и почему это лучше простого LB?
Ну смотрите: это обучающий материал, призванный, с одной стороны, познакомить с микросервисами и Apache Kafka, а в другой — сделать это максимально понятно и просто. Потенциальные преимущества такого подхода — exactly once семантика, гарантируемая Apache Kafka, о которой я уже написал, возможность повторить сообщения «из прошлого», их параллельная обработка за счет разбиения топика на разделы, и др.
Но в статье о этих преимуществах почти ничего нет(разве что ссылка на них на офф. сайте), и я сделал это намеренно, чтобы не усложнять, потому что сразу все охватить не получится. Вот и написал вводную, чтобы читатель, если захочет, пойдет изучать Apache Kafka уже дальше самостоятельно.
Only those users with full accounts are able to leave comments. Log in, please.