Спасибо за серию, очень глубокий, продуманный материал. По нему бы прямо митап провести, чтобы все вопросы по ходу задавать, обсудить среди коллег. Всё очень полно, подробно и по делу.
Спасибо! Я подумаю об этом)

Достаточно нишево внутри Big Data систем и велосипедно. IMHO смешать акторы с hadoop, spark и тихо использовать велосипед у себя в компании, но выносить велосипедостроительство в массы все же не стоит. Хотя, возможно, в Ланит просто платят премии за количество символов на Хабре. Плюсанул за литературные таланты!

Можете поподробнее пояснить, в чем заключается «велосипедность» и как бы на ваш взгляд выглядело невелосипедное решение?
Буду рассказывать про это решение на Big Data Moscow: bigdatadays.ru/ru/alexander-tokarev
А не кажется ли вам, что частично это… самообман, если угодно? Не, я понимаю, идея в какой-то степени напрашивается, я сам про такое думал много раз (только у меня вместо ML сервиса на сегодня сервис геокодирования, и очень хочется его развернуть в виде Yarn приложения, с репликацией и восстановлением в случае падения, и вынести из spark задачи).

Но с другой стороны, сразу куча ограничений… Вот вы, к примеру, при ошибке сервиса сразу позволяете себе завершить работу. Т.е. http-запрос, ответ 404 -;), или таймаут, что еще хуже — и все, обработка очередного петабайта данных завершилась неудачей. Когда и кто выполнит повторную попытку?

Наши же данные таковы, что лучше иметь половину обработанными, чем не иметь ничего. Это не всегда верно (как впрочем и обратное). Мы поэтому не можем себе позволить остановку после первой ошибки, и пытаемся продолжать обработку (тем более, что бывают как «легкие» ошибки, не означающие, что сервис не работоспособен, так и временная недоступность самого сервиса).

В общем, тут есть и другие минусы, и их наверное стоило бы огласить.

>используется HTTP протокол, являющийся стандартным средством коммуникации между приложениями. Благодаря этому реализация модели не привязана к ее интерфейсу.

Я бы сказал, что тут намного важнее описание самого интерфейса — а для этого нужен не HTTP, а скорее что-то типа IDL/protobuf/avro/parquet и описание сообщений, которыми вы обмениваетесь. Да даже тот же SOAP, может быть. А HTTP тут легко заменяется на kafka, к примеру.

>В зависимости от того, что является самой нагруженной частью, мы можем добавить машин либо в Hadoop кластер, либо для запуска дополнительных экземпляров модели.

Вообще говоря, для этого и существует Yarn. И я бы не рассматривал такие сервисы модели как нечто, не являющееся частью кластера, а наоборот, попросил бы ресурсы в виде памяти и ядер у Yarn, который как раз хорошо знает, какие узлы меньше нагружены.
Но с другой стороны, сразу куча ограничений… Вот вы, к примеру, при ошибке сервиса сразу позволяете себе завершить работу. Т.е. http-запрос, ответ 404 -;), или таймаут, что еще хуже — и все, обработка очередного петабайта данных завершилась неудачей. Когда и кто выполнит повторную попытку?

Это уже зависит от конкретной задачи. В нашем случае мы применяем подход fail-fast во избежание появления некорректных данных. На то есть несколько причин:
  1. hadoop-кластер и сервис классификации находятся в одном периметре, и если между ними нарушена коммуникация, то необходимо разобраться по какой причине, и прекратить дальнейшую обработку. При падении джобы срабатывает алерт в мониторинге, который позволяет оперативно отреагировать.
  2. 404 ошибка в нашем случае является нестандартной ситуацией, так как всегда используется один и тот же URI, поэтому в этом случае также необходимо прекратить обработку и разобраться с причиной. Это же справедливо и для 5хх ошибок.
  3. Reactive Streams как раз позволяют дозировать нагрузку на внешний сервис и избегать таймаутов вызванных большим количеством входящих запросов. Кроме того, akka-http умеет автоматически повторять запросы, которые завершились таймаутом, но не более определенного количества раз.
  4. Механизмы запуска повторных попыток тоже могут различаться в зависимости от задачи. Это может быть как ручной перезапуск, так и автоматический. Например, джоба может запускаться с определенным периодом, и после успешной обработки сохранять информацию об обработанных частях. В случае падения у нас просто при следующем запуске будут заново запущены те же порции данных на повторную обработку.

Наши же данные таковы, что лучше иметь половину обработанными, чем не иметь ничего. Это не всегда верно (как впрочем и обратное). Мы поэтому не можем себе позволить остановку после первой ошибки, и пытаемся продолжать обработку (тем более, что бывают как «легкие» ошибки, не означающие, что сервис не работоспособен, так и временная недоступность самого сервиса).

Ваша задача несколько отличается от нашей, так как мы отправляем на классификацию уже предварительно очищенные и подготовленные данные. Поэтому и стратегия работы в случае возникновения нештатных ситуаций у вас тоже будет другая. На мой взгляд, у вас должно быть четкое разделение между ошибками, вызванными непосредственно данными, и ошибками, причиной которых является инфраструктура. В первом случае вам нужно продолжать обработку и как-то обрабатывать такие результаты, во втором разумнее будет разобраться с проблемами в инфраструктуре и при необходимости запустить обработку заново.
Я бы сказал, что тут намного важнее описание самого интерфейса — а для этого нужен не HTTP, а скорее что-то типа IDL/protobuf/avro/parquet и описание сообщений, которыми вы обмениваетесь. Да даже тот же SOAP, может быть. А HTTP тут легко заменяется на kafka, к примеру.

В случае kafka в нашем случае есть один нюанс: так как у нас батчевая обработка, необходимо каким-то образом сигнализировать о границе очередного батча. Так как топики в кафке имеют определенное количество независимых партиций, то сделать это достаточно непросто.
Вообще говоря, для этого и существует Yarn. И я бы не рассматривал такие сервисы модели как нечто, не являющееся частью кластера, а наоборот, попросил бы ресурсы в виде памяти и ядер у Yarn, который как раз хорошо знает, какие узлы меньше нагружены.

Согласен, но на мой взгляд для этого лучше подходят DC/OS или Kubernetes, тем более Spark умеет работать поверх Mesos, а в последней версии научился и поверх Kubernetes. В нашем случае мы используем YARN потому что изначально все было на нем построено, но так как сервис классификации мы заворачиваем в docker-контейнер, поэтому нам проще его деплоить отдельно на специально выделенные под него машины.
Да, от задачи зависит многое, если не все. В частности, наш http-сервис — не под нашим управлением, и мы даже не знаем, какую нагрузку он точно может выдержать, как и не знаем точно, какие ошибки являются исправляемыми. Например, не далее как вчера используемый сервис отвалился на уровне DNS, и для примерно 1000 строк данных я получил unknown host… А потом оно восстановилось, и пошло обрабатывать дальше.

Что касается Yarn, то я имел в виду, что вы фактически заводите два кластера — один для спарка, а второй для сервисов. Возможно, что размещение обоих видов задач на одном кластере (это может быть и не Yarn) позволит более точно распределять нагрузку.

И еще, насколько я помню, Yarn уже научился запускать тот же docker (правда, это Hadoop 3, и скажем нам это не грозит в ближайшее время).
В случае kafka в нашем случае есть один нюанс: так как у нас батчевая обработка, необходимо каким-то образом сигнализировать о границе очередного батча. Так как топики в кафке имеют определенное количество независимых партиций, то сделать это достаточно непросто.

Возможно я что-то пропустил, но Вы не рассматривали использование Spark Streaming?
Он изначально использовал Akka, потом переехал на Netty.
Как мне кажется может подойти для Вас:
  • отлично работает с Kafka, но может использовать различные источники данных(Flume, Kinesis, TCP sockets)
  • данные всегда обрабатываются батчами (задается промежуток времени между ними)
  • есть возможность использовать backpressure(spark.streaming.backpressure.enabled по-умолчанию false)

Как вариант, обработку можно разделить на две подзадачи — одна подготавливает данные и отправляет их классификатору, вторая занимается постобработкой после классификации(Spark Streaming + Kafka). Классификатор, в свою очередь, работает только с Kafka(запись и чтение).
Spark Streaming мы рассматривали, но это не совсем наш случай. У нас уже есть большой массив данных, который хранится на HDFS и который мы должны отфильтровать по определенному критерию, проклассифицировать и сохранить результат. Поэтому в нашем случае мы остановились на батчевом варианте.

А в случае непрерывного входящего потока данных Spark Streaming и интеграция через Kafka с сервисом классификации как раз было бы идеальным решением.
Только полноправные пользователи могут оставлять комментарии.
Войдите, пожалуйста.