diff --git a/feathr_project/feathr/__init__.py b/feathr_project/feathr/__init__.py index 5c279b7d5..9073cf7be 100644 --- a/feathr_project/feathr/__init__.py +++ b/feathr_project/feathr/__init__.py @@ -72,6 +72,8 @@ 'Int64VectorFeatureType', 'DoubleVectorFeatureType', 'FeatureNameValidationError', + 'AbsoluteTimeRange', + 'RelativeTimeRange', 'ObservationSettings', 'FeaturePrinter', 'SparkExecutionConfiguration', diff --git a/feathr_project/feathr/definition/settings.py b/feathr_project/feathr/definition/settings.py index 7768d2a98..75b3186c8 100644 --- a/feathr_project/feathr/definition/settings.py +++ b/feathr_project/feathr/definition/settings.py @@ -1,41 +1,124 @@ +from abc import ABC +from dataclasses import dataclass from typing import Optional + from jinja2 import Template from loguru import logger + from feathr.definition.feathrconfig import HoconConvertible + +@dataclass +class BaseTimeRange(ABC): + pass + + +@dataclass +class AbsoluteTimeRange(BaseTimeRange): + start_time: str # E.g. "2020-01-01" + end_time: str # E.g. "2020-01-31" + time_format: str = "yyyy-MM-dd" + + +@dataclass +class RelativeTimeRange(BaseTimeRange): + offset: str # E.g. 1d + window: str # E.g. 3d + + class ObservationSettings(HoconConvertible): """Time settings of the observation data. Used in feature join. + The data time settings pertaining to how much of the input dataset is to be loaded from the timestamp column. + This is a way in which the input data can be restricted to allow only a fixed interval of dates to be joined with the feature data. + This restriction will apply on the timestamp column of the input data. + + For example, startDate: "20200522", endDate: "20200525" implies this feature should be joined with the input data starting from + 22nd May 2020 to 25th May, 2020 with both dates included. Attributes: - observation_path: path to the observation dataset, i.e. 
input dataset to get with features - event_timestamp_column (Optional[str]): The timestamp field of your record. As sliding window aggregation feature assume each record in the source data should have a timestamp column. - timestamp_format (Optional[str], optional): The format of the timestamp field. Defaults to "epoch". Possible values are: - - `epoch` (seconds since epoch), for example `1647737463` - - `epoch_millis` (milliseconds since epoch), for example `1647737517761` - - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html). + observation_path (str): Path to the observation dataset, i.e. input dataset to get with features. + observation_data_time_settings (str): Settings which have parameters specifying how to load the observation data. + join_time_settings (str): Settings which have parameters specifying how to join the observation data with the feature data. """ - def __init__(self, - observation_path: str, - event_timestamp_column: Optional[str] = None, - timestamp_format: str = "epoch") -> None: - self.event_timestamp_column = event_timestamp_column - self.timestamp_format = timestamp_format + observation_data_time_settings: str = None + join_time_settings: str = None + + def __init__( + self, + observation_path: str, + event_timestamp_column: Optional[str] = None, + timestamp_format: Optional[str] = "epoch", + use_latest_feature_data: Optional[bool] = False, + observation_time_range: Optional[BaseTimeRange] = None, + ) -> None: + """Initialize observation settings. + + Args: + observation_path: Path to the observation dataset, i.e. input dataset to get with features. + event_timestamp_column: The timestamp field of your record as sliding window aggregation (SWA) feature + assume each record in the source data should have a timestamp column. + timestamp_format: The format of the timestamp field. Defaults to "epoch". 
Possible values are: + - `epoch` (seconds since epoch), for example `1647737463` + - `epoch_millis` (milliseconds since epoch), for example `1647737517761` + - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html). + use_latest_feature_data: Use the latest feature data when join. This should not be set together with event_timestamp_column + observation_time_range: The time range of the observation data. + """ self.observation_path = observation_path if observation_path.startswith("http"): logger.warning("Your observation_path {} starts with http, which is not supported. Consider using paths starting with wasb[s]/abfs[s]/s3.", observation_path) + # Settings which have parameters specifying how to load the observation data. + if observation_time_range: + if isinstance(observation_time_range, AbsoluteTimeRange): + self.observation_data_time_settings = f"""{{ + absoluteTimeRange: {{ + startTime: "{observation_time_range.start_time}" + endTime: "{observation_time_range.end_time}" + timeFormat: "{observation_time_range.time_format}" + }} + }}""" + elif isinstance(observation_time_range, RelativeTimeRange): + self.observation_data_time_settings = f"""{{ + relativeTimeRange: {{ + offset: {observation_time_range.offset} + window: {observation_time_range.window} + }} + }}""" + else: + raise ValueError(f"Unsupported observation_time_range type {type(observation_time_range)}") + + # Settings which have parameters specifying how to join the observation data with the feature data. 
+ if use_latest_feature_data: + if event_timestamp_column: + raise ValueError("use_latest_feature_data cannot be set together with event_timestamp_column") + + self.join_time_settings = """{ + useLatestFeatureData: true + }""" + elif event_timestamp_column: + self.join_time_settings = f"""{{ + timestampColumn: {{ + def: "{event_timestamp_column}" + format: "{timestamp_format}" + }} + }}""" + + # TODO implement `simulateTimeDelay: 1d` -- This is the global setting, and should be applied to all the features + # except those specified using timeDelayOverride (should introduce "timeDelayOverride" to Feature spec). + def to_feature_config(self) -> str: tm = Template(""" - {% if setting.event_timestamp_column is not none %} - settings: { - joinTimeSettings: { - timestampColumn: { - def: "{{setting.event_timestamp_column}}" - format: "{{setting.timestamp_format}}" - } - } - } + {% if (setting.observation_data_time_settings is not none) or (setting.join_time_settings is not none) %} + settings: { + {% if setting.observation_data_time_settings is not none %} + observationDataTimeSettings: {{setting.observation_data_time_settings}} + {% endif %} + {% if setting.join_time_settings is not none %} + joinTimeSettings: {{setting.join_time_settings}} {% endif %} - observationPath: "{{setting.observation_path}}" + } + {% endif %} + observationPath: "{{setting.observation_path}}" """) return tm.render(setting=self) diff --git a/feathr_project/pyproject.toml b/feathr_project/pyproject.toml index 3ebc58ba7..3b5a9e4d7 100644 --- a/feathr_project/pyproject.toml +++ b/feathr_project/pyproject.toml @@ -12,7 +12,8 @@ multi_line_output = 3 [tool.pytest.ini_options] markers = [ "notebooks: Jupyter notebook tests. Target Spark platform can be either Azure Synapse, Databricks, or Local Spark.", - "databricks: Jupyter notebook tests. Target Spark platform must be Databricks", + "databricks: Jupyter notebook tests. Target Spark platform must be Databricks.", + "integration: Integration tests." 
] [build-system] diff --git a/feathr_project/test/conftest.py b/feathr_project/test/conftest.py index c2699e871..93f1071a1 100644 --- a/feathr_project/test/conftest.py +++ b/feathr_project/test/conftest.py @@ -29,12 +29,23 @@ def workspace_dir() -> str: return str(Path(__file__).parent.resolve().joinpath("test_user_workspace")) +# TODO we can use this later +# @pytest.fixture +# def mock_data_path(workspace_dir): +# return str(Path(workspace_dir).joinpath( +# "mockdata", +# "feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net", +# "demo_data", +# "green_tripdata_2020-04.csv", +# )) + + @pytest.fixture(scope="function") -def feathr_client(workspace_dir) -> FeathrClient: +def feathr_client(config_path) -> FeathrClient: """Test function-scoped Feathr client. - Note, cluster target (local, databricks, synapse) maybe overriden by the environment variables set at test machine. + Note, cluster target (local, databricks, synapse) may be overridden by the environment variables set at test machine. 
""" - return FeathrClient(config_path=str(Path(workspace_dir, "feathr_config.yaml"))) + return FeathrClient(config_path=config_path) @pytest.fixture(scope="module") diff --git a/feathr_project/test/integration/__init__.py b/feathr_project/test/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/integration/test_observation_settings.py b/feathr_project/test/integration/test_observation_settings.py new file mode 100644 index 000000000..8fc38fd3f --- /dev/null +++ b/feathr_project/test/integration/test_observation_settings.py @@ -0,0 +1,152 @@ +from datetime import datetime, timedelta +from pathlib import Path +from tempfile import TemporaryDirectory + +import pandas as pd +import pytest + +from feathr import ( + AbsoluteTimeRange, RelativeTimeRange, ObservationSettings, + HdfsSource, + TypedKey, ValueType, + Feature, FeatureAnchor, WindowAggTransformation, + # Feature types + FLOAT, + INPUT_CONTEXT, + FeatureQuery, +) +from feathr.utils.job_utils import get_result_df + + +TIMESTAMP_COL = "timestamp" +KEY_COL = "location_id" +FEATURE_COL = "fare" + + +@pytest.fixture(scope="session") +def mock_df(): + """Mock data for testing. + """ + # TODO Currently we're using "today" since `useLatestFeatureData` uses now(). + # Once the behavior is changed to use the latest timestamp in the data, we can use fixed test data instead of creating new one everytime. 
+ today = datetime.now().date() + date_range = list(pd.date_range(start=today-timedelta(days=4), end=today, freq="D")) + return pd.DataFrame({ + TIMESTAMP_COL: date_range + date_range, + KEY_COL: [1, 1, 1, 1, 1, 2, 2, 2, 2, 2], + FEATURE_COL: [5.5, 10.0, 6.0, 8.0, 2.5, 38.0, 12.0, 52.0, 180.0, 3.5], + }) + + +@pytest.mark.integration +def test__observation_settings(feathr_client, mock_df): + tmp_dir = TemporaryDirectory() + + mock_data_path = str(Path(tmp_dir.name, "mock_data.csv")) + mock_df.to_csv(str(mock_data_path), index=False) + + # Upload data into dbfs or adls + if feathr_client.spark_runtime != "local": + mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path) + + # Define agg features + source = HdfsSource( + name="source", + path=mock_data_path, + event_timestamp_column=TIMESTAMP_COL, + timestamp_format="yyyy-MM-dd", # yyyy/MM/dd/HH/mm/ss + ) + key = TypedKey( + key_column=KEY_COL, + key_column_type=ValueType.INT32, + description="key", + full_name=KEY_COL, + ) + agg_features = [ + Feature( + name=f"f_{FEATURE_COL}", + key=key, + feature_type=FLOAT, + transform=WindowAggTransformation( + agg_expr=f"cast_float({FEATURE_COL})", + agg_func="MAX", + window="2d", # 2 days sliding window + ), + ), + ] + agg_anchor = FeatureAnchor( + name="agg_anchor", + source=source, + features=agg_features, + ) + + feathr_client.build_features( + anchor_list=[ + agg_anchor, + ] + ) + + query = [ + FeatureQuery( + feature_list=[ + f"f_{FEATURE_COL}", + ], + key=key, + ), + ] + + test_parameters__expected_values = [ + ( + dict(event_timestamp_column=TIMESTAMP_COL), + # Max value by the sliding window '2d' + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ( + dict(use_latest_feature_data=True), + # The latest feature values: Time window is '2d' and thus the feature values for each key is 8.0 and 180.0 + [8.0, 8.0, 8.0, 8.0, 8.0, 180.0, 180.0, 180.0, 180.0, 180.0], + ), + ( + dict( + event_timestamp_column=TIMESTAMP_COL, + 
observation_time_range=RelativeTimeRange(offset="3d", window="2d"), + ), + # TODO BUG - RelativeTimeRange doesn't have any effect on the result + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ( + dict( + event_timestamp_column=TIMESTAMP_COL, + observation_time_range=AbsoluteTimeRange( + start_time=mock_df[TIMESTAMP_COL].max().date().isoformat(), + end_time=mock_df[TIMESTAMP_COL].max().date().isoformat(), + time_format="yyyy-MM-dd", + ), + ), + # TODO BUG - AbsoluteTimeRange doesn't have any effect on the result + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ] + + for test_params, expected_values in test_parameters__expected_values: + settings = ObservationSettings( + observation_path=mock_data_path, + timestamp_format="yyyy-MM-dd", + **test_params, + ) + + output_path = str(Path(Path(mock_data_path).parent, "output.avro")) + + feathr_client.get_offline_features( + observation_settings=settings, + feature_query=query, + output_path=output_path, + ) + + feathr_client.wait_job_to_finish(timeout_sec=5000) + + # download result and assert the returned result + res_df = get_result_df(feathr_client) + res_df.sort_values(by=[KEY_COL, TIMESTAMP_COL], inplace=True) + assert res_df[f"f_{FEATURE_COL}"].tolist() == expected_values + # print(res_df) diff --git a/feathr_project/test/samples/__init__.py b/feathr_project/test/samples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/test_observation_setting.py b/feathr_project/test/test_observation_setting.py deleted file mode 100644 index aa9cd6f72..000000000 --- a/feathr_project/test/test_observation_setting.py +++ /dev/null @@ -1,33 +0,0 @@ -from feathr import ObservationSettings - - -def test_observation_setting_with_timestamp(): - observation_settings = ObservationSettings( - observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", - event_timestamp_column="lpep_dropoff_datetime", - 
timestamp_format="yyyy-MM-dd HH:mm:ss") - config = observation_settings.to_feature_config() - expected_config = """ - settings: { - joinTimeSettings: { - timestampColumn: { - def: "lpep_dropoff_datetime" - format: "yyyy-MM-dd HH:mm:ss" - } - } - } - - observationPath: "wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv" - """ - assert ''.join(config.split()) == ''.join(expected_config.split()) - - -def test_observation_setting_without_timestamp(): - - observation_settings = ObservationSettings( - observation_path='snowflake://snowflake_account/?dbtable=CALL_CENTER&sfDatabase=SNOWFLAKE_SAMPLE_DATA&sfSchema=TPCDS_SF10TCL') - config = observation_settings.to_feature_config() - expected_config = """ - observationPath:"snowflake://snowflake_account/?dbtable=CALL_CENTER&sfDatabase=SNOWFLAKE_SAMPLE_DATA&sfSchema=TPCDS_SF10TCL" - """ - assert ''.join(config.split()) == ''.join(expected_config.split()) \ No newline at end of file diff --git a/feathr_project/test/unit/__init__.py b/feathr_project/test/unit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/datasets/__init__.py b/feathr_project/test/unit/datasets/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/definition/__init__.py b/feathr_project/test/unit/definition/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/definition/test_settings.py b/feathr_project/test/unit/definition/test_settings.py new file mode 100644 index 000000000..1aee51e84 --- /dev/null +++ b/feathr_project/test/unit/definition/test_settings.py @@ -0,0 +1,125 @@ +import pytest +from feathr.definition.settings import AbsoluteTimeRange, RelativeTimeRange, ObservationSettings + + +def test__observation_settings__with_use_latest_feature_data(): + settings = ObservationSettings( + observation_path="some_observation_path", + use_latest_feature_data=True, + ) + config = 
settings.to_feature_config() + expected_config = """ + settings: { + joinTimeSettings: { + useLatestFeatureData: true + } + } + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +def test__observation_settings__with_use_latest_feature_data_and_join_time_settings__raise_exception(): + with pytest.raises(ValueError): + ObservationSettings( + observation_path="some_observation_path", + event_timestamp_column="timestamp", + use_latest_feature_data=True, + ) + + +def test__observation_settings__with_join_time_settings(): + settings = ObservationSettings( + observation_path="some_observation_path", + event_timestamp_column="timestamp", + timestamp_format="yyyy-MM-dd", + ) + config = settings.to_feature_config() + expected_config = """ + settings: { + joinTimeSettings: { + timestampColumn: { + def: "timestamp" + format: "yyyy-MM-dd" + } + } + } + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +@pytest.mark.parametrize( + "observation_time_range,expected_time_range_str", [ + ( + AbsoluteTimeRange(start_time="2020-01-01", end_time="2020-01-31", time_format="yyyy-MM-dd"), + """absoluteTimeRange: { + startTime: "2020-01-01" + endTime: "2020-01-31" + timeFormat: "yyyy-MM-dd" + }""", + ), + ( + RelativeTimeRange(offset="1d", window="2d"), + """relativeTimeRange: { + offset: 1d + window: 2d + }""", + ) + ], +) +def test__observation_settings__with_observation_data_time_settings(observation_time_range, expected_time_range_str): + settings = ObservationSettings( + observation_path="some_observation_path", + observation_time_range=observation_time_range, + ) + config = settings.to_feature_config() + expected_config = f""" + settings: {{ + observationDataTimeSettings: {{ + {expected_time_range_str} + }} + }} + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +def 
test__observation_settings__without_join_and_observation_data_time_settings(): + settings = ObservationSettings( + observation_path="some_observation_path", + ) + config = settings.to_feature_config() + expected_config = """ + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +def test__observation_settings(): + settings = ObservationSettings( + observation_path="some_observation_path", + event_timestamp_column="timestamp", + timestamp_format="yyyy-MM-dd", + observation_time_range=RelativeTimeRange(offset="1d", window="2d"), + ) + config = settings.to_feature_config() + expected_config = """ + settings: { + observationDataTimeSettings: { + relativeTimeRange: { + offset: 1d + window: 2d + } + } + joinTimeSettings: { + timestampColumn: { + def: "timestamp" + format: "yyyy-MM-dd" + } + } + } + observationPath: "some_observation_path" + """ + + assert ''.join(config.split()) == ''.join(expected_config.split()) diff --git a/feathr_project/test/unit/spark_provider/__init__.py b/feathr_project/test/unit/spark_provider/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/udf/__init__.py b/feathr_project/test/unit/udf/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/utils/__init__.py b/feathr_project/test/unit/utils/__init__.py new file mode 100644 index 000000000..e69de29bb