Как стать автором
Обновить

Комментарии 25

Мы используем у себя AirFlow для запуска периодических задач. И с ним постоянно какие-то проблемы. Регулярно видим "ядерный гриб". Сабдаги просто не работают. Старые issue закрываются, новые открываются. Давно уже хотим съехать с AirFlow на какой-то другой сервис. Но вот на какой — непонятно. Может кто-то посоветует?

Согласна с seidzi, airflow очень гибкий. Хотя и без недочётов.
У нас гриб появляется в трёх случаях:
— когда в даге что-то не в порядке и тогда мы используем консольную команду list_dags, чтобы выяснить, что именно;
— при маркировке mark success за большое количество дней;
— при большом или неправильном запросе при работе с источниками напрямую, через интерфейс Ad Hoc Query.
Про сабдаги я сказать ничего не могу, мы ими ещё не пользовались.

Что касается альтернативы, из предложенных open source продуктов слышала хорошие отзывы о Luigi. Он попроще, чем airflow, но для некоторых задач подходит лучше.

Смотрели и на Luigi и на Oozie, но они выглядят почти такими же. Не хочется менять шило на мыло.

Можете ещё Azkaban попробовать. Мы сначала его юзали, но потом из-за того, что не хватало функционала перешли на Airflow.

Сейчас набирает силу Prefect. Есть даже такая дока на него
https://docs.prefect.io/guide/examples/airflow_tutorial_dag.html


Но вопрос в том, что


  • нужен ли еще один продукт, если можно заполировать уже существующий?
  • насколько качественно его сделают, т.к. Prefect пока на очень ранней стадии разработки
а мы используем Oozie и хотим перейти с него на Airflow, сейчас тестирую его возможности и по моим наблюдениям он очень гибкий и нужно уметь его «готовить»

Сабдаги отлично работают. Мы их используем для:


  • объединения несколько задач в один с целью улучшения читаемости всего ДАГа.
  • переиспользования кода, когда один набор задач используется в нескольких сабдагов.

У них есть свои минусы в том, что появляется дополнительная сущность сабдага, которая одновременно является задачей в главном ДАГе и также является ДАГом для внутренних тасков. Т.е. при чистке и повторном прогоне например, надо не забывать и то и другое вычищать из базы.

Подскажите, как вы решаете задачу версионирования дагов/тасков? И как сбрасываете кеш дагов?
У нас код дагов лежат в гите, мы их разрабатываем локально и после внесения изменений обновляем на сервере.

Что касается сбрасывания кеша, я не поняла, что имеется ввиду. Если обновление дага по свежему коду, то пользуемся кнопкой «Обновить» в веб-интерфейсе.
Я так понимаю, что речь идёт об устаревших данных. Например когда ДАГ переименовывается, по факту файла уже нет, а в вебсервесе он ещё висит и в базе есть. Мы в таком случае идём в базу Postgres airflow и там запросами вручную чистим устаревшие ДАГи.
где то читал что в новой версии должны для этого сделать кнопку в web интерфейсе, но пока что да другого решения кроме как удалять руками нет, можно так же сделать свою тулзу по чистке старых дагов
А можно ли вообще не делать сложной логики внутри Airflow, а вместо этого делать параллельную загрузку и все проверки внутри Python скрипта?

DAG при этом будет выглядеть так:
START -> export_from_all_shards_and_action -> FINISH

Внутри скрипта запускаем 3-6-10-400-1000 потоков и загружаем данные. Сразу после загрузки проверяем результат и, если были фатальные ошибки, то отправляем на рестарт. Иначе делаем какое-то полезное действие.

Со временем образуется 5-7 стандартных загрузчиков из принципиально разных источников. И можно эту логику в отдельные классы вынести, и просто запускать с разными параметрами.

Логика ETL склонна усложнятся со временем. Есть риск, что через годик во всех хитросплетениях и костыликах уже сложно будет разобраться.
Это вполне себе вариант, но тогда Airflow вырождается в планировщик задач на Питоне.

Основное преимущество использования всех встроенных возможностей Airflow на мой взгляд — это простая локализация ошибки. Можно с одного взгляда понять, что пропал доступ к источнику или задача в соседнем таске никак не может просчитаться и после починки перезапустить только эту часть.

Что же касается костыликов и хитросплетений, во-первых, у нас есть регламент, согласно которому мы называем задачи и используем те или иные сенсоры и операторы, а во-вторых, тяжёлую повторяющуюся логику мы инкапсулируем в самописные операторы.
Было б интересно про этот регламент и прочие know how почитать, позволяющие не утонуть.
На понимание темы на уровне рекомендаций в разных ситуациях я пока не претендую.
А если рассматривать конкретно наш регламент, это достаточно скучное чтиво вроде чтения ГОСТ'ов. Два главных момента — удобство и последовательное их соблюдения — кажутся очевидными. Можно при случае обсудить.
Ну вот за счёт чего достигаются удобство и непотопляемость — вопрос кажется интересный
Это вполне себе вариант, но тогда Airflow вырождается в планировщик задач на Питоне.

Мне кажется, что он именно таким и должен быть. Остальное — от Лукаваго.
Мне очень понравился подход от BlueCore, описанный на medium: https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753
Самое верное использование airflow (берем все его преимущества и очень аккуратно избегаем недостатки)

Спасибо вам. Только начал изучать Airflow, и не покидало ощущение, что с ним что-то не так. Статья по ссылке прояснила моё смутное недовольство: инструмент по оркестрации берёт на себя слишком много и начинает мешать в одну кучу и оркестрацию, и логику задач.
Вот бы жаргонизмы типа «шарды», «стеджинговый сервер» и т.п. были б вкратце раскрыты через более широко известные словечки при первом упоминании…

У нас недавно возникла небольшая проблема, может подскажите как из неё лучше выйти?


Есть DAG, где находится 120 задач, 60 из них на первом уровне, т.е. зависимостей никаких нет. Выполняются, в основном, довольно быстро (от 5 секунд до 120 секунд). Когда мы делаем backfill на этот DAG на 30 дней, то получается 30*120 = 3600 задач, из которых сразу в очередь встаёт 1800. При этом, если попадаются лёгкие задачи, airflow worker быстро его отрабатывает и потом долго перебирает все задачи из пула. В итоге вместо 15 worker, которые положены по конфигу, успевает отработать только 1-2. Это можно решить дополнительной оберткой bash скриптом, где будут задаваться начальные и конечные даты, но, кажется, должен быть вариант получше.


Под airflow у нас отдельная машина (24 CPU, 64Gb памяти), работает с помощью LocalExecutor. Celery пока не прикуритили. Конфиги airflow.cfg у нас следующие:


[core]
parallelism = 16
dag_concurrency = 15
max_active_runs_per_dag = 16


[scheduler]
max_threads = 10

У нас одно из слабых мест airflow — это шедулер. Время от времени он не справляется с несовершенством некоторых дагов и падает. Поэтому у нас на Кроне стоит проверка его состояния и перезапуск при необходимости (с несовершенствами мы тоже работаем, конечно :) ). Если бы я кого и подозревала в ситуации, которая у вас сложилась, так это его. Может в его логах можно что-нибудь раскопать?

Ещё я бы посмотрела на параметры вроде этого:
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

Мы, кстати, довольно много внимания уделяем pool'ам, в которых запускаются задачи и priority_weight, которые помогают ранжировать таски в очереди по приоритетам. Это не совсем в тему данной проблемы, но, например, выделение тяжёлых тасков в отдельный pool с ограничением максимального количества одновременно-запущенных задач может в будущем уберечь от пиков загрузки сервера.

Варианты решения вашей проблемы, которые мне пришли в голову:
* Нужно ли вам, чтобы все дни стартовали одновременно? Если нет, можно выставить параметр дага depends_on_past в True, тогда таски последующих дней не будут становиться в очередь, пока эти же таски в предыдущем дне не выполнятся.
* Параметризация дагов. У нас есть несколько проектов, по которым данные на источнике меняются задним числом. Иногда нам нужно забирать данные за довольно большой период времени. Тогда мы вешаем переключение логики на параметр airflow: обычно это или флаг «пересчитать всю историю» или дата, начиная с которой нужно забрать все данные. И в даге зашиваемся на этот параметр. Но это как раз вариант с баш-скриптом, наверное.

Спасибо за ответ.
У нас шедулер работает через systemd, и он вроде сам следит за процессом шедулера.


Про prioriy_weight думали менять, а вот про pool спасибо, попробуем.


По поводу предлагаемых вариантов решения:


  • depends_on_past может здесь помочь, хотя явно они получаются независимы, но это хотя бы ограничит одним днейм.
  • в качестве параметризации дагов мы решили использовать файлы конфигурации в формате YAML и соответственно там держать все ключевые параметры дагов и всех возможных изменяемых параметров сабдагов и тасков. В частности у нас в переменной airflow содержится ключевое слово production или staging и по нему уже парсится соотвествующая ветка YAML конфига.

P.S. Еще при scheduler_heartbeat_sec = 5 у нас довольно много логов сыпится, почти под 1Гб в сутки, при этом в основном ненужная инфа. Вот думаем либо величину этого параметра увеличить, либо логирование настроить на другой уровень информативности.

Кстати, у нас есть маленький чатик для разработчиков airflow: t.me/ruairflow
Присоединяйтесь!
Зарегистрируйтесь на Хабре, чтобы оставить комментарий