This project implements an end-to-end ETL pipeline that ingests, processes, and stores weather data from multiple sources (a REST API and a CSV file) in a cloud-based data warehouse, ready for visualization. The pipeline handles both real-time and batch workloads, with orchestration and monitoring capabilities for robust, scalable data processing.
- Event and Metadata Driven: The batch pipeline is triggered by file uploads, while the hourly pipeline runs on a schedule and takes its input from a control table (dim.city).
- Idempotency: Upserts (merges) ensure that no weather record is duplicated and that every row in the transformed and semantic layers is unique per hour, city, and weather_id (see the MERGE sketch after this list).
- Operational Monitoring: Failure notifications are configured for every activity in each pipeline, enabling prompt remediation.
- Holistic Visualization: The dashboard visualizes live and historical data, along with a simple weather forecast.
- Data Lineage: Audit columns such as raw_timestamp and source make the origin and ingestion time of each record traceable.
- Event Driven: Using dim.city as a control table, together with scheduled triggers, makes the pipeline highly automated and keeps manual intervention to a minimum.
- Backfilling: Because the pipeline is idempotent, backfilling is a built-in feature: old data can be updated or even inserted based on hour, city, and weather_id.
- Robustness: Retry mechanisms are in place within the orchestration (e.g., if the API or file ingestion fails, the system retries before sending failure notifications).
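The idempotency and backfilling behaviour described above rests on a keyed `MERGE`. Below is a minimal sketch: the target table and the key columns (`city_name`, `dt`, `weather_id`) come from the pipeline design, while the staging table `#staged_weather` and the value columns (`temp`, `humidity`) are illustrative assumptions.

```sql
-- Minimal sketch of the idempotent upsert: reruns and backfills update
-- existing rows instead of duplicating them. The staging table and the
-- non-key columns (temp, humidity) are assumptions for illustration.
MERGE transformed.weatherdata AS tgt
USING #staged_weather AS src
    ON  tgt.city_name  = src.city_name
    AND tgt.dt         = src.dt          -- observation hour
    AND tgt.weather_id = src.weather_id
WHEN MATCHED THEN
    UPDATE SET tgt.temp     = src.temp,
               tgt.humidity = src.humidity
WHEN NOT MATCHED THEN
    INSERT (city_name, dt, weather_id, temp, humidity)
    VALUES (src.city_name, src.dt, src.weather_id, src.temp, src.humidity);
```

Because matching rows are updated rather than re-inserted, re-running the pipeline for an old hour simply refreshes that hour's rows, which is what makes backfilling essentially free.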
The pipeline follows a layered architecture:
- Data Sources:
  - OpenWeather API (Hourly): Provides real-time (hourly) weather data through an API.
  - Historical Weather Data CSV: Stored in a cloud-based data lake for batch loading.
- Data Lake: Used as a landing area for the CSV file before ingestion into the data warehouse.
- Data Warehouse: Organized into three layers:
  - Raw Layer: Stores raw ingested data from both the API and batch sources.
  - Transformed Layer: Data is cleaned, normalized, and transformed here for analysis.
  - Semantic/Gold Layer: Final, processed data ready for visualization.
- Orchestration: Schedules and coordinates data loads (hourly and batch).
- Monitoring: Provides alerts and failure notifications to ensure the pipeline is running smoothly.
- Visualization: Connects the final semantic layer to Power BI for live and historical reporting.
The data model for this ETL pipeline is organized into layers, each with its own tables and processes, ensuring a clean, organized, and optimized approach to data processing. Data progresses through raw, transformed, and semantic layers, with each layer serving a specific function:
- Raw Layer: Ingests and stores unprocessed data.
- Dimensional Table: Enriches data with city information.
- Transformed Layer: Cleanses and prepares data for reporting.
- Semantic Layer: Final reporting-ready data for visualization.
This structured approach makes the pipeline scalable, maintainable, and adaptable to new data sources or additional transformations in the future. See the Data directory for more detail.
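To make the layering concrete, here is a hypothetical minimal sketch of the schemas involved; the actual definitions live in the Data directory, and all column choices below are assumptions.

```sql
-- Hypothetical minimal layout of the layered model; see the Data directory
-- for the actual definitions. Column lists here are illustrative only.
CREATE SCHEMA raw;
GO
CREATE SCHEMA transformed;
GO
CREATE SCHEMA semantic;
GO

CREATE TABLE raw.hourly_weatherdata (
    payload        NVARCHAR(MAX),                       -- raw API response, kept as-is
    source         NVARCHAR(50),                        -- audit column: where the row came from
    raw_timestamp  DATETIME2 DEFAULT SYSUTCDATETIME()   -- audit column: ingestion time
);

CREATE TABLE transformed.weatherdata (
    city_name  NVARCHAR(100),
    dt         DATETIME2,      -- observation hour
    weather_id INT,
    temp       DECIMAL(5, 2),
    humidity   INT,
    CONSTRAINT uq_weather UNIQUE (city_name, dt, weather_id)  -- enforces the idempotency key
);
```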
This pipeline automates batch ingestion and processing of weather data stored in blob storage, performing a sequence of ETL steps and sending notifications on errors. Here’s a breakdown of each step:
- Copy Data - Blob to Raw Batch Load:
  - This step copies data from blob storage (the data lake) into the `raw.batch_weatherdata` table in the data warehouse, initiating the batch load process.
- Stored Procedure - `uspUpsertDimCity`:
  - This procedure upserts city information into the `dim.city` table, ensuring that city metadata is available for data enrichment in subsequent steps (a sketch of this procedure follows the list).
- Stored Procedure - `uspBatchUpsertTransformedWeatherData`:
  - This procedure merges and transforms raw data with city information, populating the `transformed.weatherdata` table.
  - The upsert is based on unique columns such as `city_name`, `dt`, and `weather_id`.
- Stored Procedure - `uspTruncateInsertSemanticWeatherData`:
  - This step truncates and inserts data into the `semantic.weatherdata` table, which is optimized for reporting and visualization.
- Archive Bulk Load:
  - After successfully loading and processing data, this step archives the batch data from the landing directory to a specified archival directory in blob storage, preserving historical data for future reference.
- Delete Landing Bulk Load:
  - Finally, the pipeline deletes the original batch data from the landing directory, freeing up storage space and maintaining data lake hygiene.
- Failure Notification Emails:
  - If any of the critical stored procedures fail, a notification email is sent to alert the team, allowing for prompt troubleshooting and resolution.
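As a rough illustration of the first stored procedure above, `uspUpsertDimCity` might look something like the following; the source query and the `dim.city` columns (`lat`, `lon`) are assumptions, not the repository's actual definition.

```sql
-- Illustrative sketch of uspUpsertDimCity: keep dim.city current without
-- duplicating cities. The source table and columns are assumptions.
CREATE OR ALTER PROCEDURE uspUpsertDimCity
AS
BEGIN
    SET NOCOUNT ON;

    MERGE dim.city AS tgt
    USING (SELECT DISTINCT city_name, lat, lon
           FROM raw.batch_weatherdata) AS src   -- assumed source columns
        ON tgt.city_name = src.city_name
    WHEN MATCHED THEN
        UPDATE SET tgt.lat = src.lat, tgt.lon = src.lon
    WHEN NOT MATCHED THEN
        INSERT (city_name, lat, lon)
        VALUES (src.city_name, src.lat, src.lon);
END;
```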
This pipeline ingests and processes real-time weather data from the OpenWeather API on an hourly basis. It iterates over each city, fetching the latest weather data and updating the data warehouse. Here’s a breakdown of each step in this process:
- Lookup - List Cities:
  - This initial step retrieves the list of cities from the `dim.city` table. The list is used to iterate over each city in the subsequent steps (the lookup query and a sketch of the semantic refresh follow this list).
- ForEach - ForEachCity:
  - This loop iterates through each city obtained from the lookup step. For each city, the following activities are performed:
  - API to Raw Hourly Load:
    - Loads the hourly weather data from the OpenWeather API into the `raw.hourly_weatherdata` table, storing it as raw data for auditing and historical reference.
  - Stored Procedure - `uspHourlyUpsertTransformedWeatherData`:
    - This stored procedure transforms and upserts data into the `transformed.weatherdata` table. It processes the raw data, applies any necessary transformations, and enriches it by joining with city information.
  - Stored Procedure - `uspTruncateInsertSemanticWeatherData`:
    - This step further processes the data by truncating and inserting it into the `semantic.weatherdata` table, which is optimized for reporting. It ensures that only the most current data is available for analysis and visualization.
- Failure Notification:
  - If any part of the pipeline fails, a failure notification is triggered, sending an email to alert the team. This allows for immediate troubleshooting and ensures that issues are promptly addressed.
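For illustration, the Lookup activity's query and a sketch of `uspTruncateInsertSemanticWeatherData` could look roughly like this; the semantic table's column list is an assumption.

```sql
-- Lookup - List Cities: the query the Lookup activity might issue.
SELECT city_name FROM dim.city;
GO

-- Sketch of uspTruncateInsertSemanticWeatherData: a full refresh of the
-- reporting table from the transformed layer. The column list is assumed.
CREATE OR ALTER PROCEDURE uspTruncateInsertSemanticWeatherData
AS
BEGIN
    SET NOCOUNT ON;

    TRUNCATE TABLE semantic.weatherdata;

    INSERT INTO semantic.weatherdata (city_name, dt, weather_id, temp, humidity)
    SELECT city_name, dt, weather_id, temp, humidity
    FROM transformed.weatherdata;   -- only deduplicated, current rows land here
END;
GO
```

A truncate-and-insert keeps the reporting table simple and guarantees it always mirrors the transformed layer, at the cost of rewriting it on every run.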
This Logic App is designed to handle failure notifications by sending an automated email when triggered by an HTTP request. Here’s how it works:
- When an HTTP Request is Received:
  - This trigger listens for an incoming HTTP request, which can be sent from the data pipeline or other workflows in case of an error or failure event.
  - When the HTTP request is received, it initiates the process to send an email notification.
- Send an Email (V2):
  - This action sends the failure notification email to the team, using the details carried in the incoming request to describe what failed.
The Power BI dashboard provides a comprehensive view of current and historical weather data, offering insights into real-time conditions and trends over time. It is designed to help users track weather patterns, monitor current conditions, and analyze trends based on historical data.
- Latest Weather:
  - Displays the most recent hourly weather data, including temperature, feels-like temperature, wind speed, humidity, and weather conditions.
  - Key metrics, such as the 7-day rolling average temperature and today’s maximum and minimum temperatures, are highlighted for quick insights (one way to compute the rolling average is sketched after this section).
- City Selector:
  - Allows users to filter data by city (e.g., Bristol, Cardiff, London), enabling a focused view of weather data specific to each location.
- Historic Trends:
  - Provides interactive filters for year, month, day, and date, allowing users to explore historical weather data and trends.
  - Graphs show the Average Temperature (actual vs. feels-like) and the Average Wind Speed over the day.
- Forecast:
  - Visualizes projected average temperatures for the coming years, providing a forecasted trend line with confidence intervals.
- Summary Statistics:
  - Displays important aggregated metrics such as average temperature, maximum/minimum temperature, maximum humidity, and maximum wind speed, giving users a snapshot of weather extremes and averages.
- Geographical Map:
  - Shows the average temperature distribution across selected cities, providing a spatial overview of temperature variations.
This dashboard combines real-time data, historical trends, and forecasted weather to give users a complete overview of weather patterns. With interactive filtering options, users can customize their view to explore specific time frames, cities, or weather metrics, making it a powerful tool for analyzing and understanding weather data trends.
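As referenced in the Latest Weather card above, the 7-day rolling average temperature could be computed with a window function over the hourly semantic table (it could equally be a DAX measure in Power BI); a sketch under those assumptions:

```sql
-- One way to derive the dashboard's 7-day rolling average temperature.
-- Assumes one row per city per hour in semantic.weatherdata.
SELECT
    city_name,
    dt,
    temp,
    AVG(temp) OVER (
        PARTITION BY city_name
        ORDER BY dt
        ROWS BETWEEN 167 PRECEDING AND CURRENT ROW   -- 7 days x 24 hourly rows
    ) AS rolling_7d_avg_temp
FROM semantic.weatherdata;
```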
- Schema Validation: The Get Metadata activity in ADF can be used to validate the schema of both the API response and the bulk CSV.
- Optimization: A lakehouse could be used for partitioning, reducing storage and compute costs and improving scalability. Joins and queries could be further optimized by indexing key columns (a sketch follows).
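For the indexing idea, a candidate index covering the upsert key and common dashboard filters might look like the following; this is a sketch to be validated against actual query plans, not a tuned recommendation.

```sql
-- Possible index supporting the MERGE key lookup and city/time filters.
CREATE NONCLUSTERED INDEX ix_weatherdata_city_dt
    ON transformed.weatherdata (city_name, dt, weather_id);
```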