Комментарии 25
Мы используем у себя AirFlow для запуска периодических задач. И с ним постоянно какие-то проблемы. Регулярно видим "ядерный гриб". Сабдаги просто не работают. Старые issue закрываются, новые открываются. Давно уже хотим съехать с AirFlow на какой-то другой сервис. Но вот на какой — непонятно. Может кто-то посоветует?
У нас гриб появляется в трёх случаях:
— когда в даге что-то не в порядке и тогда мы используем консольную команду list_dags, чтобы выяснить, что именно;
— при маркировке mark success за большое количество дней;
— при большом или неправильном запросе при работе с источниками напрямую, через интерфейс Ad Hoc Query.
Про сабдаги я сказать ничего не могу, мы ими ещё не пользовались.
Что касается альтернативы, из предложенных open source продуктов слышала хорошие отзывы о Luigi. Он попроще, чем airflow, но для некоторых задач подходит лучше.
Смотрели и на Luigi и на Oozie, но они выглядят почти такими же. Не хочется менять шило на мыло.
Вот интересное, для общего понимания, сравнение Airflow, Luigi и Pinball — bytepawn.com/luigi-airflow-pinball.html#luigi-airflow-pinball
Сейчас набирает силу Prefect. Есть даже такая дока на него
https://docs.prefect.io/guide/examples/airflow_tutorial_dag.html
Но вопрос в том, что
- нужен ли еще один продукт, если можно заполировать уже существующий?
- насколько качественно его сделают, т.к. Prefect пока на очень ранней стадии разработки
Сабдаги отлично работают. Мы их используем для:
- объединения несколько задач в один с целью улучшения читаемости всего ДАГа.
- переиспользования кода, когда один набор задач используется в нескольких сабдагов.
У них есть свои минусы в том, что появляется дополнительная сущность сабдага, которая одновременно является задачей в главном ДАГе и также является ДАГом для внутренних тасков. Т.е. при чистке и повторном прогоне например, надо не забывать и то и другое вычищать из базы.
Что касается сбрасывания кеша, я не поняла, что имеется ввиду. Если обновление дага по свежему коду, то пользуемся кнопкой «Обновить» в веб-интерфейсе.
DAG при этом будет выглядеть так:
START -> export_from_all_shards_and_action -> FINISH
Внутри скрипта запускаем 3-6-10-400-1000 потоков и загружаем данные. Сразу после загрузки проверяем результат и, если были фатальные ошибки, то отправляем на рестарт. Иначе делаем какое-то полезное действие.
Со временем образуется 5-7 стандартных загрузчиков из принципиально разных источников. И можно эту логику в отдельные классы вынести, и просто запускать с разными параметрами.
Логика ETL склонна усложнятся со временем. Есть риск, что через годик во всех хитросплетениях и костыликах уже сложно будет разобраться.
Основное преимущество использования всех встроенных возможностей Airflow на мой взгляд — это простая локализация ошибки. Можно с одного взгляда понять, что пропал доступ к источнику или задача в соседнем таске никак не может просчитаться и после починки перезапустить только эту часть.
Что же касается костыликов и хитросплетений, во-первых, у нас есть регламент, согласно которому мы называем задачи и используем те или иные сенсоры и операторы, а во-вторых, тяжёлую повторяющуюся логику мы инкапсулируем в самописные операторы.
А если рассматривать конкретно наш регламент, это достаточно скучное чтиво вроде чтения ГОСТ'ов. Два главных момента — удобство и последовательное их соблюдения — кажутся очевидными. Можно при случае обсудить.
Это вполне себе вариант, но тогда Airflow вырождается в планировщик задач на Питоне.
Мне кажется, что он именно таким и должен быть. Остальное — от Лукаваго.
Мне очень понравился подход от BlueCore, описанный на medium: https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753
Самое верное использование airflow (берем все его преимущества и очень аккуратно избегаем недостатки)
У нас недавно возникла небольшая проблема, может подскажите как из неё лучше выйти?
Есть DAG, где находится 120 задач, 60 из них на первом уровне, т.е. зависимостей никаких нет. Выполняются, в основном, довольно быстро (от 5 секунд до 120 секунд). Когда мы делаем backfill на этот DAG на 30 дней, то получается 30*120 = 3600 задач, из которых сразу в очередь встаёт 1800. При этом, если попадаются лёгкие задачи, airflow worker быстро его отрабатывает и потом долго перебирает все задачи из пула. В итоге вместо 15 worker, которые положены по конфигу, успевает отработать только 1-2. Это можно решить дополнительной оберткой bash скриптом, где будут задаваться начальные и конечные даты, но, кажется, должен быть вариант получше.
Под airflow у нас отдельная машина (24 CPU, 64Gb памяти), работает с помощью LocalExecutor. Celery пока не прикуритили. Конфиги airflow.cfg у нас следующие:
[core]
parallelism = 16
dag_concurrency = 15
max_active_runs_per_dag = 16
[scheduler]
max_threads = 10
Ещё я бы посмотрела на параметры вроде этого:
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
Мы, кстати, довольно много внимания уделяем pool'ам, в которых запускаются задачи и priority_weight, которые помогают ранжировать таски в очереди по приоритетам. Это не совсем в тему данной проблемы, но, например, выделение тяжёлых тасков в отдельный pool с ограничением максимального количества одновременно-запущенных задач может в будущем уберечь от пиков загрузки сервера.
Варианты решения вашей проблемы, которые мне пришли в голову:
* Нужно ли вам, чтобы все дни стартовали одновременно? Если нет, можно выставить параметр дага depends_on_past в True, тогда таски последующих дней не будут становиться в очередь, пока эти же таски в предыдущем дне не выполнятся.
* Параметризация дагов. У нас есть несколько проектов, по которым данные на источнике меняются задним числом. Иногда нам нужно забирать данные за довольно большой период времени. Тогда мы вешаем переключение логики на параметр airflow: обычно это или флаг «пересчитать всю историю» или дата, начиная с которой нужно забрать все данные. И в даге зашиваемся на этот параметр. Но это как раз вариант с баш-скриптом, наверное.
Спасибо за ответ.
У нас шедулер работает через systemd, и он вроде сам следит за процессом шедулера.
Про prioriy_weight думали менять, а вот про pool спасибо, попробуем.
По поводу предлагаемых вариантов решения:
- depends_on_past может здесь помочь, хотя явно они получаются независимы, но это хотя бы ограничит одним днейм.
- в качестве параметризации дагов мы решили использовать файлы конфигурации в формате YAML и соответственно там держать все ключевые параметры дагов и всех возможных изменяемых параметров сабдагов и тасков. В частности у нас в переменной airflow содержится ключевое слово production или staging и по нему уже парсится соотвествующая ветка YAML конфига.
P.S. Еще при scheduler_heartbeat_sec = 5 у нас довольно много логов сыпится, почти под 1Гб в сутки, при этом в основном ненужная инфа. Вот думаем либо величину этого параметра увеличить, либо логирование настроить на другой уровень информативности.
Присоединяйтесь!
del
Airflow Workshop: сложные DAG’и без костылей