Как стать автором
Обновить
49
-3
Александр Токарев @alextokarev

Big Data Solutions Architect

Отправить сообщение

В целом да, сейчас flink используется больше как труба для перекачки данных из кафки в s3, и из фичей флинка пока что используются только оконные функции. Но в будущем мы планируем развивать этот модуль для выполнения более сложных аналитических расчетов на потоках данных.

Ведём свой дата каталог

NiFi мы уже на текущем этапе исключили, заменив его Flink-ом

К сожалению, этот доклад пришлось отменить в силу ряда обстоятельств личного характера.
Можете поподробнее пояснить, в чем заключается «велосипедность» и как бы на ваш взгляд выглядело невелосипедное решение?
Spark Streaming мы рассматривали, но это не совсем наш случай. У нас уже есть большой массив данных, который хранится на HDFS и который мы должны отфильтровать по определенному критерию, проклассифицировать и сохранить результат. Поэтому в нашем случае мы остановились на батчевом варианте.

А в случае непрерывного входящего потока данных Spark Streaming и интеграция через Kafka с сервисом классификации как раз было бы идеальным решением.
Но с другой стороны, сразу куча ограничений… Вот вы, к примеру, при ошибке сервиса сразу позволяете себе завершить работу. Т.е. http-запрос, ответ 404 -;), или таймаут, что еще хуже — и все, обработка очередного петабайта данных завершилась неудачей. Когда и кто выполнит повторную попытку?

Это уже зависит от конкретной задачи. В нашем случае мы применяем подход fail-fast во избежание появления некорректных данных. На то есть несколько причин:
  1. hadoop-кластер и сервис классификации находятся в одном периметре, и если между ними нарушена коммуникация, то необходимо разобраться по какой причине, и прекратить дальнейшую обработку. При падении джобы срабатывает алерт в мониторинге, который позволяет оперативно отреагировать.
  2. 404 ошибка в нашем случае является нестандартной ситуацией, так как всегда используется один и тот же URI, поэтому в этом случае также необходимо прекратить обработку и разобраться с причиной. Это же справедливо и для 5хх ошибок.
  3. Reactive Streams как раз позволяют дозировать нагрузку на внешний сервис и избегать таймаутов вызванных большим количеством входящих запросов. Кроме того, akka-http умеет автоматически повторять запросы, которые завершились таймаутом, но не более определенного количества раз.
  4. Механизмы запуска повторных попыток тоже могут различаться в зависимости от задачи. Это может быть как ручной перезапуск, так и автоматический. Например, джоба может запускаться с определенным периодом, и после успешной обработки сохранять информацию об обработанных частях. В случае падения у нас просто при следующем запуске будут заново запущены те же порции данных на повторную обработку.

Наши же данные таковы, что лучше иметь половину обработанными, чем не иметь ничего. Это не всегда верно (как впрочем и обратное). Мы поэтому не можем себе позволить остановку после первой ошибки, и пытаемся продолжать обработку (тем более, что бывают как «легкие» ошибки, не означающие, что сервис не работоспособен, так и временная недоступность самого сервиса).

Ваша задача несколько отличается от нашей, так как мы отправляем на классификацию уже предварительно очищенные и подготовленные данные. Поэтому и стратегия работы в случае возникновения нештатных ситуаций у вас тоже будет другая. На мой взгляд, у вас должно быть четкое разделение между ошибками, вызванными непосредственно данными, и ошибками, причиной которых является инфраструктура. В первом случае вам нужно продолжать обработку и как-то обрабатывать такие результаты, во втором разумнее будет разобраться с проблемами в инфраструктуре и при необходимости запустить обработку заново.
Я бы сказал, что тут намного важнее описание самого интерфейса — а для этого нужен не HTTP, а скорее что-то типа IDL/protobuf/avro/parquet и описание сообщений, которыми вы обмениваетесь. Да даже тот же SOAP, может быть. А HTTP тут легко заменяется на kafka, к примеру.

В случае kafka в нашем случае есть один нюанс: так как у нас батчевая обработка, необходимо каким-то образом сигнализировать о границе очередного батча. Так как топики в кафке имеют определенное количество независимых партиций, то сделать это достаточно непросто.
Вообще говоря, для этого и существует Yarn. И я бы не рассматривал такие сервисы модели как нечто, не являющееся частью кластера, а наоборот, попросил бы ресурсы в виде памяти и ядер у Yarn, который как раз хорошо знает, какие узлы меньше нагружены.

Согласен, но на мой взгляд для этого лучше подходят DC/OS или Kubernetes, тем более Spark умеет работать поверх Mesos, а в последней версии научился и поверх Kubernetes. В нашем случае мы используем YARN потому что изначально все было на нем построено, но так как сервис классификации мы заворачиваем в docker-контейнер, поэтому нам проще его деплоить отдельно на специально выделенные под него машины.
Согласен. Если можно использовать jpmml — то так и нужно поступить. Но в нашем случае была немного другая история:

  1. Мы использовали для тренировки модели библиотеку fastText, в которой нет опции сохранения в PMML, а только в бинарный формат.
  2. Мы тренировали модель на достаточно большой обучающей выборке, поэтому сама модель получилась порядка 8ГБ в бинарном виде. Мы посчитали, что это будет достаточно накладно хранить ее на каждой машине кластера, и, кроме того, при работе ее приходилось бы полностью загружать в оперативную память, а это значит что на каждой машине мы теряли бы не менее 8 ГБ ОЗУ.

Поэтому было принято решение вынести модель на отдельную машину. Это дало следующие преимущества:

  1. Команда Data Scientist-ов получила возможность использовать абсолютно любые средства для своей работы, так как интеграция идет через стандартный HTTP-протокол.
  2. Если этот сервис становится узким местом, мы легко можем его горизонтально масштабировать просто добавляя дополнительные машины и равномерно распределяя нагрузку между ними.
При расчете ядер, памяти и количества блоков я хотел наглядно показать, какой максимальный уровень параллелизма нам может позволить YARN на одной машине. Но так, как нам приходится взаимодействовать с внешними сервисами, то возникает простой основных потоков и этого уровня становится недостаточно (смотри рисунки 5 и 6). А описание работы Spark я привел для того, чтобы показать, как запускается задача на машинах кластера на уровне JVM процессов. Это нам пригодится в следующей части.
Спасибо! Вообще, это статья не столько про Data Science и его коммерческое применение, сколько про Software Engineering, поэтому основной акцент я сделал в первую очередь на технологиях, а не практических кейсах. Теоретически, в этой схеме может быть использована любая модель.

Про женские блоги писал мой коллега art_pro, который призывается сюда чтобы дать ответы на остальные вопросы)
Классификация происходит с использованием внешнего сервиса, который запущен на отдельной машине. О технической реализации взаимодействия спарка с внешними сервисами я более подробно расскажу в следующей части.
Ну, Scala вместе с Groovy были пионерами в области альтернативных языков для JVM, это уже потом пошло-поехало) Кстати, что касается Scala — сейчас очень активно разрабатывается компилятор в нативный код https://github.com/scala-native/scala-native. Под JVM в своё время было написано огромное количество отличных библиотек, которые грех не переиспользовать.
Или например List[String Either Int Either Long Either Date Either URL Either SomethingElse]
Ок, спасибо за уточнение, поправил.
Почему запретили? Вот, вполне себе валидный код в 2.12:
case class User(name: String)
class SuperUser extends User("Super User")

println(new SuperUser().name)

Меченое — что вы имеете ввиду?
Именно. Используя терминологию алгебраических типов данных (ADT), пересечение типов — это Product(тип-произведение), то есть он обладает свойствами обоих исходных типов, а объединение типов — это CoProduct(тип-сумма), тип, обладающий свойствами одного из исходных типов.
Это не полиморфизм, это перегрузка методов. Объединение типов можно например использовать в качестве параметра для других типов:
trait Container[A] {
  def put(a: A): Unit
  def count: Int
}

class StringAndIntContainer extends Container[String | Int]

Здесь базовая реализация не предполагает перегрузки метода put, и c помощью объединения типов мы можем создать такой контейнер, который будет принимать на вход String и Int, но компилятор будет запрещать вызывать метод put для других типов.
1

Информация

В рейтинге
Не участвует
Откуда
Москва, Москва и Московская обл., Россия
Работает в
Дата рождения
Зарегистрирован
Активность