Commit
Merge pull request #23 from franciscojavierarceo/odfv-profiling
Odfv profiling
franciscojavierarceo authored Jun 8, 2024
2 parents 2a7ce37 + 9b5f953 commit 6485133
Showing 10 changed files with 390 additions and 0 deletions.
29 changes: 29 additions & 0 deletions demos/feast_odfv_profiling/easy_ox/README.md
@@ -0,0 +1,29 @@
# Feast Quickstart
If you haven't already, check out the quickstart guide on Feast's website (http://docs.feast.dev/quickstart), which
uses this repo. A quick overview of this repository's `feature_repo/` directory:

* `data/` contains raw demo parquet data
* `feature_repo/example_repo.py` contains demo feature definitions
* `feature_repo/feature_store.yaml` contains a demo setup configuring where data sources are
* `feature_repo/test_workflow.py` showcases how to run all key Feast commands, including defining, retrieving, and pushing features.

You can run the overall workflow with `python test_workflow.py`.

## To move from this into a more production-ready workflow:
> See more details in [Running Feast in production](https://docs.feast.dev/how-to-guides/running-feast-in-production)
1. First, switch to a different Feast template that delegates to a more scalable offline store.
   - For example, run `feast init -t gcp`, `feast init -t aws`, or `feast init -t snowflake`.
   - You can see your options if you run `feast init --help`.
2. `feature_store.yaml` points to a local file as a registry. You'll want to set up a remote file (e.g. in S3/GCS) or a
   SQL registry. See [registry docs](https://docs.feast.dev/getting-started/concepts/registry) for more details.
3. This example uses a file [offline store](https://docs.feast.dev/getting-started/architecture-and-components/offline-store)
   to generate training data. It does not scale; we recommend instead using a data warehouse such as BigQuery,
   Snowflake, or Redshift. There is experimental support for Spark as well.
4. Set up CI/CD plus dev vs. staging vs. prod environments to automatically update the registry as you change Feast feature definitions. See [docs](https://docs.feast.dev/how-to-guides/running-feast-in-production#1.-automatically-deploying-changes-to-your-feature-definitions).
5. (optional) Schedule regular materialization (e.g. via Airflow) to power low-latency feature retrieval. See [Batch data ingestion](https://docs.feast.dev/getting-started/concepts/data-ingestion#batch-data-ingestion)
   for more details.
6. (optional) Deploy feature server instances with `feast serve` to expose endpoints to retrieve online features.
   - See [Python feature server](https://docs.feast.dev/reference/feature-servers/python-feature-server) for details.
   - Use cases can also directly call the Feast client to fetch features, as per [Feature retrieval](https://docs.feast.dev/getting-started/concepts/feature-retrieval); a minimal client sketch follows after this list.
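
As a rough illustration of the direct-client path in step 6, here is a minimal sketch (not part of this repo), assuming the demo's `driver_id` entity and the feature names from `example_repo.py`, and that `feast apply` and materialization have already been run from `feature_repo/`:

```python
from feast import FeatureStore

# Load the repo config (feature_store.yaml) from the current directory.
store = FeatureStore(repo_path=".")

# Request-time values (val_to_add, val_to_add_2) feed the on-demand transformation.
features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "transformed_conv_rate:conv_rate_plus_val1",
    ],
    entity_rows=[{"driver_id": 1001, "val_to_add": 1000, "val_to_add_2": 2000}],
).to_dict()
print(features)
```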
Empty file.
Empty file.
Binary file not shown.
155 changes: 155 additions & 0 deletions demos/feast_odfv_profiling/easy_ox/feature_repo/example_repo.py
@@ -0,0 +1,155 @@
# This is an example feature definition file

from datetime import timedelta
from typing import Any

import pandas as pd

from feast import (
    Entity,
    FeatureService,
    FeatureView,
    Field,
    FileSource,
    PushSource,
    RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

# Define an entity for the driver. You can think of an entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Read data from parquet files. Parquet is convenient for local development. For
# production, you can use your favorite DWH, such as BigQuery. See the Feast
# documentation for more info.
driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    # Note: this absolute path is specific to the author's machine.
    path="/Users/farceo/dev/Python/demos/feast_odfv_profiling/easy_ox/feature_repo/data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Our parquet files contain sample data that includes a driver_id column, timestamps,
# and three feature columns. Here we define a Feature View that will allow us to serve
# this data to our model online.
driver_stats_fv = FeatureView(
    # The unique name of this feature view. Two feature views in a single
    # project cannot have the same name.
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    # The list of fields below acts as a schema: it defines the features to
    # materialize into a store, and it provides the references used when
    # building a training dataset or serving features.
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
    ],
    online=True,
    source=driver_stats_source,
    # Tags are user-defined key/value pairs that are attached to each
    # feature view.
    tags={"team": "driver_performance"},
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user-initiated HTTP request)
input_request = RequestSource(
    name="vals_to_add",
    schema=[
        Field(name="val_to_add", dtype=Int64),
        Field(name="val_to_add_2", dtype=Int64),
    ],
)


# Define an on-demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fv, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df


# This groups features into a model version
driver_activity_v1 = FeatureService(
    name="driver_activity_v1",
    features=[
        driver_stats_fv[["conv_rate"]],  # Sub-selects a feature from a feature view
        transformed_conv_rate,  # Selects all features from the feature view
    ],
)
driver_activity_v2 = FeatureService(
    name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)

# Defines a way to push data (to be available offline, online, or both) into Feast.
driver_stats_push_source = PushSource(
    name="driver_stats_push_source",
    batch_source=driver_stats_source,
)

# Defines a slightly modified version of the feature view from above, where the source
# has been changed to the push source. This allows fresh features to be directly pushed
# to the online store for this feature view.
driver_stats_fresh_fv = FeatureView(
    name="driver_hourly_stats_fresh",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_push_source,  # Changed from above
    tags={"team": "driver_performance"},
)


# Define an on-demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fresh_fv, input_request],  # relies on the fresh version of the FV
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df


@on_demand_feature_view(
    sources=[driver_stats_fresh_fv, input_request],  # relies on the fresh version of the FV
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
    mode="python",
)
def transformed_conv_rate_fresh_python(inputs: dict[str, Any]) -> dict[str, Any]:
    # In python mode, inputs maps each feature name to a list of values (one per
    # entity row), so the addition is done element-wise over the zipped lists.
    output = {
        "conv_rate_plus_val1": [
            conv_rate + val
            for conv_rate, val in zip(inputs["conv_rate"], inputs["val_to_add"])
        ],
        "conv_rate_plus_val2": [
            conv_rate + val
            for conv_rate, val in zip(inputs["conv_rate"], inputs["val_to_add_2"])
        ],
    }
    return output


driver_activity_v3 = FeatureService(
    name="driver_activity_v3",
    features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
)
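
Since `transformed_conv_rate_fresh` (pandas mode) and `transformed_conv_rate_fresh_python` (python mode) define the same two output features, a quick equivalence check is to fetch each for the same entity rows. A minimal sketch, not part of this commit, assuming `feast apply` and materialization have run from `feature_repo/` and default (non-fully-qualified) feature names:

```python
from feast import FeatureStore

store = FeatureStore(repo_path=".")
rows = [{"driver_id": 1001, "val_to_add": 1000, "val_to_add_2": 2000}]

# Fetch the same output feature from the pandas-mode and python-mode ODFVs.
pandas_out = store.get_online_features(
    features=["transformed_conv_rate_fresh:conv_rate_plus_val1"],
    entity_rows=rows,
).to_dict()
python_out = store.get_online_features(
    features=["transformed_conv_rate_fresh_python:conv_rate_plus_val1"],
    entity_rows=rows,
).to_dict()

# Both modes should produce the same values for the same inputs.
print(pandas_out["conv_rate_plus_val1"], python_out["conv_rate_plus_val1"])
```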
9 changes: 9 additions & 0 deletions demos/feast_odfv_profiling/easy_ox/feature_repo/feature_store.yaml
@@ -0,0 +1,9 @@
project: easy_ox
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
entity_key_serialization_version: 2
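
A quick way to sanity-check this config is to load the store and list what `feast apply` registered — a minimal sketch, not part of this commit, assuming it runs from the `feature_repo/` directory:

```python
from feast import FeatureStore

# Reads feature_store.yaml from the current directory.
store = FeatureStore(repo_path=".")

# Names registered in data/registry.db by `feast apply`.
print([fv.name for fv in store.list_feature_views()])
print([odfv.name for odfv in store.list_on_demand_feature_views()])
```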
Binary file not shown.
Binary file not shown.
67 changes: 67 additions & 0 deletions demos/feast_odfv_profiling/easy_ox/feature_repo/profile_odfv.py
@@ -0,0 +1,67 @@
import cProfile
from datetime import datetime

from feast import FeatureStore

entity_rows = [
    # {join_key: entity_value}
    {
        "driver_id": 1001,
        "val_to_add": 1000,
        "val_to_add_2": 2000,
    },
    {
        "driver_id": 1002,
        "val_to_add": 1001,
        "val_to_add_2": 2002,
    },
]

store = FeatureStore(repo_path=".")
store.materialize_incremental(end_date=datetime.now())

def odfv_pandas():
    features_to_fetch = [
        "transformed_conv_rate_fresh:conv_rate_plus_val1",
        "transformed_conv_rate_fresh:conv_rate_plus_val2",
    ]
    store.get_online_features(
        features=features_to_fetch,
        entity_rows=entity_rows,
    ).to_dict()


def odfv_python():
    features_to_fetch = [
        "transformed_conv_rate_fresh_python:conv_rate_plus_val1",
        "transformed_conv_rate_fresh_python:conv_rate_plus_val2",
    ]
    store.get_online_features(
        features=features_to_fetch,
        entity_rows=entity_rows,
    ).to_dict()


def main():
    print("running pandas odfv...")
    profiler = cProfile.Profile()
    profiler.enable()
    odfv_pandas()
    profiler.disable()
    profiler.dump_stats("odfv_pandas.prof")

    print("running python odfv...")
    profiler = cProfile.Profile()
    profiler.enable()
    odfv_python()
    profiler.disable()
    profiler.dump_stats("odfv_python.prof")
    print("...done")


if __name__ == "__main__":
    main()
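
To inspect the two `.prof` dumps that `profile_odfv.py` writes, one option (a sketch, not part of this commit) is the standard-library `pstats` module:

```python
import pstats

# Print the ten hottest calls from each profile, sorted by cumulative time.
for name in ("odfv_pandas.prof", "odfv_python.prof"):
    print(f"=== {name} ===")
    pstats.Stats(name).sort_stats("cumulative").print_stats(10)
```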
130 changes: 130 additions & 0 deletions demos/feast_odfv_profiling/easy_ox/feature_repo/test_workflow.py
@@ -0,0 +1,130 @@
import subprocess
from datetime import datetime

import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode


def run_demo():
    store = FeatureStore(repo_path=".")
    print("\n--- Run feast apply ---")
    subprocess.run(["feast", "apply"])

    # print("\n--- Historical features for training ---")
    # fetch_historical_features_entity_df(store, for_batch_scoring=False)

    # print("\n--- Historical features for batch scoring ---")
    # fetch_historical_features_entity_df(store, for_batch_scoring=True)

    print("\n--- Load features into online store ---")
    store.materialize_incremental(end_date=datetime.now())

    print("\n--- Online features ---")
    fetch_online_features(store)

    # print("\n--- Online features retrieved (instead) through a feature service ---")
    # fetch_online_features(store, source="feature_service")

    # print(
    #     "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source) ---"
    # )
    # fetch_online_features(store, source="push")

    print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
    event_df = pd.DataFrame.from_dict(
        {
            "driver_id": [1001],
            "event_timestamp": [
                datetime.now(),
            ],
            "created": [
                datetime.now(),
            ],
            "conv_rate": [1.0],
            "acc_rate": [1.0],
            "avg_daily_trips": [1000],
        }
    )
    print(event_df)
    # store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)

    # print("\n--- Online features again with updated values from a stream push ---")
    # fetch_online_features(store, source="push")

    print("\n--- Run feast teardown ---")
    # subprocess.run(["feast", "teardown"])


def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
    # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details
    # on how to retrieve for all entities in the offline store instead
    entity_df = pd.DataFrame.from_dict(
        {
            # entity's join key -> entity values
            "driver_id": [1001, 1002, 1003],
            # "event_timestamp" (reserved key) -> timestamps
            "event_timestamp": [
                datetime(2021, 4, 12, 10, 59, 42),
                datetime(2021, 4, 12, 8, 12, 10),
                datetime(2021, 4, 12, 16, 40, 26),
            ],
            # (optional) label name -> label values. Feast does not process these
            "label_driver_reported_satisfaction": [1, 5, 3],
            # values we're using for an on-demand transformation
            "val_to_add": [1, 2, 3],
            "val_to_add_2": [10, 20, 30],
        }
    )
    # For batch scoring, we want the latest timestamps
    if for_batch_scoring:
        entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

    training_df = store.get_historical_features(
        entity_df=entity_df,
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ],
    ).to_df()
    print(training_df.head())


def fetch_online_features(store, source: str = ""):
    entity_rows = [
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ]
    if source == "feature_service":
        features_to_fetch = store.get_feature_service("driver_activity_v1")
    elif source == "push":
        features_to_fetch = store.get_feature_service("driver_activity_v3")
    else:
        features_to_fetch = [
            "driver_hourly_stats:acc_rate",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ]
    returned_features = store.get_online_features(
        features=features_to_fetch,
        entity_rows=entity_rows,
    ).to_dict()
    for key, value in sorted(returned_features.items()):
        print(key, " : ", value)


if __name__ == "__main__":
    run_demo()
