Comments 28
Ну зачем же вот так-то писать?
while (true)
{
if (cancellationToken.IsCancellationRequested)
break;
Почему нельзя проверку в условие цикла вынести?
Зачем создается новый экземпляр MessageBus в методе SubscribeOnTopic и почему ему никто не делает Dispose?
Почему _consumer разрушается как в методе Dispose, так и в методе SubscribeOnTopic?
Зачем SubscribeOnTopic принимает параметр CancellationToken, если вы туда передаете None?
Что такое TimeSpan.FromMilliseconds(10) в методе SubscribeOnTopic, и, если это тайм-аут операции чтения, зачем он вообще нужен? Вы же все равно после истечения тайм-аута сразу же делаете повторную попытку.
1) Проверку внести в условие цикла — не помешает, но не критично.
2) Насчет MessageBus — решил сделать так из-за метода, которым мы подписываемся на сообщения и внутри которого у нас цикл. И потом выполнять метод этого обьекта в отдельном потоке.
3) В методе Dispose он может и не разрушится, т.к. там он обозначен как тип Nullable. Но согласен, избыточность.
4) SubscribeOnTopic может принимать CancellationToken по известным причинам, но я просто решил не использовать этот функционал. Еще там есть перегруженный конструктор, через аргумент которого мы можем указать хост, где у нас выполняется Apache Kafka, но его я тоже не использовал
5) Этот параметр в библиотеке Confluent.Kafka отвечает, как вы правильно заметили, за тайм-аут операции чтения из топика, и здесь указано просто дефолтное значение, на котором я не заострял внимание.
Насчет MessageBus — решил сделать так из-за метода, которым мы подписываемся на сообщения и внутри которого у нас цикл. И потом выполнять метод этого обьекта в отдельном потоке.
Но зачем создавать новый объект только ради одного его поля? Код был бы намного менее индусским если бы _consumer был не полем, а локальной переменной.
Этот параметр в библиотеке Confluent.Kafka отвечает, как вы правильно заметили, за тайм-аут операции чтения из топика, и здесь указано просто дефолтное значение, на котором я не заострял внимание.
Этого тайм-аута вообще не должно быть! Вы, фактически, выделили отдельный поток для потребителя — а значит, можете позволить себе ожидание до победного.
Этого тайм-аута вообще не должно быть! Вы, фактически, выделили отдельный поток для потребителя — а значит, можете позволить себе ожидание до победного.
Так и есть, для каждого потребителя я создаю отдельных поток, и этот параметр необходим для того, чтобы определить, как часто мы будем обращаться к Apache Kafka и получать оттуда сообщения. Кроме того, такой вариант с потоками для каждого топика мне показался наиболее удачным в плане понимания того, как взаимодействовать с Apache Kafka
Да, while (!canceled) { }
— тоже чудесный цикл...
Вы вообще осознаете, что с таким подходом ваши распределенные микросервисные приложения будут всухую проигрывать классическим монолитам по производительности и требованиям к железу? Добавить еще пару десятков микросервисов, как того учит микросервисная архитектура — и вот уже для их запуска требуется кластер из десятка серверов даже при отсутствии нагрузки?
Два вопроса:
Сколько времени реально уходит на цикл запрос-ответ?
Почему kafka женского рода?
Почему kafka женского рода?
— А вы Кафку любите?
— Да, особенно грефневую!
:)
Одно но: cейчас если мы, например, запустим 5 штук NameService, то в MainApp придет 5 имен, а не одно. Это из-за настроек Apache Kafka, прописанных в файле server.properties. В рамках туториала я этого намеренно не касаюсь, чтобы не усложнять материал.
У вас получается по коду, что делая запрос из клиента MainApp, мы сразу выходим из цикла и пишем ответ уже в другом потоке. При этом получается, что будь это веб-приложение, нам бы пришлось поддерживать на стороне бэкенда свою очередь тоже (т.е. пользователь нажимает на кнопку "Сгенерировать имя", мы сопоставляем этому запросу некий идентификатор, сохраняем его, а потом клиент через какое-то время достает ответ отдельным запросом к бэкенду, имея на руках этот идентификатор). Это, как известно, имеет свои последствия (в частности, возникает вопрос о масштабируемости такого решения). Но я всегда считал, что Message Broker должен позволять это сделать без всяких самописных очередей. Разве протокол Kafka не позволяет сделать нечто подобное?
var response = await msgBus.SendMessage(topic: gTopicNameCmd, message: gMessage);
Console.WriteLine(response);
Здесь подразумевается, что запрос и ответ имеют одинаковый topic gTopicNameCmd. Следовательно, Kafka сама разруливает кому какой ответ отдать, а мы ждем от нее ответ и никуда не уходим, пока он не прилетит.
var response = await msgBus.SendMessage(topic: gTopicNameCmd, message: gMessage);
Console.WriteLine(response);
Я очень хотел реализовать также, когда это все задумывал, потому что это было бы намного проще и понятнее, но не нашел подходящих вариантов реализации клиента для общения с Kafka.
Да, Вы меня не поняли немного. Переформулирую: как вы отдадите случайное имя JS-скрипту, который обратился к вашему бэкенду? Вы либо будете писать в тело каждого сообщения для Kafka какой-то идентификатор, а потом этот идентификатор отдавать клиенту (то что я описал выше). Вариант второй: Вы поставите в паузу текущий поток общения клиента с сервером и будете ждать, пока другой поток не запишет результат в Ваше хранилище (при этом потребность в идентификаторах никуда не уходит, потому что вам нужно как-то различать одинаковые топики). В первом варианте клиент может оставить в покое сокет и сделать еще несколько запросов, дожидаясь результата на своей стороне. Во втором варианте сокет между браузером и приложением же будет висеть, поток для обработки тоже будет в повисшем состоянии, и вскоре сервер поднимет лапки (если мы говорим о более менее нагруженном сайте). Прибавьте к этому горизонтальное расширение бэкенд-серверов и распределение нагрузки между ними, и вы получите какую-то кашу.
Переформулирую: как вы отдадите случайное имя JS-скрипту, который обратился к вашему бэкенду?В таком случае у нас есть 2 варианта того, как его отдать(оба кстати хорошо описаны здесь): либо мы делаем запросы к каждому из наших NameService, у которых должно быть WebApi, либо обращаемся к некому GatewayApi, который аккумулирует информацию со всех NameService.
Запросы JS скрипта происходят синхронно по HTTP.
Обмен сообщениями между сервисами NameService и GatewayApi происходит асинхронно с помощью Kafka, что также позволяет не привязываться к конкретным сущностям NameService за счет того, что все они подписываются/отправляют сообщения по заранее установленной схеме.
Когда мы делаем синхронный запрос, например, к GatewayApi, то GatewayApi посылает сообщение в Kafka о том, что нужно новое имя. Это сообщение видят все (или некоторые, в зависимости от кол-ва разделов в топике, но не суть) сущности NameService, для которых оно предназначалось, и генерируют имя, которое в конце концов получает GatewayApi. Можно безболезненно ввести сколько угодно таких сущностей NameService для распределения нагрузки, и это будет легко сделать.
Вас спрашивают о деталях реализации GatewayApi — а вы зачем-то начинаете про масштабируемость NameService рассказывать. Вы вообще читать умеете?!
Вы так и не объяснили, каким образом информация дойдет до точки назначения — скрипта на фронтенде.
GatewayApi получил HTTP-запрос и отправил сообщение в кафку. Через какое-то время из кафки пришел ответ. Как связать запрос от клиента с ответом-то?
Видимо, написал недостаточно ясно: мы делаем синхронный HTTP запрос к GatewayApi и ждем, пока он нам не отдаст имя, которое получит из Kafka.
Видимо, написал недостаточно ясно: мы делаем синхронный HTTP запрос к GatewayApi и ждем, пока он нам не отдаст имя, которое получит из Kafka.
Как он поймет, кому из клиентов его отдавать?
Но является ли ожидание ответа в отдельном потоке преимуществом?
Но в статье о этих преимуществах почти ничего нет(разве что ссылка на них на офф. сайте), и я сделал это намеренно, чтобы не усложнять, потому что сразу все охватить не получится. Вот и написал вводную, чтобы читатель, если захочет, пойдет изучать Apache Kafka уже дальше самостоятельно.
Создаем микросервисную архитектуру вместе с Apache Kafka и .NET Core 2.0