Add databricks version of module 3
Signed-off-by: Danny Chiao <[email protected]>
adchia committed Oct 27, 2022
1 parent 802de24 commit 8652344
Showing 46 changed files with 604 additions and 9 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -8,4 +8,6 @@ terraform.tfstate.backup
*.iml
**/feast-postgres-data/*
**/airflow_demo/airflow_home/*
.vscode/*
.vscode/*
**/derby.log
**/metastore_db/*
13 changes: 7 additions & 6 deletions README.md
@@ -52,9 +52,10 @@ Since we'll be learning how to leverage Feast in CI/CD, you'll also need to fork

These are meant mostly to be done in order, with examples building on previous concepts.

| Time (min) | Description | Module&nbsp;&nbsp;&nbsp; |
| :--------: | :------------------------------------------------------------------------------- | :----------------------------- |
| 30-45 | Setting up Feast projects & CI/CD + powering batch predictions | [Module 0](module_0/README.md) |
| 15-20 | Streaming ingestion & online feature retrieval with Kafka, Spark, Airflow, Redis | [Module 1](module_1/README.md) |
| 10-15 | Real-time feature engineering with on demand transformations | [Module 2](module_2/README.md) |
| 30 | (WIP) Orchestrated batch/stream transformations using dbt + Airflow with Feast | [Module 3](module_3/README.md) |
| Time (min) | Description | Module&nbsp;&nbsp;&nbsp; |
| :--------: | :------------------------------------------------------------------------------- | :--------------------------------------------- |
| 30-45 | Setting up Feast projects & CI/CD + powering batch predictions | [Module 0](module_0/README.md) |
| 15-20 | Streaming ingestion & online feature retrieval with Kafka, Spark, Airflow, Redis | [Module 1](module_1/README.md) |
| 10-15 | Real-time feature engineering with on demand transformations | [Module 2](module_2/README.md) |
| 30 | Orchestrated batch/stream transformations using dbt + Airflow with Feast | [Module 3 (Snowflake)](module_3_sf/README.md) |
| 30 | (WIP) Orchestrated batch/stream transformations using dbt + Airflow with Feast | [Module 3 (Databricks)](module_3_db/README.md) |
312 changes: 312 additions & 0 deletions module_3_db/README.md

Large diffs are not rendered by default.

70 changes: 70 additions & 0 deletions module_3_db/airflow_demo/dag.py
@@ -0,0 +1,70 @@
import os
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from feast import RepoConfig, FeatureStore
from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
SparkOfflineStoreConfig,
)
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
import pendulum

with DAG(
dag_id="feature_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
description="A dbt + Feast DAG",
schedule="@daily",
catchup=False,
tags=["feast"],
) as dag:
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd ${AIRFLOW_HOME}; dbt test --models "aggregate_transaction_features"
""",
dag=dag,
)

# Generate new transformed feature values
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="""
cd ${AIRFLOW_HOME}; dbt run --models "aggregate_transaction_features"
""",
dag=dag,
)

# Use Feast to make these feature values available in a low latency store
@task()
def materialize(data_interval_start=None, data_interval_end=None):
repo_config = RepoConfig(
registry=RegistryConfig(
registry_type="sql",
path="postgresql://postgres:mysecretpassword@[YOUR-RDS-ENDPOINT:PORT]/feast",
),
project="feast_demo",
provider="local",
offline_store=SparkOfflineStoreConfig(
spark_conf={
"spark.ui.enabled": "false",
"spark.eventLog.enabled": "false",
"spark.sql.catalogImplementation": "hive",
"spark.sql.parser.quotedRegexColumnNames": "true",
"spark.sql.session.timeZone": "UTC",
}
),
online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
entity_key_serialization_version=2,
)
# Needed for Mac OS users because of a seg fault in requests for standalone Airflow (not needed in prod)
os.environ["NO_PROXY"] = "*"
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
store = FeatureStore(config=repo_config)
# Add 1 hr overlap to account for late data
# Note: normally, you'll probably have different feature views with different freshness requirements, instead
# of materializing all feature views every day.
store.materialize(data_interval_start.subtract(hours=1), data_interval_end)

# Setup DAG
dbt_test >> dbt_run >> materialize()
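
For local debugging, the materialization step in this DAG can be exercised by hand outside Airflow. The following is a minimal sketch (not part of this commit), assuming it is run from `module_3_db/feature_repo` with the registry and online store placeholders in `feature_store.yaml` filled in:

```python
# Sketch: run the same materialization window manually, without Airflow.
# Assumes module_3_db/feature_repo is the working directory and that
# feature_store.yaml points at your real registry / DynamoDB instance.
from datetime import datetime, timedelta, timezone

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Mirror the DAG: materialize the last day of data with a 1 hour overlap
# to account for late-arriving rows.
end = datetime.now(timezone.utc)
start = end - timedelta(days=1, hours=1)
store.materialize(start, end)
```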
33 changes: 33 additions & 0 deletions module_3_db/airflow_demo/setup_airflow.sh
@@ -0,0 +1,33 @@
# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=$(pwd)/airflow_home
export AIRFLOW__CORE__LOAD_EXAMPLES=False
# TODO: UPDATE THIS
export AIRFLOW_CONN_DATABRICKS_DEFAULT='databricks://@host-url?token=yourtoken'

# Cleanup previous state, if it exists
rm -rf $AIRFLOW_HOME

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.4.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.0/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Setup Feast dags
mkdir -p $AIRFLOW_HOME/dags
cp dag.py $AIRFLOW_HOME/dags

# Setup dbt dags
cd ../dbt/feast_demo
cp -R * $AIRFLOW_HOME
cd $AIRFLOW_HOME

# The Standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.
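
Both dbt and the Feast Spark offline store reach Databricks through `databricks-connect`, so it can help to confirm the connection before starting Airflow. A minimal sketch (not part of this commit), assuming `databricks-connect configure` has already been run and the `demo_fraud_v2` database exists on the cluster:

```python
# Sketch: sanity-check the databricks-connect setup used by the offline store.
# The database name is the one assumed elsewhere in this module.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # routed to Databricks by databricks-connect
spark.sql("SHOW TABLES IN demo_fraud_v2").show()
```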
Binary file added module_3_db/architecture.png
File renamed without changes.
29 changes: 29 additions & 0 deletions module_3_db/dbt/feast_demo/dbt_project.yml
@@ -0,0 +1,29 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'feast_demo'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'feast_demo'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
macro-paths: ["macros"]


target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"

models:
feast_demo:
example:
materialized: view
location_root: '[YOUR S3 BUCKET/FOLDER]'
file_format: 'delta'
7 changes: 7 additions & 0 deletions module_3_db/dbt/feast_demo/macros/macro.sql
@@ -0,0 +1,7 @@
{% macro prev_day_partition() %}
(SELECT MAX(timestamp)::date FROM {{ this }})
{% endmacro %}

{% macro next_day_partition() %}
(SELECT date_add(MAX(timestamp)::date, 1) FROM {{ this }})
{% endmacro %}
42 changes: 42 additions & 0 deletions module_3_db/dbt/feast_demo/models/example/aggregate_transaction_features.sql
@@ -0,0 +1,42 @@
{{
config(
materialized='incremental',
file_format='parquet',
incremental_strategy='append'
)
}}

SELECT *
FROM
(
SELECT
user_id,
to_timestamp(timestamp) AS timestamp,
SUM(amt) OVER (
PARTITION BY user_id
ORDER BY to_timestamp(timestamp)
RANGE BETWEEN INTERVAL 1 day PRECEDING AND CURRENT ROW
) AS amt_sum_1d_10m,
AVG(amt) OVER (
PARTITION BY user_id
ORDER BY to_timestamp(timestamp)
RANGE BETWEEN INTERVAL 1 day PRECEDING AND CURRENT ROW
) AS amt_mean_1d_10m
FROM demo_fraud_v2.transactions
{% if is_incremental() %}
WHERE
partition_0 BETWEEN date_format({{ prev_day_partition() }}, "yyyy") AND date_format({{ next_day_partition() }}, "yyyy") AND
partition_1 BETWEEN date_format({{ prev_day_partition() }}, "MM") AND date_format({{ next_day_partition() }}, "MM") AND
partition_2 BETWEEN date_format({{ prev_day_partition() }}, "dd") AND date_format({{ next_day_partition() }}, "dd")
{% else %}
-- Hack to simulate the pipeline starting on 2022-04-20 (matches the partition literals below)
WHERE
partition_0 = "2022" AND
partition_1 = "04" AND
partition_2 = "20"
{% endif %}
)
{% if is_incremental() %}
-- Need to only produce values in this window
WHERE timestamp > (SELECT MAX(timestamp) FROM {{ this }})
{% endif %}
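
For readers who think in DataFrames, the 1-day rolling aggregation in this model corresponds roughly to the following PySpark sketch (illustrative only, not part of this commit; table and column names are taken from the model above):

```python
# Sketch: PySpark equivalent of the 1-day rolling window above. Ordering by
# epoch seconds lets rangeBetween express "previous 24 hours up to this row".
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
txns = spark.table("demo_fraud_v2.transactions").withColumn(
    "timestamp", F.to_timestamp("timestamp")
)

one_day = 24 * 60 * 60  # seconds
w = (
    Window.partitionBy("user_id")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-one_day, 0)
)

features = txns.select(
    "user_id",
    "timestamp",
    F.sum("amt").over(w).alias("amt_sum_1d_10m"),
    F.avg("amt").over(w).alias("amt_mean_1d_10m"),
)
```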
11 changes: 11 additions & 0 deletions module_3_db/dbt/feast_demo/models/example/schema.yml
@@ -0,0 +1,11 @@

version: 2

models:
- name: aggregate_transaction_features
description: ""
columns:
- name: "user_id"
description: "The primary key for this table"
tests:
- not_null
File renamed without changes
Binary file added module_3_db/demo_images/db_ghf.png
Binary file added module_3_db/demo_images/db_gof.png
Binary file added module_3_db/demo_images/db_materialize.png
Binary file added module_3_db/demo_images/db_stream_agg.png
Binary file added module_3_db/demo_images/dbt.png
Binary file added module_3_db/demo_images/df_stream_setup.png
22 changes: 22 additions & 0 deletions module_3_db/feature_repo/data_sources.py
@@ -0,0 +1,22 @@
from feast import PushSource
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)

# Historical log of transactions stream
transactions_source = SparkSource(
name="transactions_source",
table="demo_fraud_v2.transactions",
timestamp_field="timestamp",
)

# Precomputed aggregate transaction feature values (batch / stream)
aggregate_transactions_source = PushSource(
name="transactions_1d",
batch_source=SparkSource(
name="transactions_1d_batch",
table="demo_fraud_v2.aggregate_transaction_features",
timestamp_field="timestamp",
tags={"dbtModel": "models/example/aggregate_transaction_features.sql"},
),
)
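
The `PushSource` is what a stream job (or the sample Databricks notebook) writes freshly computed aggregates into. A minimal push sketch (not part of this commit), assuming a pandas DataFrame with the same columns the dbt model produces:

```python
# Sketch: push fresh aggregate rows into Feast so they land in the online store.
# Column names mirror the dbt model; the row values are illustrative.
from datetime import datetime, timezone

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # run from module_3_db/feature_repo

event_df = pd.DataFrame(
    {
        "user_id": ["v5zlw0"],
        "timestamp": [datetime.now(timezone.utc)],
        "amt_sum_1d_10m": [123.45],
        "amt_mean_1d_10m": [41.15],
    }
)
store.push("transactions_1d", event_df)
```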
9 changes: 9 additions & 0 deletions module_3_db/feature_repo/entities.py
@@ -0,0 +1,9 @@
from feast import (
Entity,
)

user = Entity(
name="user",
join_keys=["user_id"],
description="user id",
)
9 changes: 9 additions & 0 deletions module_3_db/feature_repo/feature_services.py
@@ -0,0 +1,9 @@
from feast import FeatureService

from features import *

feature_service_1 = FeatureService(
name="model_v1",
features=[user_transaction_amount_metrics],
owner="[email protected]",
)
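
At serving time, the feature service gives the model a stable handle on its features. A minimal retrieval sketch (not part of this commit), assuming materialization or a push has already populated the DynamoDB online store:

```python
# Sketch: retrieve the features behind model_v1 for a single user at serving time.
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # run from module_3_db/feature_repo
feature_service = store.get_feature_service("model_v1")

online_features = store.get_online_features(
    features=feature_service,
    entity_rows=[{"user_id": "v5zlw0"}],  # illustrative entity key
).to_dict()
print(online_features)
```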
17 changes: 17 additions & 0 deletions module_3_db/feature_repo/feature_store.yaml
@@ -0,0 +1,17 @@
project: feast_demo
provider: aws
registry: # where this repo's metadata is stored
registry_type: sql
path: postgresql://postgres:mysecretpassword@[your-rds-instance]:5432/feast
online_store: # low latency online storage
type: dynamodb
region: us-west-1
offline_store:
type: spark
spark_conf: # Note: pip install -U "databricks-connect"
spark.ui.enabled: "false"
spark.eventLog.enabled: "false"
spark.sql.catalogImplementation: "hive"
spark.sql.parser.quotedRegexColumnNames: "true"
spark.sql.session.timeZone: "UTC"
entity_key_serialization_version: 2
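
With this `feature_store.yaml`, `feast apply` registers the repo against the Postgres registry, and training data comes from the Spark offline store. A minimal point-in-time retrieval sketch (not part of this commit), with illustrative entity rows:

```python
# Sketch: generate training data via the Spark offline store configured above.
from datetime import datetime, timezone

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # run from module_3_db/feature_repo

entity_df = pd.DataFrame(
    {
        "user_id": ["v5zlw0", "v5zlw0"],  # illustrative
        "event_timestamp": [
            datetime(2022, 4, 21, tzinfo=timezone.utc),
            datetime(2022, 4, 22, tzinfo=timezone.utc),
        ],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=store.get_feature_service("model_v1"),
).to_df()
```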
26 changes: 26 additions & 0 deletions module_3_db/feature_repo/features.py
@@ -0,0 +1,26 @@
from datetime import timedelta

from feast import (
FeatureView,
Field,
)
from feast.types import String, Float64

from data_sources import *
from entities import *

user_transaction_amount_metrics = FeatureView(
name="user_transaction_amount_metrics",
description="User transaction features",
entities=[user],
ttl=timedelta(seconds=8640000000),
schema=[
Field(name="user_id", dtype=String),
Field(name="amt_sum_1d_10m", dtype=Float64),
Field(name="amt_mean_1d_10m", dtype=Float64),
],
online=True,
source=aggregate_transactions_source,
tags={"production": "True"},
owner="[email protected]",
)
Binary file added module_3_db/orchestrate.png
1 change: 1 addition & 0 deletions module_3_db/sample_db_notebook.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions module_3/README.md → module_3_sf/README.md
@@ -12,7 +12,7 @@ This is a very similar module to module 1. The key difference is now we'll be us
- **Data sources**: Snowflake
- **Online store**: Redis
- **Orchestrator**: Airflow + dbt
- **Use case**: Predicting churn for drivers in real time.
- **Use case**: Fraud detection

<img src="architecture.png" width=750>

@@ -44,7 +44,7 @@ This is a very similar module to module 1. The key difference is now we'll be us
# Workshop
## Step 1: Install Feast

First, we install Feast with Spark and Postgres and Redis support:
First, we install Feast with Snowflake, Postgres, and Redis support:
```bash
pip install "feast[snowflake,postgres,redis]"
```
Binary file added module_3_sf/airflow.png
File renamed without changes.
File renamed without changes.
File renamed without changes
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions module_3_sf/dbt/feast_demo/.gitignore
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
