Skip to content

Latest commit

 

History

History
117 lines (86 loc) · 6.27 KB

airflow_guide.md

File metadata and controls

117 lines (86 loc) · 6.27 KB

Датаинженерный гайд по работе с Airflow

В этом гайде собраны договорённости, который мы соблюдаем при работе с Apache Airflow.

Структура кода

SQL

Весь SQL лежит в папочке sql, раскиданный по отдельным файликам. Один запрос – один файл.

DAG

Даги сгруппированы по директориям, одна директория – один даг.

Структура внутри директории такая:

  • dag.py - здесь только DAG, его параметры и операторы;
  • helpers.py;
  • constants.py;
  • tests:
    • test_dag.py;
    • test_helpers.py;
    • factories.py.

Общий код

Функции и операторы, которые используются в нескольких дагах, выносятся в папку utils и импортируются оттуда.

Структура utils:

  • operators;
  • sensors;
  • hooks;
  • utils.

В названии файлов внутри директорий не дублируем тип сущности. Так плохо: sensors/github_sensor.py, так хорошо: sensors/github.py.

Модели

Некоторые данные мы храним в PostgreSQL и используем SQLAlchemy для работы с ними.

  • Каждая таблица находится в отдельном файле в папке models. Нейминг файлов по названию таблиц (например dataset_losses.py). -Названия полей – <наименование таблицы-источника>_<наименование поля>, например patient_last_name, clinic_legal_title.
  • В названиях моделей датасетов используем префикс Dataset.
  • В названиях моделей витрин используем префикс Datawallmart.

Миграции

  • Название новой миграции состоит из номера – номера миграций идут подряд, независимо от изменений.
  • В названии после номера указываем суть изменений.

Примеры:

  • 1_some_table_create.py;
  • 2_some_other_table_create.py;
  • 3_some_table_add_beauty_field.py;
  • 4_dataset_my_precious_change_field_types.py.

Pandas

Мы не используем Pandas. Верим, что стандартной библиотеки достаточно для написания хорошего, понятного, поддерживаемого и быстрого кода.

Возможно, с разрастанием кодовой базы мы переиграем это правило, но пока так.

Промежуточные результаты

Промежуточные результаты храним в csv файлах в /tmp директории.

Сами файлы именуются по правилу <dag_id>_<task_id>.csv и имена хранятся в строковых литералах, у которых также есть префиксы:

  • extract_ — для файлов, в которые экспортируются первичные данные;
  • temp_ — для промежуточных файлов после преобразования, если из них данные не записываются сразу в таблицы;
  • load — для файлов с данными, которые загружаются в базу данных.

Правила написания дагов

  • Мы используем контекст менеджеры и не передаем DAG внутрь оператора явно – чтобы было абсолютно четко понятно из каких операторов состоит DAG.
  • start_date задается в виде статического datetime с обязательным указанием часов и минут – потому что у Airflow есть проблемы с динамическими датами при переходе через полночь, а также чтобы всегда было явно понятно время с которого DAG начинает исполнятся.
  • start_date должен быть в прошлом всегда и к текущему моменту как минимум один интервал должен умещаться в этом временном отрезке – это позволяет избежать ненужных проблем со scheduler.
  • schedule_interval задается строкой с использованием cron-синтаксиса – он короче, чем timedelta, более гибкий и больше нам нравится.
  • Мы не используем пресеты @hourly, @daily и т.д. – потому что explicit is better than implicit – в кроне мы явно задаем время исполнения DAG и контролируем это;
  • SubDag'и запрещены – под капотом у них SequentialExecutor, который не годится для бое использования.
  • Для определения зависимостей между тасками используем оператор побитового сдвига >> – потому что это наглядно и читается лучше чем some_task.set_upstream(some_other_task). При этом зависимости задаются слева направо – потому что мы так читаем текст.

Обработка данных

  • General rule of thumb – если можно выполнить простую операцию на стороне datasource, мы выполняем ее на стороне datasource.
  • Все результирующие датасеты (dataset_ & datawalmart_) должны содержать поле last_modified_at типа timestamp.
  • Все даты и времена хранятся в UTC.