В этом гайде собраны договорённости, который мы соблюдаем при работе с Apache Airflow.
Весь SQL лежит в папочке sql
, раскиданный по отдельным файликам.
Один запрос – один файл.
Даги сгруппированы по директориям, одна директория – один даг.
Структура внутри директории такая:
dag.py
- здесь только DAG, его параметры и операторы;helpers.py
;constants.py
;tests
:test_dag.py
;test_helpers.py
;factories.py
.
Функции и операторы, которые используются в нескольких дагах, выносятся
в папку utils
и импортируются оттуда.
Структура utils
:
operators
;sensors
;hooks
;utils
.
В названии файлов внутри директорий не дублируем тип сущности.
Так плохо: sensors/github_sensor.py
, так хорошо: sensors/github.py
.
Некоторые данные мы храним в PostgreSQL и используем SQLAlchemy для работы с ними.
- Каждая таблица находится в отдельном файле в папке models.
Нейминг файлов по названию таблиц (например
dataset_losses.py
). -Названия полей –<наименование таблицы-источника>_<наименование поля>
, напримерpatient_last_name
,clinic_legal_title
. - В названиях моделей датасетов используем префикс
Dataset
. - В названиях моделей витрин используем префикс
Datawallmart
.
- Название новой миграции состоит из номера – номера миграций идут подряд, независимо от изменений.
- В названии после номера указываем суть изменений.
Примеры:
1_some_table_create.py
;2_some_other_table_create.py
;3_some_table_add_beauty_field.py
;4_dataset_my_precious_change_field_types.py
.
Мы не используем Pandas. Верим, что стандартной библиотеки достаточно для написания хорошего, понятного, поддерживаемого и быстрого кода.
Возможно, с разрастанием кодовой базы мы переиграем это правило, но пока так.
Промежуточные результаты храним в csv файлах в /tmp
директории.
Сами файлы именуются по правилу <dag_id>_<task_id>.csv
и имена
хранятся в строковых литералах, у которых также есть префиксы:
extract_
— для файлов, в которые экспортируются первичные данные;temp_
— для промежуточных файлов после преобразования, если из них данные не записываются сразу в таблицы;load
— для файлов с данными, которые загружаются в базу данных.
- Мы используем контекст менеджеры и не передаем DAG внутрь оператора явно – чтобы было абсолютно четко понятно из каких операторов состоит DAG.
start_date
задается в виде статическогоdatetime
с обязательным указанием часов и минут – потому что у Airflow есть проблемы с динамическими датами при переходе через полночь, а также чтобы всегда было явно понятно время с которого DAG начинает исполнятся.start_date
должен быть в прошлом всегда и к текущему моменту как минимум один интервал должен умещаться в этом временном отрезке – это позволяет избежать ненужных проблем со scheduler.schedule_interval
задается строкой с использованием cron-синтаксиса – он короче, чемtimedelta
, более гибкий и больше нам нравится.- Мы не используем пресеты
@hourly
,@daily
и т.д. – потому чтоexplicit is better than implicit
– в кроне мы явно задаем время исполнения DAG и контролируем это; - SubDag'и запрещены – под капотом у них SequentialExecutor, который не годится для бое использования.
- Для определения зависимостей между тасками используем оператор побитового
сдвига
>>
– потому что это наглядно и читается лучше чемsome_task.set_upstream(some_other_task)
. При этом зависимости задаются слева направо – потому что мы так читаем текст.
- General rule of thumb – если можно выполнить простую операцию на стороне datasource, мы выполняем ее на стороне datasource.
- Все результирующие датасеты (
dataset_
&datawalmart_
) должны содержать полеlast_modified_at
типаtimestamp
. - Все даты и времена хранятся в UTC.