[FSTORE-933] DBT Scheduling with External FG (#176)
* Tutorial for DBT with External FG
Maxxx-zh authored Oct 12, 2023
1 parent 4596a04 commit b0c8f3d
Showing 14 changed files with 288 additions and 395 deletions.
13 changes: 0 additions & 13 deletions integrations/dbt_bq/Dockerfile

This file was deleted.

134 changes: 32 additions & 102 deletions integrations/dbt_bq/README.md
@@ -3,7 +3,9 @@

This tutorial shows you how to perform feature engineering in DBT on BigQuery, storing the computed offline features in a BigQuery table (mounted as an external feature group in Hopsworks) and the online features in Hopsworks. The online features are written to Hopsworks by a Python module that runs on a Dataproc cluster. The feature group created in Hopsworks therefore has its offline data stored in BigQuery and its online data stored in the Hopsworks online store (RonDB).
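For reference, a BigQuery table computed by DBT can be registered in Hopsworks as an external feature group roughly as in the sketch below. This is a minimal illustration, not part of the tutorial code: the connector name, feature group name, primary key, and query are assumptions, and it presumes a BigQuery storage connector has already been configured in your Hopsworks project.

```python
import hopsworks

# Connect to your Hopsworks project (authentication details are placeholders)
project = hopsworks.login(api_key_value="{YOUR_HOPSWORKS_API_KEY}")
fs = project.get_feature_store()

# A BigQuery storage connector previously configured in Hopsworks (assumed name)
bq_connector = fs.get_storage_connector("bigquery_connector")

# Mount the BigQuery table written by DBT as an external feature group
external_fg = fs.create_external_feature_group(
    name="dbt_features",      # illustrative name
    version=1,
    description="Offline features computed by DBT in BigQuery",
    primary_key=["id"],       # illustrative primary key
    query="SELECT * FROM `{YOUR_GCP_PROJECT_ID}.{YOUR_DATASET_NAME}.{YOUR_TABLE_NAME}`",
    storage_connector=bq_connector,
)
external_fg.save()
```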

### <span style='color:#ff5f27'> 🏡 Cluster setup </span>
![pipeline](images/pipeline.png)

### <span style='color:#ff5f27'> 🏡 Dataproc Cluster Setup </span>

First, you need to set up a Dataproc (Spark) cluster that will run the Python model in our DBT workflow. The Python model will write to the online feature store in Hopsworks.
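The cluster can be created in the Google Cloud console; if you prefer to script it, a rough sketch with the Dataproc Python client is shown below. All names and machine types are placeholders, and the cluster still needs the Hopsworks client available to the Python model (for example via an initialization action or a custom image).

```python
from google.cloud import dataproc_v1

# Placeholders -- adjust to your project
project_id = "{YOUR_GCP_PROJECT_ID}"
region = "{YOUR_DATAPROC_REGION}"

client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

cluster = {
    "project_id": project_id,
    "cluster_name": "dbt-hopsworks-cluster",  # illustrative name
    "config": {
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
    },
}

operation = client.create_cluster(
    request={"project_id": project_id, "region": region, "cluster": cluster}
)
print(operation.result().cluster_name)
```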

@@ -39,134 +41,62 @@ Grant your service account the following roles:
- Storage Admin


### <span style='color:#ff5f27'>📡 DBT Setup </span>

Install the BigQuery adapter by running
`pip install dbt-bigquery`

Create a new profile inside your `~/.dbt/profiles.yml` file.

```yaml
{YOUR_DBT_PROJECT_NAME}:
  target: dev
  outputs:
    dev:
      # Type of DBT connector (BigQuery, Snowflake, etc.)
      type: bigquery
      # Authentication method
      method: service-account-json
      # Your Google Cloud project id
      project: {YOUR_GCP_PROJECT_ID}
      # Your BigQuery dataset name
      dataset: {YOUR_DATASET_NAME}
      threads: 1
      # These fields come from the service account JSON keyfile
      keyfile_json:
        type: xxx
        project_id: xxx
        private_key_id: xxx
        private_key: xxx
        client_email: xxx
        client_id: xxx
        auth_uri: xxx
        token_uri: xxx
        auth_provider_x509_cert_url: xxx
        client_x509_cert_url: xxx
      # Your bucket name
      gcs_bucket: {YOUR_BUCKET_NAME}
      # Your Dataproc region
      dataproc_region: {YOUR_DATAPROC_REGION}
```
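Before running any models, you can sanity-check this profile with `dbt debug`, which validates the connection settings against BigQuery.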
### <span style='color:#ff5f27'>👔 DBT Setup </span>

To schedule a DBT model, you will use the DBT Cloud platform.

### <span style='color:#ff5f27'>⚙️ DBT Launch </span>
To begin with, register on the [DBT Cloud page](https://cloud.getdbt.com).

Fill in the `read_bigquery_data.sql` and `data_pipeline.py` files with your feature engineering code, which creates features and writes them to the offline table in BigQuery and to the online table in Hopsworks.
Next, you need to set up a new project. Name it and press continue.

Use the following command to run the DBT model pipeline:
Choose the BigQuery connection.

`dbt run`

You will see the following output:
![output](images/output.png)

> To see the job logs, check your cluster **Job details**.
## <span style='color:#ff5f27'>🗓️ DBT Scheduling </span>
Then upload your **Service Account JSON file**, and define a Google Cloud **Storage Bucket name**, a **Dataproc Region**, and a **Dataproc Cluster name**. Set the **Job Execution Timeout Seconds** parameter to **800**.

To schedule a DBT model, you will use Google Cloud Scheduler.
Press **Test Connection**.

You need the following files:
- **script.sh** with the commands we want DBT to run in GCP.
- **invoke.go** to create the HTTP server that will run **script.sh**.
- **Dockerfile** to build the DBT project image.
To set up a repository, select the GitHub option and choose the repository with your DBT tutorial project.

All these files are ready for you and are present in the repository.
Great! Your project is ready!📈

### <span style='color:#ff5f27'>👩🏻‍🍳 Build Docker image with Cloud Build</span>

To build a Docker image with Cloud Build, navigate to the **dbt_bq** folder using the `cd dbt_bq` command and run the following commands in your terminal:
### <span style='color:#ff5f27'> 🏃🏻‍♂️ Run DBT </span>

```bash
gcloud artifacts repositories create {YOUR_DOCKER_REPO_NAME} --repository-format=docker \
    --location={YOUR_REGION} --description="Docker repository"

gcloud builds submit --region={YOUR_REGION} \
    --tag {YOUR_REGION}-docker.pkg.dev/$(gcloud config get-value project)/{YOUR_DOCKER_REPO_NAME}/dbt-tutorial-image:tag1
```
Create a new branch.

Now you should see your Docker image in Cloud Build.
Fill in the model files with your credentials and push the changes to your GitHub repository.

### <span style='color:#ff5f27'>🕵🏻‍♂️ Secret Manager </span>

You need to store the JSON keyfile in **Secret Manager** to use your credentials inside the Docker image.

Navigate to the **Secret Manager** page and press the **Create Secret** button.

Name your secret **dbt_tutorial_secret** and upload your JSON keyfile.
Use the following command in the DBT terminal to run the DBT model pipeline:

`dbt run`

### <span style='color:#ff5f27'>🏃🏻‍♂️ Cloud Run Setup </span>
> To see the job logs, check your Dataproc cluster **Job details**.
Go to Cloud Run and press **Create Service**.

Name your service and select your region.
### <span style='color:#ff5f27'>⏰ DBT Job Schedule </span>

In **Container image URL**, select your Docker image.
Press the `Deploy` button in the top left corner and then press `Environments`.

In the **Secrets** tab, select the secret you created. **Reference method** should be *Mounted as volume*, and in **Mount Path** type */secrets*.
Create a new environment and give it a name.

To test your service, use the following command:
Select **Only run on a custom branch** and then enter your branch name.

```bash
curl -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
    {YOUR_SERVICE_URL}
```
Pass the BigQuery table name where the DBT models are stored, then press **Save**.

### <span style='color:#ff5f27'>⏰ Cloud Scheduler Setup </span>
The last step is to create a scheduled job that will invoke our Cloud Run service.
Press `Create Job` and then the `Deploy Job` button.

First, we need to create a service account for this.
Add a job name and select the environment.

Navigate to IAM & Admin → Service Accounts → Create Service Account.
In `Commands`, pass the following: `dbt build --select data_pipeline+`.

Name your account and grant it the following roles:
- Cloud Run Invoker
- Cloud Run Service Agent
This command will run the `data_pipeline` model and all models downstream of it, in order.
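In this project, for example, the `data_pipeline+` selector picks up `data_pipeline` itself plus downstream models such as `big_query_insert`, which reads the result of `data_pipeline` via `dbt.ref("data_pipeline")`.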

Now go to Cloud Scheduler and press **Create a new job**.
Then select `Run on Schedule` and choose the desired schedule.

Name your schedule, select your region, add a description, and use the `0 0 * * *` cron expression to run the job at 00:00 (midnight) every day.
In **Advanced Settings** set the `Run Timeout` to **900**.

Configure the job execution:
- **Target Type** - HTTP.
- Enter your Cloud Run URL.
- Choose the GET method for HTTP requests.
- Choose **Add OIDC token**.
- Enter the email of the service account you created.
Press `Save` and that's it!

![config_image](images/config.png)
To run your pipeline, press the `Run Now` button.

After creation, select the created job and press **Force Run**.
Now your feature pipeline is scheduled.
Binary file removed integrations/dbt_bq/images/config.png
Binary file added integrations/dbt_bq/images/pipeline.png
28 changes: 0 additions & 28 deletions integrations/dbt_bq/invoke.go

This file was deleted.

23 changes: 23 additions & 0 deletions integrations/dbt_bq/models/example/big_query_insert.py
@@ -0,0 +1,23 @@
def model(dbt, session):
    # Setup cluster usage
    dbt.config(
        submission_method="cluster",
        dataproc_cluster_name="{YOUR_DATAPROC_CLUSTER_NAME}",
    )

    # Read data_pipeline Python model
    data_pipeline = dbt.ref("data_pipeline")

    # Define the list of columns to drop
    columns_to_drop = ['index_column', 'hour', 'day', 'temperature_diff', 'wind_speed_category']

    # Drop the specified columns
    data_pipeline = data_pipeline.drop(*columns_to_drop)

    # Write data to BigQuery table
    data_pipeline.write.format('bigquery') \
        .option('table', '{YOUR_DATASET_NAME}.{YOUR_TABLE_NAME}') \
        .mode('append') \
        .save()

    return data_pipeline
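The `data_pipeline` model referenced above is where the online features are written to Hopsworks. A minimal sketch of what such a DBT Python model could look like is given below; it is illustrative only: the upstream model name, feature names, feature group name, primary key, and API key handling are assumptions, and the Dataproc cluster is assumed to have the `hopsworks` client installed. The Hopsworks calls follow the standard feature store client API.

```python
import hopsworks

def model(dbt, session):
    # Run on the same Dataproc cluster as the other Python models
    dbt.config(
        submission_method="cluster",
        dataproc_cluster_name="{YOUR_DATAPROC_CLUSTER_NAME}",
    )

    # Read the upstream model with the raw data (illustrative name)
    df = dbt.ref("read_bigquery_data").toPandas()

    # Example feature engineering -- placeholder transformation
    df["temperature_diff"] = df["temperature_max"] - df["temperature_min"]

    # Connect to Hopsworks and insert the rows into an online-enabled feature group
    project = hopsworks.login(api_key_value="{YOUR_HOPSWORKS_API_KEY}")
    fs = project.get_feature_store()
    fg = fs.get_or_create_feature_group(
        name="weather_features",   # illustrative feature group name
        version=1,
        primary_key=["city"],      # illustrative primary key
        online_enabled=True,
    )
    fg.insert(df)

    # DBT materializes the returned DataFrame as the data_pipeline table,
    # which big_query_insert then reads via dbt.ref("data_pipeline")
    return session.createDataFrame(df)
```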
