Как стать автором
Обновить
110.71
Леруа Мерлен
Мы строим технологическую компанию-платформу.

DAG’и без напрягов: наш опыт использования метаданных при работе с Apache Airflow

Время на прочтение4 мин
Количество просмотров6.3K

Apache Airflow – простой и удобный batch-ориентированный инструмент для построения, планирования и мониторинга дата-пайплайнов. Ключевой его особенностью является то, что, используя Python-код и встроенные функциональные блоки, можно соединить множество различных технологий, использующихся в современном мире. Основная рабочая сущность Airflow – DAG – направленный ацикличный граф, в котором узлами являются задачи, а зависимости между задачами представлены направленными ребрами. 

Те, кто использует Apache Airflow для оркестрации задач загрузки данных в хранилище, наверняка оценили гибкость, которую он предоставляет для решения шаблонных задач. Когда весь процесс разработки сводится к заполнению конфигурационного файла с описанием параметров DAGа и списком задач, которые должны выполняться. У нас в Леруа Мерлен такой подход успешно используется для создания задач по перекладыванию данных из raw-слоя в ods-слой хранилища. Поэтому было решено распространить его на задачи по заполнению витрин данных.  

Основная сложность состояла в том, что единой методологии разработки витрин данных и процедур по их заполнению у нас пока нет. И каждый разработчик решал задачу, основываясь на своих личных предпочтениях и опыте. Это укладывается в один из основных корпоративных IT принципов - ”You build it – you run it”, который означает, что разработчик несет ответственность за свое решение и сам его поддерживает. Данный принцип хорош для быстрой проработки гипотез, но для однотипных вещей больше подходит стандартное решение. 

Как было

Тут стоит рассказать, как велась до этого разработка для загрузки витрин данных. Разработчик пишет процедуры загрузки в GreenPlum, разрабатывает DAGи для их запуска, после чего создает по шаблону новый репозиторий на GitHub, загружает код своих DAGов и добавляет свой репозиторий в основной проект Airflow в качестве сабмодуля. При таком подходе возникали следующие трудности:

  1. Нужно погружение в Python и Apache Airflow;

  2. На момент начала разработки релиз основного проекта происходил раз в неделю, поэтому, чтоб увидеть свои DAGи на проде Airflow, нужно было подождать;

  3. Основной проект постепенно разрастался и начал притормаживать при деплое;

  4. Разбросанным по разным репозиториям кодом DAGов, выполняющих однотипные задачи, сложно управлять;

  5. Отсутствие единого подхода также влияло и на качество SQL-кода процедур. Часто можно было встретить сложную логику по управлению параметрами загрузки, которую легко можно было «перевесить» на Airflow.

Все вышеперечисленное привело нас к мысли, что пора брать ситуацию под свой контроль и заняться разработкой стандартного решения. Анализ существующих DAG-ов показал, что большинство из них очень простые, не содержат сложных зависимостей и состоят, в основном, из DummyOperator-ов и PostgresOperator-ов. Это и послужило отправной точкой для разработки нового инструмента, который, в свою очередь, должен был:

  1. Уметь создавать DAG-и на основе конфигурационного файла в формате YAML, в которым бы были указаны основные параметры, как-то: дата старта, расписание, параметры подключения к БД, названия запускаемых процедур, их параметры и т. д. YAML файлы должны храниться внутри корпоративного сервиса по управлению метаданными, получить их содержимое можно через API;

  2. Быть максимально простым, иметь понятную документацию, чтобы погружение не занимало много времени;

  3. В то же время быть максимально гибким, уметь работать с максимально возможным количеством параметров настройки DAG-а в Airflow.

Что есть

Получился примерно вот следующий шаблон для конфигурационного файла:

Из которого создается вот такой DAG:

Описание параметров

Общие параметры:

  • module_name – нужен для формирование DAG_ID;

  • pool – пул, в котором будут запущены задачи;

  • queue – очередь для задач;

  • owner – владелец DAGа;

  • postgres_conn_id – строка подключения к БД;

  • email – список емейлов для рассылки алертов;

  • tags – список тэгов для поиска DAGа в UI;

  • access_control: роль для управление DAGом;

  • schedule_interval – расписание для запуска DAGа;

  • start_date и catchup – параметры, управляющие глубиной истории загрузки. Airflow использует интервальный подход. Это означает, что временной период от start_date и до опциональной end_date (мы не используем) разбивается на интервалы, указанные в schedule_interval. Если catchup True, то запуск DAGА начнется от start_date, если False, то с текущего интервала;

  • schema_name – схема БД, в которой находится витрина;

  • task_list – список задач в DAGе.

Основные параметры задач:

  • task_name – соответствует task_id Airflow

  • task_type – тип задачи

  • task_schema_name - схема БД, в которой находится витрина, если схема отличается от общей

  • task_conn_id – строка подключения, если отличается от общей

  • procedure_name – процедура загрузки витрины

  • params – список параметров процедуры и их значений

  • task_depends_on – список задач, от которых зависит запуск данной задачи

  • priority_weight – приоритет данной задачи по отношению к другим задачам

  • task_concurrency - количество одновременно запущенных экземпляров задачи во всех запущенных экземплярах DAGа

Сейчас существует три типа задач(task_type):

1) Dummy – соответствует DummyOperator. Задача, которая ничего не выполняет и обычно служит начальной и конечной задачей, а также для разделения задач на блоки.

2) Обычная загрузка – соответствует PostgresOperator в Airflow 

Вот так выглядит SQL-код, который генерит эта задача:

3) Множественная загрузка – много PostgresOperator(если нужно создать кучу однотипных задач, различающихся по одному параметру) 

У этого типа есть свои специфические параметры:

  • task_multiply - может иметь 2 значения: "schema" или "params". Если указано   schema", то значения из task_multiply_list добавляются в выражение SEARCH_PATH.                     Если "params", то значения из task_multiply_list добавляются в список параметров процедуры для параметра из списка params, у которого в значении указано 'task_multiply_list’

  • task_multiply_list  - список значений для параметра, по которому будут создаваться однотипные задачи

В результате получается такой SQL-код.

Для “schema”:

Для “params”:

А вот так проставляются зависимости между задачами:

Куда пойдем

Внедрение инструмента позволило существенно сократить время на разработку DAGов. Глубокое погружение в Apache Airflow больше не нужно, хотя почитать про макросы и расписание все-таки придется. Шаблон конфигурационного файла заполняется минут за 10-15. Время, затрачиваемое на ревью и деплой на прод, тоже сильно сократились. Однако здесь же и кроется основная зона для развития: сейчас ревью и деплой происходят в ручном режиме. Хочется обложить все это тестами и предоставить разработчику возможность самому отправлять свои DAGи на прод.

Теги:
Хабы:
+3
Комментарии5

Публикации

Информация

Сайт
leroymerlin.ru
Дата регистрации
Дата основания
2004
Численность
свыше 10 000 человек
Местоположение
Россия
Представитель
Nastianastasia