Pull to refresh

Comments 24

> Переходить на скалу пока не видем никакого смысла.

Разумеется, ничего не имею против использования Spark + Java8. Но если попробуете переписать на скалу хотя бы пример кода из статьи, будет сложно не заметить, насколько более чистым и лаконичным он станет.

Наверно, с точки зрения перфекциониста, уже знакомого со скалой всё так и есть. Но с точки зрения команды джавистов, и заказчика (которому в случае чего надо будет искать не скала а джава разработчиков) всё выглядит иначе :-)
ИМХО, вряд ли в Спарк-джобах придется использовать самые продвинутые фишки скалы, поэтому любой толковый джавист не только без особого труда разберется в этом коде, но и сможет его писать. Поэтому скала в этом случае будет просто удобной «better Java».
Поэтому присоединяюсь к совету использовать скалу :)
немного переслащена статья, как и большинство статей про спарк =)

уж не обессудьте, но:
  1. Т.е все промежуточные данные между Map и Reduce фазами, сбрасывались в HDFS
    Стоит сказать что дисковый ввод/вывод всё таки используется (на этапе shuffle)

    что такое shuffle если не промежуточный этап между Map и Reduce фазами? хотя в некоторых случаях действительно можно его избежать, если в процессе операции map не менялось партицирование
  2. МapReduce запускает для каждой задачи новую JVM, со всеми вытекающими последствиями
    mapred.job.reuse.jvm.num.tasks существует почти с первых версий hadoop, для исключения лишних созданий jvm
  3. Ну и наконец Spark оперирует RDD абстракциями (Resilient Distributed Dataset), которые более универсальны чем MapReduce. которые очень сложно оптимизировать когда вы работаете с ними так как спарк (агрегирующая операция ждет выполнения) и в результате почти все текущие оптимизации вливаются в sparksql, по нему строим дерево и генерируем код
  4. почему-то все в разговорах про спарк опускают запуск hadoop на tez. хотя тут примерно понятно, та же клоудера его демонстративно игнорирует, так как это продукт основного конкурента


отдельно по поводу первого пункта: spark полностью совпадает с pull моделью используемой в классической реализации хадупа, в тоже время flink уже перешел на push, да и хадуп уже умеет это делать
просто Just for FUN
pl.postech.ac.kr/~eastcirclek
www.slideshare.net/FlinkForward/dongwon-kim-a-comparative-performance-evaluation-of-flink

тот же проект tungsten из спарка по выносы в offheap и кодогенерация почти целиком взят с идеи flink ( flink.apache.org ). Причем у флинка стриминг сделан более качественно, плюс совместим с гугловым Dataflow
www.slideshare.net/FlinkForward/william-vambenepe-google-cloud-dataflow-and-flink-stream-processing-by-default

причем на более высоком уровне, в отличии от спакра, у которого стриминг это микробатчи. поэтому честная поддержка dataflow сейчас в обсуждении как же сделать это на спарке

отдельно заслуживает упоминание совместимости, переезд ваших существующих задач с минимальными модификациями:
1) old hadoop map reduce — ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
2) storm (с учетом что твитер отказалась от дальнейшего развития) — ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html

я не имею ничего против спарка =) сами используем, но читать в каждой статье хвалебные отзывы про техносовершенство спарка и убогости хадупа тоже напрягает.
После Вашей критики хочется задать вопрос: так в чем же все-таки преимущества спарка по сравнению с хадупом?
во первых более высокоуровневый api, который хорошо ложится на скалу и неплохо на java 8. совсем первые версии spark в своих rdd по api были совместимы с коллекциями =) такой вот подход «пишем для кластера в манере написания для одной машинки»

тот кто пробовал вручную писать join + aggregation какой-нибудь на map-reduce со мной согласятся, что это немного ад. именно поэтому и появились такие проекты как hive (sql язык манипуляции) и pig (парадигма data stream и работа над стримами данных), которые и снимали с разработчика весь этот низкоуровневый ад.

второй плюс это смесь как классического подхода по обработке данных (императивны и функциональный за счет лямбд) с декларативным (sparksql) и гибкие переходы между ними

ну и третий наверное один из наиболее главных для математиков: он почти нативно поддерживает пандусовские датафреймы, а значит легко интегрируется с уже существующим кодом.

ну а другие «плюсы»: его активно форсят любители скалы, первый коммент это подтверждает ;) точно также как любители clojure форсят storm в качестве примеров больших систем.

в общем как очередная реализацию парадигмы map-reduce с переменных количеством map и reduce фаз в pipeline он очень неплох, да еще и скрывает многие сложности от пользователей.

с другой стороны возьмем клоудеру с её идеей спарка везде vision.cloudera.com/one-platform

но даже они признают, что у спарка до сих пор по сравнению с хадупом есть проблемы производительности на очень больших кластерах.
immutable state это не панаценя ( Even though Spark is fast, there’s room for improvement in stream processing. Performance will continue to be a focus area across the platform, but in Spark Streaming in particular, there are some obvious changes we can make, in persistent mutable state management and elsewhere, that will deliver some big benefits. ) и тд.

но я сомневаюсь, что тут найдется много людей с кластерами в тысячи машин, а значит и выходит на первое место удобство использования и порог вхождения. а вот тут спарк выигрывает. хотя последнее время очень активно и flink движется в этом направлении, чего стоят только примеры внедрения flink-forward.org/?post_type=session
Спасибо за ваши очень интересные комментарии.

Если я не ошибаюсь, то поддержка в Spark DataFrame появилась сравнительно недавно (то есть с версии 1.4 — databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html ). Так что я бы не записывал это в «плюс» в историческом контексте.

Так же хотелось бы отметить что «ругать» HDFS и превозносить Spark не стоит, так как судя по заявлениям создателей Spark, одно из решающих решений была поддержка как раз таки HDFS в самых первых версиях, что позволил интегрироваться в экосистему Hadoop достаточно безболезненно.

Так же нужно не забывать что Spark активно продвигается, так же самым O'Reilly, а это не маловажный фактор в популярности продукта (помимо технических характеристик).

python api был уже давно, правда он даже в питоне представлялся в виде rdd, проблем с сериализацией и тд был вагон, но это работало и многие математики это использовали.

сами датафреймы добавили как абстракцию уже почти год назад, а для bigdata год это достаточно большой срок, за это время многие продукты рождаются и успевают умереть в забвении =)

ругают не hdfs, а map-reduce как подход, хотя спарк это тот же map-reduce по сути, а вот тут и начинаются мифы… но всегда стоит отделять тех часть от маркетинга
Привет.

1. shuffle — действительно между фазами Map и Reduce. Но ведь в сложных workflow зачастую несколько map фаз. Плюс там не должно быть репликации, если сброс данных не идет в HDFS
2. Тут соглашусь, есть така оптимизация. Надо более глубоко копать как это сделано в Спарке, но по дефолту интуитивно спарк-задачи запускаются быстрее.
3. Оптимизация — это вообще вещь не простая в том числе для олдскульных MR задач )) Не всегда целесообразно тратить время разработчиков на оптимизацию, особенно если приложение в целом устраивает по производительности.

Ну и на конец восторженность Spark-ом не вызвана какими то маркетинговыми уловками. Я например не верю в эту картинку :-)

image

Но объективно я вижу, что те же самые задачи стали работать быстрее. И API стал более богатым. Жить стало легче :-)

1. пачка map фаз в spark'е, точно так же схлопываются в один map в MR задаче вручную, или с использование tez автоматически он пайплайнит задачи. хотя согласен что по красоте спарк выигрывает. про репликацию спорно, когда у вас все машинки по pull модели ломятся на один сервер хорошего мало. map.groupBy.map.groupBy — 2 фазы shuffle и изнасилованный диск, так что не стоит думать что диск так уж редко используется.

2. вы tez проверяли или судите по классическому mr? если классический, то это архитектурная особенность работы jobtracker'а, если уж тогда и сравнивать, то spark-submit на yarn

3. я говорю о глобальных изменениях, rdd по сути только в offheap вынесли, все вещи в dataframe интересные попадают. текущую схему работы без переделки всей архитектуры не поменять, а вот с dataframe еще что-то можно сделать. к тому же я говорю про оптимизацию платформы, не отдельных ваших задач, зачастую улучшение платформы сразу влияние на все задачи оказывает.

та картинка является правдой, НО для итеративной задачи, когда все данные в памяти, в итоге сделали бы они в 2 раза больше итераций еще больший бы разрыв получили. насколько помню там был pagerank, берем графовый giraph или flink delta iteration и получаем точно такие же графики, но на вершине уже спарк. и это тоже будет правдой. бенчмарки они такие…

> Но объективно я вижу, что те же самые задачи стали работать быстрее.

переписанные задачи, перевели бы вы их на impala стало бы еще быстрее местами. переписали на hive+tez другие результаты.
а использовали бы тот же flink и даже переписывать бы не пришлось, 2-3 строки изменить в коде только.

я не против спарка, как инструмент он неплох, как уже говорилось, мы сами его используем. я против того когда начинают хвалить инструмент ссылаясь на мифы, я их перечислил:
1) спарк все в памяти, поэтому такой быстрый, а вот хадуп все на диске
2) спарк реиспользует jvm, а вот хадуп подымает каждый раз новые инстансы
3) rdd универсальней map-reduce (на самом деле rdd это способ представления данных, а map-reduce парадигма вычисления, в спарке map-reduce и использует с небольшими модификациями)

если уже и приводить плюсы спарка, то это:
1) более удобный и высокоуровневый api, следовательно ниже порог вхождения разработчиков и более быстрое написание кода
2) местами более высокая скорость работы, но далеко не все объемы он может переварить, на которых справляется хадуп
3) более удобная обертка для оркестрации задачами, так как все можно делать в основной программе

p.s. по поводу как реиспользование в спарке работает: в спарке подымается воркер и он все задачи обслуживает, в хадупе от этого по умолчанию отказались, так как потенциально может вызвать утечку ресурсов в пределах долгой и нагруженной MR таски, в спарке забили, типо будем считать что утечек нету между отдельными тасками. поэтому в хадупе не проблема в пределах все mr job'ы сказать переиспользовать jvm, а вот в спарке запретить данное поведение уже нельзя.
Спасибо за комментарии. Расскажите, пожалуйста, что такое pull и push модели в контексте всех этих фреймворков. Или ссылку, если найдете.
можно поискать отдельно как работает map-reduce в хадупе и спарке:
1) фаза map и пишет в память
2) память периодически сбрасывает результаты на локальный диск
3) reduce фаза запрашивает ноды на которых проходили map фазы на получение данных

то есть редьюсеры сами выступают в роли инициатора получения данных, pull модель более устойчивая к отказам, если у нас reduce какой и отвалился, то все входные данные остаются и пересчитать не проблема

flink & mr 2.0 push
1) map отрабатывает и пишет в память
2) подымаются reduce элементы
3) по p2p топологии память стразу транслируется на удаленные машины
4) при необходимости reduce сбрасывает данные на диск после частичного merge, если данные в память не влазят

в push есть проблема, что если у нас reduce упал, то нам нужно еще и map фазу повторить для получения кусочка данных для данного блока, поэтому даже в push остается возможность настройки сброса копии на диск, чтобы можно было потом просто её забрать без пересчета.

Но вообще у нас снижается latency за счет того, что reducer сразу получает входные данные для свертки, без необходимости делать запрос. В классической реализации для снижения latency reducer'ы обычно стартуют чуть раньше чем завершатся все map task'и. этим параметром управляет mapreduce.job.reduce.slowstart.completedmap. 0.1 означает, что как только закончат работу 10% всех мап тасков начнут запускаться редьюсеры и вытягивать себе копии данных, то есть к моменту отрабатывания последнего map есть вероятность, что только его и будут ждать редьюсеры, чтобы скопировать последний кусочек и начать свертку.

из ссылок
flink-forward.org/?session=a-comparative-performance-evaluation-of-flink

одна из наиболее интересных статей с разбором на какой стадии больше диск насилуется видно в
eastcirclek.blogspot.com.by/2015/06/terasort-for-spark-and-flink-with-range.html

можно еще глянуть на это все в контексте стрим процессинга, смысл там остается таким же
gdfm.me/2013/01/02/distributed-stream-processing-showdown-s4-vs-storm
Из коробки спарк не работает с S3, не поддерживаются хитрые типы данных Postgres и есть еще кое-какие нюансы. Для того чтобы это забороть приходится делать кастомные сборки спарка с нужными патчами.
Подробности можно почитать тут — tech.grammarly.com/blog/posts/Petabyte-Scale-Text-Processing-with-Spark.html Даже не знаю сколько бы я времени потратил на то чтобы сделать его работоспособным без этой статьи.
Вы Амазоновский спарк используете?
Если «амазоновский спарк» это EMR, то нет, мы его не используем. Мы запускаем спарк на ЕС2 инстансах, а сам спарк — собранный вручную (точнее наш форк собранный CI сервисом) с нужными зависимостями (обновленным хадупом, jets3s, включенными постгрес либами и патчами на отсутствующие постгрес типы).
> Из коробки спарк не работает с S3

Хм, вообще в обычной сборке Спарка это два ключа в sparkContext.hadoopConfiguration и вызов sequenceFile с соответствующей схемой в пути (s3n / s3a / s3). Вроде бы мне больше ничего не потребовалось для работы S3.
Привет, прочитайте статью ребят из граммарли. Просто так это не работает. По крайней мере не заработало ни у меня ни у них.
У меня работало через s3n, как написано в комменте выше, после подключения библиотеки hadoop-aws. Ни кастомных сборок, ни патчей не было нужно.
Не буду спорить. У меня не работало, у ребят из граммарли не работало. Если почитать тикеты и статью то видно что там была регрессия, оно работало, потом отвалилось. Конкретно у нас не работал изкоробочный спарк 1.5.0 (версию хадупа в нем не помню), собранный 1.5.1 с патчем на jets3s и hadoop 2.7.1 работает отлично…
Хорошо, раз зашел разговор про версии и регрессии, то уточню: у нас работало на ветке 1.4.x.
Всё дело в новом архитектурном подходе, который значительно выигрывает в производительности у классических MR приложений
Основные преимущества Spark:
  • Облегчение процесса разработки – меньше кода, код проще, интерактивный интерпретатор для Scala, Python, R
  • Удобное кэширование данных – ускоряет итеративные алгоритмы
  • Интеграция в одном проекте как пакетное обработки, так и потоковой (micro-batch)
  • Большое community – более 700 контрибьюторов

все промежуточные данные между Map и Reduce фазами, сбрасывались в HDFS
Промежуточные данные MapReduce кладутся на локальные диски серверов, выполняющих mapper'ы, в единственной копии. HDFS не используется. Данные перед reduce-фазой также собираются на локальных дисках без использования HDFS. Подробности можете посмотреть в моей статье тут

Теперь промежуточные данные сериализуются и хранятся в оперативной памяти, а обмен данными между узлами происходит напрямую, через сеть, без лишних абстракций. Стоит сказать что дисковый ввод/вывод всё таки используется (на этапе shuffle). Но его интенсивность значительно меньше.
Все промежуточные данные в Spark во время shuffle сбрасываются на диск точно так же, как в MapReduce. Если данные обрабатываются по одному алгоритму, то интенсивность ввода-вывода будет одинаковой

Ну и наконец Spark оперирует RDD абстракциями (Resilient Distributed Dataset), которые более универсальны чем MapReduce. Хотя для справедливости надо сказать, что есть Cascading. Это обёртка над MR, призванная добавить гибкости.
Также вы забыли упомянуть, что есть и другие обертки над MR, вроде Pig и Hive, которые на текущий момент популярнее Spark

Да, стоит сказать, что Spark API доступно для Scala, Java и Python
а также для R

Приведенный вами пример кода является скорее контрпримером. В презентации по архитектуре Spark я привожу для контраста пример классического «word count» на MapReduce и PySpark, чтобы показать, насколько громоздско и неудобно писать сразу в MR. Вот реализация примера из вашей статьи на PySpark:
data = ["user_id:0000, habrahabr.ru",
        "user_id:0001, habrahabr.ru",
        "user_id:0002, habrahabr.ru",
        "user_id:0000, abc.ru",
        "user_id:0000, yxz.ru",
        "user_id:0002, qwe.ru",
        "user_id:0002, zxc.ru",
        "user_id:0001, qwe.ru"]
rdd = sc.parallelize(data)
counts = rdd.map(lambda x: x.split(',')).map(lambda x: (x[1],1))
tops = counts.reduceByKey(lambda x,y: x+y).takeOrdered(10, key = lambda x: -x[1])
print tops

Согласитесь, в разы более читабельно
Спасибо за питоновский пример. Как раз люди интересовались. Тут наверно был бы интересен целостный пример, с настройкой SparkContext, и с запуском на кластере (spark-submit ?)
Плюс примера с Python в том, что он интерактивный, то есть вы просто поднимаете процесс PySpark и контекст уже создан для вас. Если вас интересует вариант с поднятием кластера, можно сделать, допустим, так:
pyspark --master yarn-client --num-executors 6 --executor-memory 4g --executor-cores 12

Запуск через spark-submit хорошо описан в официальной документации, нужно просто вместо jar-файла передать py-скрипт
Вообще, Apache Flink за последние полгода бурно растёт (см. блог его разработчиков и вышеупомянутые материалы с конференции Flink Forward). По-видимому, скоро он будет вполне способен тягаться с Apache Spark.
Sign up to leave a comment.

Articles