Expose join time-range to python API #900

Draft · wants to merge 8 commits into main

Changes from all commits
2 changes: 2 additions & 0 deletions feathr_project/feathr/__init__.py
@@ -72,6 +72,8 @@
    'Int64VectorFeatureType',
    'DoubleVectorFeatureType',
    'FeatureNameValidationError',
    'AbsoluteTimeRange',
    'RelativeTimeRange',
    'ObservationSettings',
    'FeaturePrinter',
    'SparkExecutionConfiguration',
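With these exports in place, the new time-range classes can be imported straight from the package root; a minimal sketch:

```python
# AbsoluteTimeRange and RelativeTimeRange are now part of feathr's public API,
# alongside ObservationSettings (defined in feathr/definition/settings.py below).
from feathr import AbsoluteTimeRange, RelativeTimeRange, ObservationSettings
```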
127 changes: 105 additions & 22 deletions feathr_project/feathr/definition/settings.py
@@ -1,41 +1,124 @@
from abc import ABC
from dataclasses import dataclass
from typing import Optional

from jinja2 import Template
from loguru import logger

from feathr.definition.feathrconfig import HoconConvertible


@dataclass
class BaseTimeRange(ABC):
    pass


@dataclass
class AbsoluteTimeRange(BaseTimeRange):
    start_time: str  # E.g. "2020-01-01"
    end_time: str  # E.g. "2020-01-31"
    time_format: str = "yyyy-MM-dd"


@dataclass
class RelativeTimeRange(BaseTimeRange):
    offset: str  # E.g. 1d
    window: str  # E.g. 3d

class ObservationSettings(HoconConvertible):
"""Time settings of the observation data. Used in feature join.
The data time settings pertaining to how much of the input dataset is to be loaded from the timestamp column.
This is a way in which the input data can be restricted to allow only a fixed interval of dates to be joined with the feature data.
This restriction will apply on the timestamp column of the input data.

For example, startDate: "20200522", endDate: "20200525" implies this feature should be joined with the input data starting from
22nd May 2020 to 25th May, 2020 with both dates included.

Attributes:
observation_path: path to the observation dataset, i.e. input dataset to get with features
event_timestamp_column (Optional[str]): The timestamp field of your record. As sliding window aggregation feature assume each record in the source data should have a timestamp column.
timestamp_format (Optional[str], optional): The format of the timestamp field. Defaults to "epoch". Possible values are:
- `epoch` (seconds since epoch), for example `1647737463`
- `epoch_millis` (milliseconds since epoch), for example `1647737517761`
- Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
observation_path (str): Path to the observation dataset, i.e. input dataset to get with features.
observation_data_time_settings (str): Settings which have parameters specifying how to load the observation data.
join_time_settings (str): Settings which have parameters specifying how to join the observation data with the feature data.
"""
def __init__(self,
observation_path: str,
event_timestamp_column: Optional[str] = None,
timestamp_format: str = "epoch") -> None:
self.event_timestamp_column = event_timestamp_column
self.timestamp_format = timestamp_format
observation_data_time_settings: str = None
join_time_settings: str = None

    def __init__(
        self,
        observation_path: str,
        event_timestamp_column: Optional[str] = None,
        timestamp_format: Optional[str] = "epoch",
        use_latest_feature_data: Optional[bool] = False,
        observation_time_range: Optional[BaseTimeRange] = None,
    ) -> None:
        """Initialize observation settings.

        Args:
            observation_path: Path to the observation dataset, i.e. the input dataset to be joined with features.
            event_timestamp_column: The timestamp field of your record, as sliding window aggregation (SWA) features
                assume each record in the source data has a timestamp column.
            timestamp_format: The format of the timestamp field. Defaults to "epoch". Possible values are:
                - `epoch` (seconds since epoch), for example `1647737463`
                - `epoch_millis` (milliseconds since epoch), for example `1647737517761`
                - Any date format supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
            use_latest_feature_data: Use the latest feature data when joining. This must not be set together with event_timestamp_column.
            observation_time_range: The time range of the observation data.
        """
        self.observation_path = observation_path
        if observation_path.startswith("http"):
            logger.warning("Your observation_path {} starts with http, which is not supported. Consider using paths starting with wasb[s]/abfs[s]/s3.", observation_path)

        # Settings with parameters specifying how to load the observation data.
        if observation_time_range:
            if isinstance(observation_time_range, AbsoluteTimeRange):
                self.observation_data_time_settings = f"""{{
                    absoluteTimeRange: {{
                        startTime: "{observation_time_range.start_time}"
                        endTime: "{observation_time_range.end_time}"
                        timeFormat: "{observation_time_range.time_format}"
                    }}
                }}"""
            elif isinstance(observation_time_range, RelativeTimeRange):
                self.observation_data_time_settings = f"""{{
                    relativeTimeRange: {{
                        offset: {observation_time_range.offset}
                        window: {observation_time_range.window}
                    }}
                }}"""
            else:
                raise ValueError(f"Unsupported observation_time_range type {type(observation_time_range)}")

        # Settings with parameters specifying how to join the observation data with the feature data.
        if use_latest_feature_data:
            if event_timestamp_column:
                raise ValueError("use_latest_feature_data cannot be set together with event_timestamp_column")

            self.join_time_settings = """{
                useLatestFeatureData: true
            }"""
        elif event_timestamp_column:
            self.join_time_settings = f"""{{
                timestampColumn: {{
                    def: "{event_timestamp_column}"
                    format: "{timestamp_format}"
                }}
            }}"""

        # TODO implement `simulateTimeDelay: 1d` -- a global setting that should be applied to all features
        # except those specified using timeDelayOverride (which would introduce "timeDelayOverride" to the Feature spec).
    def to_feature_config(self) -> str:
        tm = Template("""
            {% if (setting.observation_data_time_settings is not none) or (setting.join_time_settings is not none) %}
            settings: {
                {% if setting.observation_data_time_settings is not none %}
                observationDataTimeSettings: {{setting.observation_data_time_settings}}
                {% endif %}
                {% if setting.join_time_settings is not none %}
                joinTimeSettings: {{setting.join_time_settings}}
                {% endif %}
            }
            {% endif %}
            observationPath: "{{setting.observation_path}}"
        """)
        return tm.render(setting=self)
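For context, a minimal sketch of how the new parameters feed into `to_feature_config()`; the observation path is hypothetical, and the commented output mirrors the f-string templates above:

```python
from feathr import ObservationSettings, RelativeTimeRange

# Hypothetical observation path; any wasb[s]/abfs[s]/s3 path would do.
settings = ObservationSettings(
    observation_path="wasbs://container@account.blob.core.windows.net/observation.csv",
    event_timestamp_column="timestamp",
    timestamp_format="yyyy-MM-dd",
    observation_time_range=RelativeTimeRange(offset="1d", window="3d"),
)

# Based on the templates above, this should render roughly:
#
#   settings: {
#       observationDataTimeSettings: {
#           relativeTimeRange: {
#               offset: 1d
#               window: 3d
#           }
#       }
#       joinTimeSettings: {
#           timestampColumn: {
#               def: "timestamp"
#               format: "yyyy-MM-dd"
#           }
#       }
#   }
#   observationPath: "wasbs://container@account.blob.core.windows.net/observation.csv"
print(settings.to_feature_config())
```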
3 changes: 2 additions & 1 deletion feathr_project/pyproject.toml
@@ -12,7 +12,8 @@ multi_line_output = 3
[tool.pytest.ini_options]
markers = [
"notebooks: Jupyter notebook tests. Target Spark platform can be either Azure Synapse, Databricks, or Local Spark.",
"databricks: Jupyter notebook tests. Target Spark platform must be Databricks",
"databricks: Jupyter notebook tests. Target Spark platform must be Databricks.",
"integration: Integration tests."
]

[build-system]
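With the `integration` marker registered, the new tests can be selected on their own. A small sketch, equivalent to running `pytest -m integration` from the command line (the test path is taken from this PR's layout):

```python
import pytest

# Run only tests marked with @pytest.mark.integration.
pytest.main(["-m", "integration", "feathr_project/test/integration"])
```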
17 changes: 14 additions & 3 deletions feathr_project/test/conftest.py
@@ -29,12 +29,23 @@ def workspace_dir() -> str:
    return str(Path(__file__).parent.resolve().joinpath("test_user_workspace"))


# TODO we can use this later
# @pytest.fixture
# def mock_data_path(workspace_dir):
#     return str(Path(workspace_dir).joinpath(
#         "mockdata",
#         "[email protected]",
#         "demo_data",
#         "green_tripdata_2020-04.csv",
#     ))


@pytest.fixture(scope="function")
def feathr_client(workspace_dir) -> FeathrClient:
def feathr_client(config_path) -> FeathrClient:
"""Test function-scoped Feathr client.
Note, cluster target (local, databricks, synapse) maybe overriden by the environment variables set at test machine.
Note, cluster target (local, databricks, synapse) maybe overridden by the environment variables set at test machine.
"""
return FeathrClient(config_path=str(Path(workspace_dir, "feathr_config.yaml")))
return FeathrClient(config_path=config_path)


@pytest.fixture(scope="module")
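The updated `feathr_client` fixture depends on a `config_path` fixture that is not shown in this hunk. A hypothetical sketch of what it might look like, given that the old fixture resolved `feathr_config.yaml` under `workspace_dir`:

```python
import pytest
from pathlib import Path

# Hypothetical: not part of this diff. Mirrors the path the old
# feathr_client fixture built from workspace_dir.
@pytest.fixture(scope="session")
def config_path(workspace_dir) -> str:
    return str(Path(workspace_dir, "feathr_config.yaml"))
```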
152 changes: 152 additions & 0 deletions feathr_project/test/integration/test_observation_settings.py
@@ -0,0 +1,152 @@
from datetime import datetime, timedelta
from pathlib import Path
from tempfile import TemporaryDirectory

import pandas as pd
import pytest

from feathr import (
    AbsoluteTimeRange, RelativeTimeRange, ObservationSettings,
    HdfsSource,
    TypedKey, ValueType,
    Feature, FeatureAnchor, WindowAggTransformation,
    # Feature types
    FLOAT,
    INPUT_CONTEXT,
    FeatureQuery,
)
from feathr.utils.job_utils import get_result_df


TIMESTAMP_COL = "timestamp"
KEY_COL = "location_id"
FEATURE_COL = "fare"


@pytest.fixture(scope="session")
def mock_df():
"""Mock data for testing.
"""
# TODO Currently we're using "today" since `useLatestFeatureData` uses now().
# Once the behavior is changed to use the latest timestamp in the data, we can use fixed test data instead of creating new one everytime.
today = datetime.now().date()
date_range = list(pd.date_range(start=today-timedelta(days=4), end=today, freq="D"))
return pd.DataFrame({
TIMESTAMP_COL: date_range + date_range,
KEY_COL: [1, 1, 1, 1, 1, 2, 2, 2, 2, 2],
FEATURE_COL: [5.5, 10.0, 6.0, 8.0, 2.5, 38.0, 12.0, 52.0, 180.0, 3.5],
})


@pytest.mark.integration
def test__observation_settings(feathr_client, mock_df):
    tmp_dir = TemporaryDirectory()

    mock_data_path = str(Path(tmp_dir.name, "mock_data.csv"))
    mock_df.to_csv(str(mock_data_path), index=False)

    # Upload data into dbfs or adls
    if feathr_client.spark_runtime != "local":
        mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path)

    # Define agg features
    source = HdfsSource(
        name="source",
        path=mock_data_path,
        event_timestamp_column=TIMESTAMP_COL,
        timestamp_format="yyyy-MM-dd",  # yyyy/MM/dd/HH/mm/ss
    )
    key = TypedKey(
        key_column=KEY_COL,
        key_column_type=ValueType.INT32,
        description="key",
        full_name=KEY_COL,
    )
    agg_features = [
        Feature(
            name=f"f_{FEATURE_COL}",
            key=key,
            feature_type=FLOAT,
            transform=WindowAggTransformation(
                agg_expr=f"cast_float({FEATURE_COL})",
                agg_func="MAX",
                window="2d",  # 2 days sliding window
            ),
        ),
    ]
    agg_anchor = FeatureAnchor(
        name="agg_anchor",
        source=source,
        features=agg_features,
    )

    feathr_client.build_features(
        anchor_list=[
            agg_anchor,
        ]
    )

    query = [
        FeatureQuery(
            feature_list=[
                f"f_{FEATURE_COL}",
            ],
            key=key,
        ),
    ]

    test_parameters__expected_values = [
        (
            dict(event_timestamp_column=TIMESTAMP_COL),
            # Max value by the sliding window '2d'
            [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.],
        ),
        (
            dict(use_latest_feature_data=True),
            # The latest feature values: the time window is '2d', so the feature values for the two keys are 8.0 and 180.0
            [8.0, 8.0, 8.0, 8.0, 8.0, 180.0, 180.0, 180.0, 180.0, 180.0],
        ),
        (
            dict(
                event_timestamp_column=TIMESTAMP_COL,
                observation_time_range=RelativeTimeRange(offset="3d", window="2d"),
            ),
            # TODO BUG - RelativeTimeRange doesn't have any effect on the result
            [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.],
        ),
        (
            dict(
                event_timestamp_column=TIMESTAMP_COL,
                observation_time_range=AbsoluteTimeRange(
                    start_time=mock_df[TIMESTAMP_COL].max().date().isoformat(),
                    end_time=mock_df[TIMESTAMP_COL].max().date().isoformat(),
                    time_format="yyyy-MM-dd",
                ),
            ),
            # TODO BUG - AbsoluteTimeRange doesn't have any effect on the result
            [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.],
        ),
    ]

    for test_params, expected_values in test_parameters__expected_values:
        settings = ObservationSettings(
            observation_path=mock_data_path,
            timestamp_format="yyyy-MM-dd",
            **test_params,
        )

        output_path = str(Path(Path(mock_data_path).parent, "output.avro"))

        feathr_client.get_offline_features(
            observation_settings=settings,
            feature_query=query,
            output_path=output_path,
        )

        feathr_client.wait_job_to_finish(timeout_sec=5000)

        # Download the result and assert on the returned values
        res_df = get_result_df(feathr_client)
        res_df.sort_values(by=[KEY_COL, TIMESTAMP_COL], inplace=True)
        assert res_df[f"f_{FEATURE_COL}"].tolist() == expected_values
        # print(res_df)
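As a sanity check on the hard-coded expectations, the first test case's '2d' sliding-window MAX can be reproduced locally with pandas, assuming the window is right-closed (each record's day plus the previous day):

```python
import pandas as pd

def rolling_2d_max(mock_df: pd.DataFrame) -> list:
    # Sort by key and time, then take a 2-day rolling max per key over the
    # datetime index; reproduces [5.5, 10.0, 10.0, 8.0, 8.0, 38.0, 38.0, 52.0, 180.0, 180.0].
    df = mock_df.sort_values([KEY_COL, TIMESTAMP_COL])
    return (
        df.set_index(TIMESTAMP_COL)
        .groupby(KEY_COL)[FEATURE_COL]
        .rolling("2d")
        .max()
        .tolist()
    )
```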
33 changes: 0 additions & 33 deletions feathr_project/test/test_observation_setting.py

This file was deleted.
