- Airflow: Оркестрация.
- Spark: Обработка данных.
- PostgreSQL и MySQL: Реляционные базы данных.
- Kafka: Брокер сообщений.
- Python: Генераторы данных для PostgreSQL и Kafka.
- Docker: Контейнеризация сервисов.
Система состоит из 14 контейнеров:
- PostgreSQL
- MySQL
- Spark Master
- Spark Worker 1,2,3
- Генератор данных для PostgreSQL (
pg_datagen
) - Генератор данных для Kafka (
kafka_datagen
) - Kafka Init
- Kafka
- Zookeeper
- Airflow Init
- Airflow Scheduler
- Airflow Webserver
Все контейнеры основаны на открытых Docker Image. Они автоматически настраиваются и проверяют свою готовность с использованием Healthcheck. Система обеспечена скриптами для автоматической генерации данных, их репликации, стриминга и создания аналитических витрин.
-
users: Информация о пользователях.
user_id
(PK): Уникальный идентификатор пользователя.first_name
: Имя пользователя.last_name
: Фамилия пользователя.email
: Электронная почта (уникальное значение).phone
: Номер телефона.registration_date
: Дата регистрации.loyalty_status
: Статус лояльности (Gold
,Silver
,Bronze
).
-
productcategories: Иерархия категорий товаров.
category_id
(PK): Уникальный идентификатор категории.name
: Название категории.parent_category_id
(FK): Ссылка на родительскую категорию.
-
products: Информация о товарах.
product_id
(PK): Уникальный идентификатор товара.name
: Название товара.description
: Описание товара.category_id
(FK): Категория товара.price
: Цена товара.stock_quantity
: Количество товара на складе.creation_date
: Дата добавления товара.
-
orders: Информация о заказах.
order_id
(PK): Уникальный идентификатор заказа.user_id
(FK): Пользователь, сделавший заказ.order_date
: Дата заказа.total_amount
: Общая сумма заказа.status
: Статус заказа (Pending
,Completed
, и т.д.).delivery_date
: Дата доставки.
-
orderdetails: Детали заказов.
order_detail_id
(PK): Уникальный идентификатор детали заказа.order_id
(FK): Ссылка на заказ.product_id
(FK): Ссылка на товар.quantity
: Количество товаров в заказе.price_per_unit
: Цена за единицу товара.total_price
: Общая стоимость позиции.
-
reviews: Отзывы о товарах.
review_id
(PK): Уникальный идентификатор отзыва.user_id
(FK): Пользователь, оставивший отзыв.product_id
(FK): Продукт, на который оставлен отзыв.rating
: Оценка товара (от 1 до 5).review_text
: Текст отзыва.created_at
: Дата создания отзыва.
-
loyaltyPoints: Система лояльности.
loyalty_id
(PK): Уникальный идентификатор записи.user_id
(FK): Пользователь, получивший бонусные баллы.points
: Количество начисленных баллов.reason
: Причина начисления (например, "Order", "Promotion").created_at
: Дата начисления.
Вся конфигурация проекта управляется через единый файл .env
, который содержит параметры для всех сервисов, включая логины, пароли, порты и настройки генерации данных.
- URL:
jdbc:postgresql://localhost:5432/postgres_db?currentSchema=source
- Логин:
db_user
- Пароль:
qwerty
- URL:
jdbc:mysql://localhost:3306/mysql_db
- Логин:
db_user
- Пароль:
qwerty
- URL:
http://localhost:8080
- Логин:
admin
- Пароль:
admin
- URL:
http://localhost:8081
- Bootstrap Servers:
localhost:9092
- Топик:
users-data
Параметры генерации для PostgreSQL по умолчанию:
- Количество пользователей: 500
- Количество товаров: 800
- Количество заказов: 3000
- Детали заказов: от 1 до 10 на заказ
- Категории товаров: 20
- Отзывов о товарах: 2000
- Начислений бонусов: 3000
Параметры генерации для Kafka по умолчанию:
- Интервал генерации событий: 5 секунд
- Топик Kafka:
users-data
- События генерируются в формате JSON с полями:
first_name
,last_name
,email
,phone
,registration_date
,loyalty_status
.
В Airflow реализована репликация данных из PostgreSQL в MySQL. DAG выполняет следующие задачи:
- Извлечение данных из PostgreSQL.
- Трансформация данных через Spark.
- Сохранение данных в MySQL.
- Путь к DAG:
code/airflow/dags/replicate_postgres_to_mysql.py
- Описание: DAG, который использует
SparkSubmitOperator
для запуска Spark-скрипта репликации таблиц из PostgreSQL в MySQL.
В Airflow реализована обработка данных из Kafka. DAG выполняет следующие задачи:
- Получение данных из топика Kafka.
- Обработка данных с использованием Spark.
- Сохранение данных в PostgreSQL.
- Путь к DAG:
code/airflow/dags/kafka_to_postgres_streaming.py
- Описание: DAG, который использует
SparkSubmitOperator
для запуска Spark-скрипта стриминга данных из Kafka в PostgreSQL.
Витрина для последующего анализа поведения пользователей - количества заказов и общей суммы затрат, разбитых по статусам заказов.
Поле | Описание |
---|---|
user_id | Идентификатор пользователя |
first_name | Имя пользователя |
last_name | Фамилия пользователя |
status | Статус заказа |
order_count | Количество заказов |
total_spent | Общая сумма затрат |
Витрина для последующего анализа рейтинга товаров. Может быть использована, например, для исследования популярности товаров и их соответствия ожиданиям клиентов.
Поле | Описание |
---|---|
product_id | Идентификатор товара |
name | Название товара |
rating | Средний рейтинг |
Витрина для последующего анализа пользователей по количеству бонусных баллов. Может быть использована, например, для исследования групп пользователей в рамках бонусной системы.
Поле | Описание |
---|---|
user_id | Идентификатор пользователя |
first_name | Имя пользователя |
last_name | Фамилия пользователя |
loyalty_status | Уровень бонусной системы |
total_loyalty_points | Количество бонусных баллов |
Витрина для последующего анализа средних чеков с разбивкой по статусу заказа и статусу лояльности. Может быть использована, например, для оценки эффективности маркетинговых стратегий и программ лояльности, или для настройки акций и предложений для разных групп клиентов.
Поле | Описание |
---|---|
status | Статус заказа |
loyalty_status | Статус лояльности пользователя |
average_check | Средний чек для группы заказов |
Для создания витрин используются Spark-скрипты. Скрипты загружают данные из MySQL и PostgreSQL, выполняют агрегации и сохраняют результаты обратно в базу данных. Шаги:
- Загрузка исходных данных из базы данных.
- Выполнение трансформаций (объединения, группировки, агрегации и т.д.).
- Сохранение результирующих витрин в целевую базу данных.
- Запустить команду:
docker compose up --build -d
- После сборки проекта и его развертывания будут доступны интерфейсы PostgreSQL, MySQL, Airflow, Kafka и Spark по указанным выше URL.
- Все, что остается сделать вручную после окончания деплоя — включить (перевести в
unpaused
) DAG в UI Airflow.
Проект организован следующим образом:
DE2024_PY_CourseWork/
├── .env # Переменные окружения для настройки всех сервисов
├── docker-compose.yml # Конфигурация Docker Compose
├── setup/ # Инфраструктура контейнеров (ранее "infra")
│ ├── airflow/ # Конфигурация Airflow
│ │ ├── init/
│ │ ├── scheduler/
│ │ └── webserver/
│ ├── datagen/ # Конфигурация для генераторов данных
│ │ ├── pg_datagen/
│ │ └── kafka_datagen/
│ ├── db/ # Конфигурация СУБД
│ │ ├── mysql/
│ │ └── postgresql/
│ ├── messaging/ # Конфигурация ZK&Kafka
│ │ ├── kafka/
│ │ ├── kafka_init/
│ │ └── zookeeper/
│ └── spark/ # Конфигурация Spark
│ ├── spark-master/
│ └── spark-worker/
├── code/ # Исходный код
│ ├── airflow/ # DAG и скрипты для Airflow
│ │ ├── dags/
│ │ └── scripts/
│ │ ├── helpers/
│ │ └── pyspark_scripts/
│ ├── datagen/ # Генераторы данных
│ │ ├── datagen_postgres/ # Скрипты для генерации данных в PG
│ │ └── kafka_datagen/ # Скрипты для генерации данных в Kafka
│ └── scripts/ # Дополнительные скрипты
├── postgresdb/ # Модели и схемы для PostgreSQL
│ ├── models.py
│ ├── schemas/
│ │ ├── loyalty_schema.py
│ │ ├── review_schema.py
│ │ ├── user_schema.py
│ │ ├── category_schema.py
│ │ ├── product_schema.py
│ │ ├── order_schema.py
│ │ └── order_detail_schema.py
│ └── db_config.py
└── README.md # Текущий файл документации