[FSTORE-1018] Airflow Tutorial with GCP (#202)

* Airflow with GCP
Maxxx-zh authored Sep 29, 2023
1 parent e0812e8 commit 4596a04
Showing 14 changed files with 425 additions and 0 deletions.
199 changes: 199 additions & 0 deletions integrations/airflow_gcp/README.md
@@ -0,0 +1,199 @@
## <span style='color:#ff5f27'> 👨🏻‍🏫 Airflow Tutorial with GCP </span>

This tutorial shows how to schedule a Feature Pipeline using Airflow with GCP.

Apache Airflow is an open-source platform used for orchestrating complex workflows and data pipelines. It provides a framework for defining, scheduling, and monitoring workflows as directed acyclic graphs (DAGs). Airflow is often used for automating and managing data ETL (Extract, Transform, Load) processes, data pipelines, and other batch or stream processing tasks.

The Feature Pipeline consists of two steps:

1. Parse weather data.

2. Insert parsed data into Hopsworks Feature Group.

You will schedule your Feature Pipeline to run daily.

### <span style='color:#ff5f27'>👮🏻‍♂️ Service Account Setup </span>

To create a service account, go to **IAM & Admin → Service Accounts → Create Service Account** in the GCP console.

Grant your service account the following role (a gcloud equivalent is sketched after the list):

- Dataflow Worker
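
If you prefer the gcloud CLI over the console, the sketch below is a rough equivalent; `airflow-tutorial` and `PROJECT_ID` are placeholder names, not values taken from this repository.

```
gcloud iam service-accounts create airflow-tutorial \
    --display-name="Airflow Tutorial"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:airflow-tutorial@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/dataflow.worker"
```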

### <span style='color:#ff5f27'> 🏡 VM instance Setup </span>

You can find the code to create the VM instance in the `provision.sh` script.

To make the `provision.sh` file executable, run the following command:

`chmod +x provision.sh`

Fill in your cluster information and then run the `./provision.sh` command.
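
For reference, the kind of command a script like `provision.sh` typically wraps looks roughly like the sketch below; the instance name, zone, machine type, image, and service account are illustrative assumptions, not the actual contents of `provision.sh`.

```
gcloud compute instances create airflow-instance \
    --zone=europe-west1-b \
    --machine-type=e2-standard-4 \
    --image-family=debian-11 \
    --image-project=debian-cloud \
    --service-account=airflow-tutorial@PROJECT_ID.iam.gserviceaccount.com \
    --scopes=cloud-platform
```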

Now you should see your cluster. Press `SSH connect` to open a terminal.

![instance](images/instance.png)

Type `python3 -V` to make sure Python is installed (you should see your Python version).

The next step is to install Miniconda and create a virtual environment.

Paste the following commands into your instance terminal:

```
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm -rf ~/miniconda3/miniconda.sh
~/miniconda3/bin/conda init bash
~/miniconda3/bin/conda init zsh
```

> ⚠️ Close your terminal and open a new one.

Now you can create a virtual environment and activate it. Paste the following commands into your instance terminal:

```
mkdir airflow
cd airflow
conda create --name airflow python=3.8 -y
conda activate airflow
```

#### <span style='color:#ff5f27'>⚙️ Airflow Installation </span>

The next step is to install Airflow. Paste the following commands into your instance terminal:

```
AIRFLOW_VERSION=2.0.1
PYTHON_VERSION=3.8
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow[gcp]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```

To make sure that Airflow is correctly installed, use the `airflow version` command.

#### <span style='color:#ff5f27'>⚙️ Hopsworks Installation </span>

To begin with, you need to install the "build-essential" package, which is a metapackage that includes a collection of essential development tools and libraries.

Run the following commands in your instance terminal:

```
sudo apt-get update -y
sudo apt-get install build-essential
```

The next step is to install the **twofish** and **hopsworks** packages. Run the following commands:

```
pip install twofish
pip install hopsworks
```

To verify that the **hopsworks** library is correctly installed, use the following command:

```
pip show hopsworks
```
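
You can also confirm that the package imports cleanly in the active environment; no output means the import succeeded:

```
python3 -c "import hopsworks"
```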

Great, your VM instance is ready! 📈 🎉

### <span style='color:#ff5f27'>🗄️ Init metadata database and register the admin user </span>

Run the following commands to initialize the database and register the admin user:

```
airflow db init
airflow users create -r Admin -u username -p mypassword -e [email protected] -f yourname -l lastname
```

### <span style='color:#ff5f27'>🛠️ Airflow Setup </span>

To be able to access the Airflow webserver, you need to **open port 8080**.

Go to **VPC Network -> Firewall -> Create Firewall Rule**.

![firewallrule1](images/firewallrule1.png)

![firewallrule2](images/firewallrule2.png)

Press the **Create** button.

Go to **Compute Engine -> VM instances** and click on your instance.

Then press the **Edit** button and add the firewall rule you created.

![attach_firewallrule](images/attach_firewallrule.png)
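
If you prefer the gcloud CLI, the sketch below creates an equivalent rule and attaches it to the instance via a network tag; the rule name, tag, instance name, and zone are illustrative assumptions. Restricting `--source-ranges` to your own IP address is safer than `0.0.0.0/0`.

```
gcloud compute firewall-rules create allow-airflow-webserver \
    --direction=INGRESS \
    --allow=tcp:8080 \
    --source-ranges=0.0.0.0/0 \
    --target-tags=airflow-webserver

gcloud compute instances add-tags airflow-instance \
    --tags=airflow-webserver \
    --zone=europe-west1-b
```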

Go back to the terminal and start the Web Server using the following command:

```
airflow webserver -p 8080
```

> ⚠️ Leave your terminal running and open a new one.

Now you need to start the Scheduler. The scheduler ensures that tasks run in the correct order and at the right time. Run the following commands in the new terminal:

```
cd airflow
AIRFLOW_HOME=$(pwd)
export AIRFLOW_HOME=$AIRFLOW_HOME
conda activate airflow
airflow db init
airflow scheduler
```
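
If you prefer not to keep two terminals open, both processes can also be started in the background with standard shell tools, for example:

```
nohup airflow webserver -p 8080 > webserver.log 2>&1 &
nohup airflow scheduler > scheduler.log 2>&1 &
```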

The next step is to copy the external IP of your instance to your clipboard.

![external_ip](images/external_ip.png)

In your browser, go to `http://{YOUR_INSTANCE_EXTERNAL_IP}:8080`.

Log in with the username and password you created when the database was initialized.

![airflow_ui](images/airflow_ui.png)

Congratulations, your Airflow Webserver is ready! 🥳 📈

The next step is to prepare a DAG with tasks for the Feature Pipeline.

### <span style='color:#ff5f27'> 📝 DAG Setup </span>

You can find the code to create the DAG with tasks for the Feature Pipeline in the `data_parsing.py` file, which is located in the `dags` folder.

Open `data_parsing.py` in your code editor and replace `{YOUR_HOPSWORKS_API_KEY}` with your `HOPSWORKS_API_KEY`, which you can get from the [Hopsworks App](https://app.hopsworks.ai/login).
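
As an alternative to editing the file by hand, you can substitute the placeholder from the shell; this assumes your key is stored in a `HOPSWORKS_API_KEY` environment variable and contains no characters that are special to `sed`:

```
sed -i "s/{YOUR_HOPSWORKS_API_KEY}/${HOPSWORKS_API_KEY}/" data_parsing.py
```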

To upload this file to the cluster, open a new terminal and press **Upload File**. Choose `data_parsing.py` and press **Upload Files**.

To verify that the file was uploaded, use the `ls` command. `data_parsing.py` should be listed there.

The next step is to create a **dags** folder and move the `data_parsing.py` file there. Use the following commands:

```
conda activate airflow
cd airflow
mkdir dags
cd ..
mv data_parsing.py airflow/dags
```

Refresh your Airflow UI page. Now you should see the newly created **Feature_Pipeline** DAG.

![feature_pipeline](images/feature_pipeline.png)

> In case you don't see your DAG, just wait a moment for it to refresh.

Activate (1) the **Feature_Pipeline** DAG and then trigger (2) it. The green circle (3) means that the run was successful.

![dag_run](images/dag_run.png)
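
Alternatively, the same two steps can be done from the instance terminal with the Airflow CLI:

```
airflow dags unpause Feature_Pipeline
airflow dags trigger Feature_Pipeline
```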

In the Hopsworks UI you can see that the **Weather Feature Group** has been successfully created.

![fg](images/fg.png)

![data_preview](images/data_preview.png)


Congratulations! You successfully scheduled your Feature Pipeline using Airflow with GCP! 🥳 🎉
193 changes: 193 additions & 0 deletions integrations/airflow_gcp/dags/data_parsing.py
@@ -0,0 +1,193 @@
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime
import requests
import time
import json
import pandas as pd
import hopsworks


# Parse Weather Data
def get_city_weather_data(
    city_name: str,
    coordinates: list,
    start_date: str,
    end_date: str = None,
    forecast: bool = True
):
    """
    Takes city name, coordinates and returns pandas DataFrame with weather data.
    Examples of arguments:
        coordinates=(47.755, -122.2806), start_date="2023-01-01"
    """
    start_of_cell = time.time()

    if not end_date:
        end_date = start_date

    latitude, longitude = coordinates

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'hourly': ["temperature_2m",
                   "relativehumidity_2m",
                   "weathercode",
                   "windspeed_10m",
                   "winddirection_10m",
                   ],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }

    if forecast:
        # historical forecast endpoint
        base_url = 'https://api.open-meteo.com/v1/forecast'
    else:
        # historical observations endpoint
        base_url = 'https://archive-api.open-meteo.com/v1/archive'

    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # retry once on a transient connection error
        response = requests.get(base_url, params=params)

    response_json = response.json()
    res_df = pd.DataFrame(response_json["hourly"])

    # rename columns
    res_df = res_df.rename(columns={
        "time": "base_time",
        "temperature_2m": "temperature",
        "weathercode": "weather_code",
        "relativehumidity_2m": "relative_humidity",
        "windspeed_10m": "wind_speed",
        "winddirection_10m": "wind_direction"
    })

    # change columns order
    res_df = res_df[
        ['base_time',
         'temperature',
         'relative_humidity',
         'weather_code',
         'wind_speed',
         'wind_direction']
    ]

    # convert dates in the 'base_time' column
    res_df["base_time"] = pd.to_datetime(res_df["base_time"])
    res_df['city_name'] = city_name
    res_df['forecast_hr'] = 0

    end_of_cell = time.time()
    print(f"Parsed weather for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df


def get_weather_data():
    city_coords = {
        "London": [51.51, -0.13],
        "Paris": [48.85, 2.35],
        "Stockholm": [59.33, 18.07],
        "New York": [40.71, -74.01],
        "Los Angeles": [34.05, -118.24],
        "Singapore": [1.36, 103.82],
        "Sydney": [-33.87, 151.21],
        "Hong Kong": [22.28, 114.16],
        "Rome": [41.89, 12.48],
        "Kyiv": [50.45, 30.52]
    }

    # Get today's date
    today = datetime.datetime.today().date().strftime("%Y-%m-%d")

    # Parse and insert updated data from observations endpoint
    parsed_df = pd.DataFrame()

    for city_name, city_coord in city_coords.items():
        weather_df_temp = get_city_weather_data(
            city_name,
            city_coord,
            today,
        )
        parsed_df = pd.concat([parsed_df, weather_df_temp])

    # Perform feature engineering
    parsed_df['index_column'] = parsed_df.index
    parsed_df['hour'] = parsed_df['base_time'].dt.hour
    parsed_df['day'] = parsed_df['base_time'].dt.day
    parsed_df['temperature_diff'] = parsed_df.groupby('city_name')['temperature'].diff()
    parsed_df['wind_speed_category'] = pd.cut(
        parsed_df['wind_speed'],
        bins=[0, 2.5, 5.0, 7.5, float('inf')],
        labels=['Low', 'Moderate', 'High', 'Very High']
    ).astype(str)
    parsed_df["base_time"] = parsed_df["base_time"].astype(int) // 10**9
    parsed_df.fillna(0, inplace=True)

    return parsed_df.to_json(orient='records')


def insert_data(**kwargs):

    # Retrieve the output from the weather_data task
    ti = kwargs['ti']
    weather_data_json = ti.xcom_pull(task_ids='parse_weather_data')

    # Parse the JSON string into a list of dictionaries
    weather_data_list = json.loads(weather_data_json)

    # Convert the list of dictionaries into a Pandas DataFrame
    weather_data = pd.DataFrame(weather_data_list)

    # Your code to insert data into Hopsworks Feature Store using weather_data
    print("Received weather data:", weather_data)

    project = hopsworks.login(
        api_key_value='{YOUR_HOPSWORKS_API_KEY}',
    )

    fs = project.get_feature_store()

    weather_fg = fs.get_or_create_feature_group(
        name="weather_fg",
        version=1,
        description="Weather data",
        primary_key=["city_name", "hour"],
        online_enabled=True,
        event_time="base_time",
    )
    weather_fg.insert(weather_data)

    return


default_args = {
    'start_date': datetime.datetime(2023, 1, 1),
}

with DAG(
    'Feature_Pipeline',
    description='DAG to Parse, Transform and Insert data into Hopsworks Feature Store',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    weather_data = PythonOperator(
        task_id='parse_weather_data',
        python_callable=get_weather_data,
    )

    insertion = PythonOperator(
        task_id='insert_weather_data',
        python_callable=insert_data,
    )

    weather_data >> insertion

Binary file added integrations/airflow_gcp/images/airflow_ui.png
Binary file added integrations/airflow_gcp/images/dag_run.png
Binary file added integrations/airflow_gcp/images/data_preview.png
Binary file added integrations/airflow_gcp/images/external_ip.png
Binary file added integrations/airflow_gcp/images/fg.png
Binary file added integrations/airflow_gcp/images/instance.png

0 comments on commit 4596a04

Please sign in to comment.