
Added an example for using PostgreSQL as a database registry for Feast #5

Merged · 1 commit · Feb 13, 2024
85 changes: 85 additions & 0 deletions feast_postgres/Readme.md
@@ -0,0 +1,85 @@
### 1. Install PostgreSQL on OpenShift Using an OpenShift Template



![postgres.png](postgres.png)


![postgres-1.png](postgres-1.png)

### 2. Create a feature repository


![create-feast-postgres.png](create-feast-postgres.png)

### 3. Port Forward PostgreSQL Database

```bash
oc port-forward postgresql-1-fgn46 5432
```

### 4. Create Feature Store Deployment

```bash
cd feature_repo
feast apply
```

- **Verify that the Postgres DB is updated with the Feast Registry**.

![postgres-feast-registry.png](postgres-feast-registry.png)


### 5. Deploy Feature Server Instances on OpenShift

Set the environment variables in [feature_store.yaml](feature_repo/feature_store.yaml); the file should look like this:

```yaml
project: feast_postgres
provider: local
registry:
registry_store_type: PostgreSQLRegistryStore
path: feast_registry
host: postgresql.feast.svc.cluster.local
port: 5432
database: feast
db_schema: feast
user: ${PG_USERNAME}
password: ${PG_PASSWORD}
online_store:
type: postgres
host: postgresql.feast.svc.cluster.local
port: 5432
database: feast
db_schema: feast
user: ${PG_USERNAME}
password: ${PG_PASSWORD}
offline_store:
type: postgres
host: postgresql.feast.svc.cluster.local
port: 5432
database: feast
db_schema: feast
user: ${PG_USERNAME}
password: ${PG_PASSWORD}
entity_key_serialization_version: 2
```
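The `${PG_USERNAME}`/`${PG_PASSWORD}` placeholders are resolved from environment variables when Feast loads the file. A minimal standalone sketch of that kind of substitution (illustration only, not Feast's own loader code):

```python
import os
from string import Template

# Hypothetical values standing in for the real DB credentials
os.environ["PG_USERNAME"] = "feast"
os.environ["PG_PASSWORD"] = "feast-secret"

# A fragment of feature_store.yaml with ${VAR} placeholders
raw = "user: ${PG_USERNAME}\npassword: ${PG_PASSWORD}"

# Expand placeholders from the environment
expanded = Template(raw).substitute(os.environ)
print(expanded)
```

Keeping credentials out of the committed YAML and injecting them via the pod's environment is what makes the same file reusable across namespaces.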

- **Add Permissions to Security Context Constraint (SCC)**:

```bash
oc adm policy add-scc-to-user anyuid -z default -n <namespace>
```
- Add the Feast Helm repository and update:
```bash
helm repo add feast-charts https://feast-helm-charts.storage.googleapis.com
helm repo update
```

- Deploy Feast on OpenShift using Helm, passing the `feature_store.yaml` file as a base64 string from the `feature_repo` directory:
```bash
helm install feast-release feast-charts/feast-feature-server \
--set feature_store_yaml_base64=$(base64 < feature_store.yaml)
```
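The `feature_store_yaml_base64` value is simply the file's contents base64-encoded; the chart decodes it back into a `feature_store.yaml` inside the pod. A quick round-trip check using a throwaway file:

```shell
# Throwaway file standing in for feature_repo/feature_store.yaml
printf 'project: feast_postgres\n' > /tmp/feature_store_demo.yaml

# Encode the file the same way the helm install command does
ENCODED=$(base64 < /tmp/feature_store_demo.yaml)
echo "$ENCODED"

# Decoding recovers the original config
echo "$ENCODED" | base64 --decode
```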
### 6. Testing and Verification
![feast-test.png](feast-test.png)
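Once the Helm release is up, the feature server answers HTTP requests. A sketch of building a request body for its `POST /get-online-features` endpoint (the entity values mirror `test_workflow.py`; the service hostname in the comment is an assumption based on the release name — verify with `oc get svc`):

```python
import json

# Request body for the feature server's /get-online-features endpoint.
# On-demand request data (val_to_add, val_to_add_2) is passed alongside
# the entity keys.
payload = {
    "feature_service": "driver_activity_v1",
    "entities": {
        "driver_id": [1001, 1002],
        "val_to_add": [1000, 1001],
        "val_to_add_2": [2000, 2002],
    },
}
body = json.dumps(payload)
print(body)
# To send it (hostname assumed from the release name):
# requests.post("http://feast-release-feast-feature-server:6566"
#               "/get-online-features", data=body)
```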
Empty file added feast_postgres/__init__.py
Empty file.
Binary file added feast_postgres/create-feast-postgres.png
Binary file added feast_postgres/feast-test.png
Empty file.
131 changes: 131 additions & 0 deletions feast_postgres/feature_repo/example_repo.py
@@ -0,0 +1,131 @@
# This is an example feature definition file

from datetime import timedelta

import pandas as pd

from feast import Entity, FeatureService, FeatureView, Field, PushSource, RequestSource
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
PostgreSQLSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

# Define an entity for the driver. You can think of an entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = PostgreSQLSource(
name="driver_hourly_stats_source",
query="SELECT * FROM feast_driver_hourly_stats",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

# Our source table (feast_driver_hourly_stats) contains sample data that
# includes a driver_id column, timestamps, and three feature columns. Here we
# define a Feature View that will allow us to serve this data to our model online.
driver_stats_fv = FeatureView(
# The unique name of this feature view. Two feature views in a single
# project cannot have the same name
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
    # The list of features below acts as a schema: it defines the features
    # that are materialized into a store, and it is used as a reference when
    # building a training dataset or serving features
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
sources=[driver_stats_fv, input_request],
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df


# This groups features into a model version
driver_activity_v1 = FeatureService(
name="driver_activity_v1",
features=[
driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view
transformed_conv_rate, # Selects all features from the feature view
],
)
driver_activity_v2 = FeatureService(
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)

# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
name="driver_stats_push_source",
batch_source=driver_stats_source,
)

# Defines a slightly modified version of the feature view from above, where the source
# has been changed to the push source. This allows fresh features to be directly pushed
# to the online store for this feature view.
driver_stats_fresh_fv = FeatureView(
name="driver_hourly_stats_fresh",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_push_source, # Changed from above
tags={"team": "driver_performance"},
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df


driver_activity_v3 = FeatureService(
name="driver_activity_v3",
features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
)
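The on-demand transformations above are plain pandas, so the arithmetic can be exercised without a running Feast deployment. A standalone sketch on a hand-built input frame (toy values chosen here for illustration):

```python
import pandas as pd

# Same logic as transformed_conv_rate, applied to a hand-built input frame
inputs = pd.DataFrame(
    {
        "conv_rate": [0.5, 0.25],
        "val_to_add": [1, 2],
        "val_to_add_2": [10, 20],
    }
)
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
print(df)
# conv_rate_plus_val1: [1.5, 2.25]; conv_rate_plus_val2: [10.5, 20.25]
```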
28 changes: 28 additions & 0 deletions feast_postgres/feature_repo/feature_store.yaml
@@ -0,0 +1,28 @@
project: feast_postgres
provider: local
registry:
registry_store_type: PostgreSQLRegistryStore
path: feast_registry
host: postgresql.feast.svc.cluster.local
port: 5432
database: feast
db_schema: feast
user: ${PG_USERNAME}
password: ${PG_PASSWORD}
online_store:
type: postgres
host: postgresql.feast.svc.cluster.local
port: 5432
database: feast
db_schema: feast
user: ${PG_USERNAME}
password: ${PG_PASSWORD}
offline_store:
type: postgres
host: postgresql.feast.svc.cluster.local
port: 5432
database: feast
db_schema: feast
user: ${PG_USERNAME}
password: ${PG_PASSWORD}
entity_key_serialization_version: 2
130 changes: 130 additions & 0 deletions feast_postgres/feature_repo/test_workflow.py
@@ -0,0 +1,130 @@
import subprocess
from datetime import datetime

import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode


def run_demo():
store = FeatureStore(repo_path=".")
print("\n--- Run feast apply to setup feature store on Postgres ---")
subprocess.run(["feast", "apply"])

print("\n--- Historical features for training ---")
fetch_historical_features_entity_df(store, for_batch_scoring=False)

print("\n--- Historical features for batch scoring ---")
fetch_historical_features_entity_df(store, for_batch_scoring=True)

print("\n--- Load features into online store ---")
store.materialize_incremental(end_date=datetime.now())

print("\n--- Online features ---")
fetch_online_features(store)

print("\n--- Online features retrieved (instead) through a feature service---")
fetch_online_features(store, source="feature_service")

print(
        "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source) ---"
)
fetch_online_features(store, source="push")

print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
event_df = pd.DataFrame.from_dict(
{
"driver_id": [1001],
"event_timestamp": [
datetime.now(),
],
"created": [
datetime.now(),
],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000],
}
)
print(event_df)
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)

print("\n--- Online features again with updated values from a stream push---")
fetch_online_features(store, source="push")

print("\n--- Run feast teardown ---")
subprocess.run(["feast", "teardown"])


def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
# for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
{
# entity's join key -> entity values
"driver_id": [1001, 1002, 1003],
# "event_timestamp" (reserved key) -> timestamps
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
],
# (optional) label name -> label values. Feast does not process these
"label_driver_reported_satisfaction": [1, 5, 3],
# values we're using for an on-demand transformation
"val_to_add": [1, 2, 3],
"val_to_add_2": [10, 20, 30],
}
)
# For batch scoring, we want the latest timestamps
if for_batch_scoring:
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
],
).to_df()
print(training_df.head())


def fetch_online_features(store, source: str = ""):
entity_rows = [
# {join_key: entity_value}
{
"driver_id": 1001,
"val_to_add": 1000,
"val_to_add_2": 2000,
},
{
"driver_id": 1002,
"val_to_add": 1001,
"val_to_add_2": 2002,
},
]
if source == "feature_service":
features_to_fetch = store.get_feature_service("driver_activity_v1")
elif source == "push":
features_to_fetch = store.get_feature_service("driver_activity_v3")
else:
features_to_fetch = [
"driver_hourly_stats:acc_rate",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
]
returned_features = store.get_online_features(
features=features_to_fetch,
entity_rows=entity_rows,
).to_dict()
for key, value in sorted(returned_features.items()):
print(key, " : ", value)


if __name__ == "__main__":
run_demo()
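The `get_historical_features` call above performs a point-in-time join: each entity row is matched with the latest feature values at or before its `event_timestamp` (within the feature view's TTL). The idea can be sketched with `pandas.merge_asof` on toy data (not Feast internals):

```python
import pandas as pd

# Toy feature table, as feast_driver_hourly_stats might look
features = pd.DataFrame(
    {
        "driver_id": [1001, 1001],
        "event_timestamp": pd.to_datetime(["2021-04-12 08:00", "2021-04-12 10:00"]),
        "conv_rate": [0.1, 0.2],
    }
)
# Entity rows we want training data for
entities = pd.DataFrame(
    {
        "driver_id": [1001, 1001],
        "event_timestamp": pd.to_datetime(["2021-04-12 09:00", "2021-04-12 11:00"]),
    }
)
# For each entity row, take the latest feature row at or before its timestamp
joined = pd.merge_asof(
    entities.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
)
print(joined)
# 09:00 picks up the 08:00 value (0.1); 11:00 picks up the 10:00 value (0.2)
```

This is why `fetch_historical_features_entity_df` produces different rows for training (historical timestamps) versus batch scoring (`pd.to_datetime("now")`), even though the entity IDs are identical.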
Binary file added feast_postgres/postgres-1.png
Binary file added feast_postgres/postgres-feast-registry.png
Binary file added feast_postgres/postgres.png