From 5528276796df7adc9d46a35c897e08f72f7fafc4 Mon Sep 17 00:00:00 2001 From: Jun Ki Min <42475935+loomlike@users.noreply.github.com> Date: Tue, 29 Nov 2022 16:01:01 -0800 Subject: [PATCH 1/5] Add observation time range setting Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com> --- feathr_project/feathr/definition/settings.py | 115 +++++++++++++--- .../test/test_observation_setting.py | 33 ----- .../test/unit/definition/test_settings.py | 125 ++++++++++++++++++ 3 files changed, 221 insertions(+), 52 deletions(-) delete mode 100644 feathr_project/test/test_observation_setting.py create mode 100644 feathr_project/test/unit/definition/test_settings.py diff --git a/feathr_project/feathr/definition/settings.py b/feathr_project/feathr/definition/settings.py index 7768d2a98..4f53d6f35 100644 --- a/feathr_project/feathr/definition/settings.py +++ b/feathr_project/feathr/definition/settings.py @@ -1,39 +1,116 @@ +from abc import ABC +from dataclasses import dataclass from typing import Optional + from jinja2 import Template from loguru import logger + from feathr.definition.feathrconfig import HoconConvertible + +@dataclass +class BaseTimeRange(ABC): + pass + + +@dataclass +class AbsoluteTimeRange(BaseTimeRange): + start_time: str # E.g. "2020-01-01" + end_time: str # E.g. "2020-01-31" + time_format: str = "yyyy-MM-dd" + + +@dataclass +class RelativeTimeRange(BaseTimeRange): + offset: str # E.g. 1d + window: str # E.g. 3d + + class ObservationSettings(HoconConvertible): """Time settings of the observation data. Used in feature join. Attributes: - observation_path: path to the observation dataset, i.e. input dataset to get with features - event_timestamp_column (Optional[str]): The timestamp field of your record. As sliding window aggregation feature assume each record in the source data should have a timestamp column. - timestamp_format (Optional[str], optional): The format of the timestamp field. Defaults to "epoch". 
Possible values are: - - `epoch` (seconds since epoch), for example `1647737463` - - `epoch_millis` (milliseconds since epoch), for example `1647737517761` - - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html). + observation_path (str): Path to the observation dataset, i.e. input dataset to get with features. + observation_data_time_settings (str): Settings which have parameters specifying how to load the observation data. + join_time_settings (str): Settings which have parameters specifying how to join the observation data with the feature data. """ - def __init__(self, - observation_path: str, - event_timestamp_column: Optional[str] = None, - timestamp_format: str = "epoch") -> None: - self.event_timestamp_column = event_timestamp_column - self.timestamp_format = timestamp_format + observation_data_time_settings: str = None + join_time_settings: str = None + + def __init__( + self, + observation_path: str, + event_timestamp_column: Optional[str] = None, + timestamp_format: Optional[str] = "epoch", + use_latest_feature_data: Optional[bool] = False, + observation_time_range: Optional[BaseTimeRange] = None, + ) -> None: + """Initialize observation settings. + + Args: + observation_path: Path to the observation dataset, i.e. input dataset to get with features. + event_timestamp_column: The timestamp field of your record as sliding window aggregation (SWA) feature + assume each record in the source data should have a timestamp column. + timestamp_format: The format of the timestamp field. Defaults to "epoch". Possible values are: + - `epoch` (seconds since epoch), for example `1647737463` + - `epoch_millis` (milliseconds since epoch), for example `1647737517761` + - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html). + use_latest_feature_data: Use the latest feature data when join. 
This should not be set together with event_timestamp_column + observation_time_range: The time range of the observation data. + """ self.observation_path = observation_path if observation_path.startswith("http"): logger.warning("Your observation_path {} starts with http, which is not supported. Consider using paths starting with wasb[s]/abfs[s]/s3.", observation_path) + # Settings which have parameters specifying how to load the observation data. + if observation_time_range: + if isinstance(observation_time_range, AbsoluteTimeRange): + self.observation_data_time_settings = f"""{{ + absoluteTimeRange: {{ + startTime: "{observation_time_range.start_time}" + endTime: "{observation_time_range.end_time}" + timeFormat: "{observation_time_range.time_format}" + }} + }}""" + elif isinstance(observation_time_range, RelativeTimeRange): + self.observation_data_time_settings = f"""{{ + relativeTimeRange: {{ + offset: {observation_time_range.offset} + window: {observation_time_range.window} + }} + }}""" + else: + raise ValueError(f"Unsupported observation_time_range type {type(observation_time_range)}") + + # Settings which have parameters specifying how to join the observation data with the feature data. + if use_latest_feature_data: + if event_timestamp_column: + raise ValueError("use_latest_feature_data cannot set together with event_timestamp_column") + + self.join_time_settings = """{ + useLatestFeatureData: true + }""" + elif event_timestamp_column: + self.join_time_settings = f"""{{ + timestampColumn: {{ + def: "{event_timestamp_column}" + format: "{timestamp_format}" + }} + }}""" + + # TODO implement `simulateTimeDelay: 1d` -- This is the global setting, and should be applied to all the features + # except those specified using timeDelayOverride (should introduce "timeDelayOverride" to Feature spec). 
+ def to_feature_config(self) -> str: tm = Template(""" - {% if setting.event_timestamp_column is not none %} + {% if (setting.observation_data_time_settings is not none) or (setting.join_time_settings is not none) %} settings: { - joinTimeSettings: { - timestampColumn: { - def: "{{setting.event_timestamp_column}}" - format: "{{setting.timestamp_format}}" - } - } + {% if setting.observation_data_time_settings is not none %} + observationDataTimeSettings: {{setting.observation_data_time_settings}} + {% endif %} + {% if setting.join_time_settings is not none %} + joinTimeSettings: {{setting.join_time_settings}} + {% endif %} } {% endif %} observationPath: "{{setting.observation_path}}" diff --git a/feathr_project/test/test_observation_setting.py b/feathr_project/test/test_observation_setting.py deleted file mode 100644 index aa9cd6f72..000000000 --- a/feathr_project/test/test_observation_setting.py +++ /dev/null @@ -1,33 +0,0 @@ -from feathr import ObservationSettings - - -def test_observation_setting_with_timestamp(): - observation_settings = ObservationSettings( - observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", - event_timestamp_column="lpep_dropoff_datetime", - timestamp_format="yyyy-MM-dd HH:mm:ss") - config = observation_settings.to_feature_config() - expected_config = """ - settings: { - joinTimeSettings: { - timestampColumn: { - def: "lpep_dropoff_datetime" - format: "yyyy-MM-dd HH:mm:ss" - } - } - } - - observationPath: "wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv" - """ - assert ''.join(config.split()) == ''.join(expected_config.split()) - - -def test_observation_setting_without_timestamp(): - - observation_settings = ObservationSettings( - observation_path='snowflake://snowflake_account/?dbtable=CALL_CENTER&sfDatabase=SNOWFLAKE_SAMPLE_DATA&sfSchema=TPCDS_SF10TCL') - config = observation_settings.to_feature_config() - expected_config = """ - 
observationPath:"snowflake://snowflake_account/?dbtable=CALL_CENTER&sfDatabase=SNOWFLAKE_SAMPLE_DATA&sfSchema=TPCDS_SF10TCL" - """ - assert ''.join(config.split()) == ''.join(expected_config.split()) \ No newline at end of file diff --git a/feathr_project/test/unit/definition/test_settings.py b/feathr_project/test/unit/definition/test_settings.py new file mode 100644 index 000000000..1aee51e84 --- /dev/null +++ b/feathr_project/test/unit/definition/test_settings.py @@ -0,0 +1,125 @@ +import pytest +from feathr.definition.settings import AbsoluteTimeRange, RelativeTimeRange, ObservationSettings + + +def test__observation_settings__with_use_latest_feature_data(): + settings = ObservationSettings( + observation_path="some_observation_path", + use_latest_feature_data=True, + ) + config = settings.to_feature_config() + expected_config = """ + settings: { + joinTimeSettings: { + useLatestFeatureData: true + } + } + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +def test__observation_settings__with_use_latest_feature_data_and_join_time_settings__raise_exception(): + with pytest.raises(ValueError): + ObservationSettings( + observation_path="some_observation_path", + event_timestamp_column="timestamp", + use_latest_feature_data=True, + ) + + +def test__observation_settings__with_join_time_settings(): + settings = ObservationSettings( + observation_path="some_observation_path", + event_timestamp_column="timestamp", + timestamp_format="yyyy-MM-dd", + ) + config = settings.to_feature_config() + expected_config = """ + settings: { + joinTimeSettings: { + timestampColumn: { + def: "timestamp" + format: "yyyy-MM-dd" + } + } + } + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +@pytest.mark.parametrize( + "observation_time_range,expected_time_range_str", [ + ( + AbsoluteTimeRange(start_time="2020-01-01", end_time="2020-01-31", 
time_format="yyyy-MM-dd"), + """absoluteTimeRange: { + startTime: "2020-01-01" + endTime: "2020-01-31" + timeFormat: "yyyy-MM-dd" + }""", + ), + ( + RelativeTimeRange(offset="1d", window="2d"), + """relativeTimeRange: { + offset: 1d + window: 2d + }""", + ) + ], +) +def test__observation_settings__with_observation_data_time_settings(observation_time_range, expected_time_range_str): + settings = ObservationSettings( + observation_path="some_observation_path", + observation_time_range=observation_time_range, + ) + config = settings.to_feature_config() + expected_config = f""" + settings: {{ + observationDataTimeSettings: {{ + {expected_time_range_str} + }} + }} + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +def test__observation_settings__without_join_and_observation_data_time_settings(): + settings = ObservationSettings( + observation_path="some_observation_path", + ) + config = settings.to_feature_config() + expected_config = """ + observationPath: "some_observation_path" + """ + assert ''.join(config.split()) == ''.join(expected_config.split()) + + +def test__observation_settings(): + settings = ObservationSettings( + observation_path="some_observation_path", + event_timestamp_column="timestamp", + timestamp_format="yyyy-MM-dd", + observation_time_range=RelativeTimeRange(offset="1d", window="2d"), + ) + config = settings.to_feature_config() + expected_config = """ + settings: { + observationDataTimeSettings: { + relativeTimeRange: { + offset: 1d + window: 2d + } + } + joinTimeSettings: { + timestampColumn: { + def: "timestamp" + format: "yyyy-MM-dd" + } + } + } + observationPath: "some_observation_path" + """ + + assert ''.join(config.split()) == ''.join(expected_config.split()) From aabdff4ca3a4950f81ca9c8912ea4e411dc9c24f Mon Sep 17 00:00:00 2001 From: Jun Ki Min <42475935+loomlike@users.noreply.github.com> Date: Tue, 29 Nov 2022 21:39:44 -0800 Subject: [PATCH 2/5] Add integration test 
Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com> --- feathr_project/pyproject.toml | 1 + feathr_project/test/conftest.py | 16 +- feathr_project/test/integration/__init__.py | 0 .../test/integration/test_settings.py | 146 ++++++++++++++++++ feathr_project/test/samples/__init__.py | 0 feathr_project/test/unit/__init__.py | 0 feathr_project/test/unit/datasets/__init__.py | 0 .../test/unit/definition/__init__.py | 0 .../test/unit/spark_provider/__init__.py | 0 feathr_project/test/unit/udf/__init__.py | 0 feathr_project/test/unit/utils/__init__.py | 0 11 files changed, 160 insertions(+), 3 deletions(-) create mode 100644 feathr_project/test/integration/__init__.py create mode 100644 feathr_project/test/integration/test_settings.py create mode 100644 feathr_project/test/samples/__init__.py create mode 100644 feathr_project/test/unit/__init__.py create mode 100644 feathr_project/test/unit/datasets/__init__.py create mode 100644 feathr_project/test/unit/definition/__init__.py create mode 100644 feathr_project/test/unit/spark_provider/__init__.py create mode 100644 feathr_project/test/unit/udf/__init__.py create mode 100644 feathr_project/test/unit/utils/__init__.py diff --git a/feathr_project/pyproject.toml b/feathr_project/pyproject.toml index be0813090..ece3767f5 100644 --- a/feathr_project/pyproject.toml +++ b/feathr_project/pyproject.toml @@ -12,6 +12,7 @@ multi_line_output = 3 [tool.pytest.ini_options] markers = [ "notebooks: Jupyter notebook tests", + "integration: Integration tests" ] [build-system] diff --git a/feathr_project/test/conftest.py b/feathr_project/test/conftest.py index c2699e871..5e3a167b3 100644 --- a/feathr_project/test/conftest.py +++ b/feathr_project/test/conftest.py @@ -29,12 +29,22 @@ def workspace_dir() -> str: return str(Path(__file__).parent.resolve().joinpath("test_user_workspace")) +@pytest.fixture +def mock_data_path(workspace_dir): + return str(Path(workspace_dir).joinpath( + "mockdata", + 
"feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net", + "demo_data", + "green_tripdata_2020-04.csv", + )) + + @pytest.fixture(scope="function") -def feathr_client(workspace_dir) -> FeathrClient: +def feathr_client(config_path) -> FeathrClient: """Test function-scoped Feathr client. - Note, cluster target (local, databricks, synapse) maybe overriden by the environment variables set at test machine. + Note, cluster target (local, databricks, synapse) maybe overridden by the environment variables set at test machine. """ - return FeathrClient(config_path=str(Path(workspace_dir, "feathr_config.yaml"))) + return FeathrClient(config_path=config_path) @pytest.fixture(scope="module") diff --git a/feathr_project/test/integration/__init__.py b/feathr_project/test/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/integration/test_settings.py b/feathr_project/test/integration/test_settings.py new file mode 100644 index 000000000..bebe7d495 --- /dev/null +++ b/feathr_project/test/integration/test_settings.py @@ -0,0 +1,146 @@ +from pathlib import Path + +import pytest + +from feathr import ( + AbsoluteTimeRange, RelativeTimeRange, ObservationSettings, + HdfsSource, + TypedKey, ValueType, + Feature, FeatureAnchor, WindowAggTransformation, + # Feature types + FLOAT, + INPUT_CONTEXT, + FeatureQuery, +) +from feathr.utils.job_utils import get_result_df + + +# TODO remove this +def test__mock_data(mock_data_path): + import pandas as pd + df = pd.read_csv(mock_data_path) + print(df) + + +@pytest.mark.integration +@pytest.mark.parametrize( + # TODO expected output results + "observation_time_range,event_timestamp_column,use_latest_feature_data", [ + ( + AbsoluteTimeRange( + start_time="2021-01-01 05:19:14", + end_time="2021-01-02 00:00:00", + time_format="yyyy-MM-dd HH:mm:ss", + ), + "lpep_pickup_datetime", + False, + ), + ( + AbsoluteTimeRange( + start_time="2021-01-01 05:19:14", + end_time="2021-01-02 00:00:00", + 
time_format="yyyy-MM-dd HH:mm:ss", + ), + None, + True, + ), + ( + RelativeTimeRange(offset="10h", window="1d"), + "lpep_pickup_datetime", + False, + ), + ( + RelativeTimeRange(offset="10h", window="1d"), + None, + True, + ), + (None, "lpep_pickup_datetime", False), + (None, None, True), + ], +) +def test__observation_settings( + feathr_client, + mock_data_path, + observation_time_range, + event_timestamp_column, + use_latest_feature_data, +): + # Upload data into dbfs or adls + if feathr_client.spark_runtime != "local": + mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path) + + # Define agg features + source = HdfsSource( + name="source", + path=mock_data_path, + event_timestamp_column="lpep_pickup_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss", + ) + location_id_key = TypedKey( + key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id key", + full_name="location_id_key", + ) + agg_features = [ + Feature( + name="f_location_max_fare", + key=location_id_key, + feature_type=FLOAT, + transform=WindowAggTransformation( + agg_expr="fare_amount", + agg_func="MAX", + window="1d", + ), + ), + ] + agg_anchor = FeatureAnchor( + name="agg_anchor", + source=source, + features=agg_features, + ) + + # Define observation features + features = [ + Feature( + name="f_trip_distance", + feature_type=FLOAT, + transform="trip_distance", + ), + ] + + anchor = FeatureAnchor( + name="anchor", + source=INPUT_CONTEXT, + features=features, + ) + + feathr_client.build_features(anchor_list=[agg_anchor, anchor]) + + query = [ + FeatureQuery( + feature_list=["f_location_max_fare", "f_trip_distance"], + key=location_id_key, + ), + ] + + settings = ObservationSettings( + observation_path=mock_data_path, + event_timestamp_column=event_timestamp_column, + timestamp_format="yyyy-MM-dd HH:mm:ss", + use_latest_feature_data=use_latest_feature_data, + observation_time_range=observation_time_range, + ) + + output_path = 
str(Path(Path(mock_data_path).parent, "output.avro")) + feathr_client.get_offline_features( + observation_settings=settings, + feature_query=query, + output_path=output_path, + ) + + feathr_client.wait_job_to_finish(timeout_sec=5000) + + # download result and assert the returned result + res_df = get_result_df(feathr_client) + print(res_df) diff --git a/feathr_project/test/samples/__init__.py b/feathr_project/test/samples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/__init__.py b/feathr_project/test/unit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/datasets/__init__.py b/feathr_project/test/unit/datasets/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/definition/__init__.py b/feathr_project/test/unit/definition/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/spark_provider/__init__.py b/feathr_project/test/unit/spark_provider/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/udf/__init__.py b/feathr_project/test/unit/udf/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/feathr_project/test/unit/utils/__init__.py b/feathr_project/test/unit/utils/__init__.py new file mode 100644 index 000000000..e69de29bb From ac0113c96d45dbe0d41a3de67f27f24b062e9ab3 Mon Sep 17 00:00:00 2001 From: Jun Ki Min <42475935+loomlike@users.noreply.github.com> Date: Fri, 2 Dec 2022 00:49:48 +0000 Subject: [PATCH 3/5] WIP integration test Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com> --- feathr_project/feathr/__init__.py | 2 + feathr_project/feathr/definition/settings.py | 62 ++++--- .../test/integration/test_settings.py | 164 +++++++++--------- 3 files changed, 117 insertions(+), 111 deletions(-) diff --git a/feathr_project/feathr/__init__.py b/feathr_project/feathr/__init__.py index 
5c279b7d5..9073cf7be 100644 --- a/feathr_project/feathr/__init__.py +++ b/feathr_project/feathr/__init__.py @@ -72,6 +72,8 @@ 'Int64VectorFeatureType', 'DoubleVectorFeatureType', 'FeatureNameValidationError', + 'AbsoluteTimeRange', + 'RelativeTimeRange', 'ObservationSettings', 'FeaturePrinter', 'SparkExecutionConfiguration', diff --git a/feathr_project/feathr/definition/settings.py b/feathr_project/feathr/definition/settings.py index 4f53d6f35..75b3186c8 100644 --- a/feathr_project/feathr/definition/settings.py +++ b/feathr_project/feathr/definition/settings.py @@ -28,6 +28,12 @@ class RelativeTimeRange(BaseTimeRange): class ObservationSettings(HoconConvertible): """Time settings of the observation data. Used in feature join. + The data time settings pertaining to how much of the input dataset is to be loaded from the timestamp column. + This is a way in which the input data can be restricted to allow only a fixed interval of dates to be joined with the feature data. + This restriction will apply on the timestamp column of the input data. + + For example, startDate: "20200522", endDate: "20200525" implies this feature should be joined with the input data starting from + 22nd May 2020 to 25th May, 2020 with both dates included. Attributes: observation_path (str): Path to the observation dataset, i.e. input dataset to get with features. 
@@ -66,19 +72,19 @@ def __init__( if observation_time_range: if isinstance(observation_time_range, AbsoluteTimeRange): self.observation_data_time_settings = f"""{{ - absoluteTimeRange: {{ - startTime: "{observation_time_range.start_time}" - endTime: "{observation_time_range.end_time}" - timeFormat: "{observation_time_range.time_format}" - }} - }}""" + absoluteTimeRange: {{ + startTime: "{observation_time_range.start_time}" + endTime: "{observation_time_range.end_time}" + timeFormat: "{observation_time_range.time_format}" + }} + }}""" elif isinstance(observation_time_range, RelativeTimeRange): self.observation_data_time_settings = f"""{{ - relativeTimeRange: {{ - offset: {observation_time_range.offset} - window: {observation_time_range.window} - }} - }}""" + relativeTimeRange: {{ + offset: {observation_time_range.offset} + window: {observation_time_range.window} + }} + }}""" else: raise ValueError(f"Unsupported observation_time_range type {type(observation_time_range)}") @@ -88,31 +94,31 @@ def __init__( raise ValueError("use_latest_feature_data cannot set together with event_timestamp_column") self.join_time_settings = """{ - useLatestFeatureData: true - }""" + useLatestFeatureData: true + }""" elif event_timestamp_column: self.join_time_settings = f"""{{ - timestampColumn: {{ - def: "{event_timestamp_column}" - format: "{timestamp_format}" - }} - }}""" + timestampColumn: {{ + def: "{event_timestamp_column}" + format: "{timestamp_format}" + }} + }}""" # TODO implement `simulateTimeDelay: 1d` -- This is the global setting, and should be applied to all the features # except those specified using timeDelayOverride (should introduce "timeDelayOverride" to Feature spec). 
def to_feature_config(self) -> str: tm = Template(""" - {% if (setting.observation_data_time_settings is not none) or (setting.join_time_settings is not none) %} - settings: { - {% if setting.observation_data_time_settings is not none %} - observationDataTimeSettings: {{setting.observation_data_time_settings}} - {% endif %} - {% if setting.join_time_settings is not none %} - joinTimeSettings: {{setting.join_time_settings}} - {% endif %} - } + {% if (setting.observation_data_time_settings is not none) or (setting.join_time_settings is not none) %} + settings: { + {% if setting.observation_data_time_settings is not none %} + observationDataTimeSettings: {{setting.observation_data_time_settings}} + {% endif %} + {% if setting.join_time_settings is not none %} + joinTimeSettings: {{setting.join_time_settings}} {% endif %} - observationPath: "{{setting.observation_path}}" + } + {% endif %} + observationPath: "{{setting.observation_path}}" """) return tm.render(setting=self) diff --git a/feathr_project/test/integration/test_settings.py b/feathr_project/test/integration/test_settings.py index bebe7d495..8f7118b74 100644 --- a/feathr_project/test/integration/test_settings.py +++ b/feathr_project/test/integration/test_settings.py @@ -15,66 +15,24 @@ from feathr.utils.job_utils import get_result_df -# TODO remove this -def test__mock_data(mock_data_path): - import pandas as pd - df = pd.read_csv(mock_data_path) - print(df) +@pytest.mark.integration +def test__observation_settings(feathr_client, mock_data_path): + # TODO + mock_data_path = str(Path("mock_data.csv").resolve()) + # mock_distance_data_path = str(Path("mock_distance_data.csv").resolve()) -@pytest.mark.integration -@pytest.mark.parametrize( - # TODO expected output results - "observation_time_range,event_timestamp_column,use_latest_feature_data", [ - ( - AbsoluteTimeRange( - start_time="2021-01-01 05:19:14", - end_time="2021-01-02 00:00:00", - time_format="yyyy-MM-dd HH:mm:ss", - ), - "lpep_pickup_datetime", - 
False, - ), - ( - AbsoluteTimeRange( - start_time="2021-01-01 05:19:14", - end_time="2021-01-02 00:00:00", - time_format="yyyy-MM-dd HH:mm:ss", - ), - None, - True, - ), - ( - RelativeTimeRange(offset="10h", window="1d"), - "lpep_pickup_datetime", - False, - ), - ( - RelativeTimeRange(offset="10h", window="1d"), - None, - True, - ), - (None, "lpep_pickup_datetime", False), - (None, None, True), - ], -) -def test__observation_settings( - feathr_client, - mock_data_path, - observation_time_range, - event_timestamp_column, - use_latest_feature_data, -): # Upload data into dbfs or adls if feathr_client.spark_runtime != "local": mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path) + # mock_distance_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_distance_data_path) # Define agg features source = HdfsSource( name="source", path=mock_data_path, event_timestamp_column="lpep_pickup_datetime", - timestamp_format="yyyy-MM-dd HH:mm:ss", + timestamp_format="yyyyMMdd", ) location_id_key = TypedKey( key_column="DOLocationID", @@ -88,9 +46,9 @@ def test__observation_settings( key=location_id_key, feature_type=FLOAT, transform=WindowAggTransformation( - agg_expr="fare_amount", + agg_expr="cast_float(fare_amount)", agg_func="MAX", - window="1d", + window="20d", ), ), ] @@ -100,47 +58,87 @@ def test__observation_settings( features=agg_features, ) - # Define observation features - features = [ - Feature( - name="f_trip_distance", - feature_type=FLOAT, - transform="trip_distance", - ), - ] + # distance_source = HdfsSource( + # name="distance_source", + # path=mock_distance_data_path, + # event_timestamp_column="lpep_pickup_datetime", + # timestamp_format="yyyy-MM-dd HH:mm:ss", + # ) + # datetime_key = TypedKey( + # key_column="lpep_pickup_datetime", + # key_column_type=ValueType.INT64, + # description="datetime key", + # full_name="datetime_key", + # ) + # features = [ + # Feature( + # name="f_fare_amount", + # # 
name="f_trip_distance", + # # key=datetime_key, + # feature_type=FLOAT, + # transform="fare_amount", + # # transform="trip_distance", + # ), + # ] + # anchor = FeatureAnchor( + # name="anchor", + # source=INPUT_CONTEXT, + # # source=distance_source, + # features=features, + # ) - anchor = FeatureAnchor( - name="anchor", - source=INPUT_CONTEXT, - features=features, + feathr_client.build_features( + anchor_list=[ + agg_anchor, + # anchor, + ] ) - feathr_client.build_features(anchor_list=[agg_anchor, anchor]) - query = [ FeatureQuery( - feature_list=["f_location_max_fare", "f_trip_distance"], + feature_list=[ + "f_location_max_fare", + # "f_trip_distance", + # "f_fare_amount", + ], key=location_id_key, ), ] - settings = ObservationSettings( - observation_path=mock_data_path, - event_timestamp_column=event_timestamp_column, - timestamp_format="yyyy-MM-dd HH:mm:ss", - use_latest_feature_data=use_latest_feature_data, - observation_time_range=observation_time_range, - ) + observation_time_range_values = [ + AbsoluteTimeRange( + start_time="20210102", + end_time="20210103", + time_format="yyyyMMdd", + ), + # RelativeTimeRange(offset="10h", window="1d"), + # None, + ] - output_path = str(Path(Path(mock_data_path).parent, "output.avro")) - feathr_client.get_offline_features( - observation_settings=settings, - feature_query=query, - output_path=output_path, - ) + # event_timestamp_column_values = [ + # "lpep_pickup_datetime", + # None, + # ] + + for obs_time_range in observation_time_range_values: + settings = ObservationSettings( + observation_path=mock_data_path, + event_timestamp_column="lpep_pickup_datetime",# TODOevent_timestamp_column, + timestamp_format="yyyyMMdd", # TODO check -- We only support yyyyMMdd format for this. 
In future, if there is a request, we can + # use_latest_feature_data=True, #use_latest_feature_data, + observation_time_range=obs_time_range, + ) + + output_path = str(Path(Path(mock_data_path).parent, "output.avro")) + + feathr_client.get_offline_features( + observation_settings=settings, + feature_query=query, + output_path=output_path, + ) - feathr_client.wait_job_to_finish(timeout_sec=5000) + feathr_client.wait_job_to_finish(timeout_sec=5000) - # download result and assert the returned result - res_df = get_result_df(feathr_client) - print(res_df) + # download result and assert the returned result + res_df = get_result_df(feathr_client) + print(res_df) #[["lpep_pickup_datetime", "f_location_max_fare", "f_trip_distance"]]) From e7be239a179028f5093ce0885a2fd3d887d31dfc Mon Sep 17 00:00:00 2001 From: Jun Ki Min <42475935+loomlike@users.noreply.github.com> Date: Sat, 3 Dec 2022 01:22:14 +0000 Subject: [PATCH 4/5] Add integration test. TODO: confirm expected behavior of the timestamp range and fix issues Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com> --- feathr_project/test/conftest.py | 17 ++- .../test/integration/test_settings.py | 142 +++++++++--------- 2 files changed, 84 insertions(+), 75 deletions(-) diff --git a/feathr_project/test/conftest.py b/feathr_project/test/conftest.py index 5e3a167b3..93f1071a1 100644 --- a/feathr_project/test/conftest.py +++ b/feathr_project/test/conftest.py @@ -29,14 +29,15 @@ def workspace_dir() -> str: return str(Path(__file__).parent.resolve().joinpath("test_user_workspace")) -@pytest.fixture -def mock_data_path(workspace_dir): - return str(Path(workspace_dir).joinpath( - "mockdata", - "feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net", - "demo_data", - "green_tripdata_2020-04.csv", - )) +# TODO we can use this later +# @pytest.fixture +# def mock_data_path(workspace_dir): +# return str(Path(workspace_dir).joinpath( +# "mockdata", +# 
"feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net", +# "demo_data", +# "green_tripdata_2020-04.csv", +# )) @pytest.fixture(scope="function") diff --git a/feathr_project/test/integration/test_settings.py b/feathr_project/test/integration/test_settings.py index 8f7118b74..8fc38fd3f 100644 --- a/feathr_project/test/integration/test_settings.py +++ b/feathr_project/test/integration/test_settings.py @@ -1,5 +1,8 @@ +from datetime import datetime, timedelta from pathlib import Path +from tempfile import TemporaryDirectory +import pandas as pd import pytest from feathr import ( @@ -15,40 +18,59 @@ from feathr.utils.job_utils import get_result_df +TIMESTAMP_COL = "timestamp" +KEY_COL = "location_id" +FEATURE_COL = "fare" + + +@pytest.fixture(scope="session") +def mock_df(): + """Mock data for testing. + """ + # TODO Currently we're using "today" since `useLatestFeatureData` uses now(). + # Once the behavior is changed to use the latest timestamp in the data, we can use fixed test data instead of creating new one everytime. 
+ today = datetime.now().date() + date_range = list(pd.date_range(start=today-timedelta(days=4), end=today, freq="D")) + return pd.DataFrame({ + TIMESTAMP_COL: date_range + date_range, + KEY_COL: [1, 1, 1, 1, 1, 2, 2, 2, 2, 2], + FEATURE_COL: [5.5, 10.0, 6.0, 8.0, 2.5, 38.0, 12.0, 52.0, 180.0, 3.5], + }) + + @pytest.mark.integration -def test__observation_settings(feathr_client, mock_data_path): +def test__observation_settings(feathr_client, mock_df): + tmp_dir = TemporaryDirectory() - # TODO - mock_data_path = str(Path("mock_data.csv").resolve()) - # mock_distance_data_path = str(Path("mock_distance_data.csv").resolve()) + mock_data_path = str(Path(tmp_dir.name, "mock_data.csv")) + mock_df.to_csv(str(mock_data_path), index=False) # Upload data into dbfs or adls if feathr_client.spark_runtime != "local": mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path) - # mock_distance_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_distance_data_path) # Define agg features source = HdfsSource( name="source", path=mock_data_path, - event_timestamp_column="lpep_pickup_datetime", - timestamp_format="yyyyMMdd", + event_timestamp_column=TIMESTAMP_COL, + timestamp_format="yyyy-MM-dd", # yyyy/MM/dd/HH/mm/ss ) - location_id_key = TypedKey( - key_column="DOLocationID", + key = TypedKey( + key_column=KEY_COL, key_column_type=ValueType.INT32, - description="location id key", - full_name="location_id_key", + description="key", + full_name=KEY_COL, ) agg_features = [ Feature( - name="f_location_max_fare", - key=location_id_key, + name=f"f_{FEATURE_COL}", + key=key, feature_type=FLOAT, transform=WindowAggTransformation( - agg_expr="cast_float(fare_amount)", + agg_expr=f"cast_float({FEATURE_COL})", agg_func="MAX", - window="20d", + window="2d", # 2 days sliding window ), ), ] @@ -58,75 +80,59 @@ def test__observation_settings(feathr_client, mock_data_path): features=agg_features, ) - # distance_source = HdfsSource( - # 
name="distance_source", - # path=mock_distance_data_path, - # event_timestamp_column="lpep_pickup_datetime", - # timestamp_format="yyyy-MM-dd HH:mm:ss", - # ) - # datetime_key = TypedKey( - # key_column="lpep_pickup_datetime", - # key_column_type=ValueType.INT64, - # description="datetime key", - # full_name="datetime_key", - # ) - # features = [ - # Feature( - # name="f_fare_amount", - # # name="f_trip_distance", - # # key=datetime_key, - # feature_type=FLOAT, - # transform="fare_amount", - # # transform="trip_distance", - # ), - # ] - # anchor = FeatureAnchor( - # name="anchor", - # source=INPUT_CONTEXT, - # # source=distance_source, - # features=features, - # ) - feathr_client.build_features( anchor_list=[ agg_anchor, - # anchor, ] ) query = [ FeatureQuery( feature_list=[ - "f_location_max_fare", - # "f_trip_distance", - # "f_fare_amount", + f"f_{FEATURE_COL}", ], - key=location_id_key, + key=key, ), ] - observation_time_range_values = [ - AbsoluteTimeRange( - start_time="20210102", - end_time="20210103", - time_format="yyyyMMdd", + test_parameters__expected_values = [ + ( + dict(event_timestamp_column=TIMESTAMP_COL), + # Max value by the sliding window '2d' + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ( + dict(use_latest_feature_data=True), + # The latest feature values: Time window is '2d' and thus the feature values for the two keys are 8.0 and 180.0, respectively + [8.0, 8.0, 8.0, 8.0, 8.0, 180.0, 180.0, 180.0, 180.0, 180.0], + ), + ( + dict( + event_timestamp_column=TIMESTAMP_COL, + observation_time_range=RelativeTimeRange(offset="3d", window="2d"), + ), + # TODO BUG - RelativeTimeRange doesn't have any effect on the result + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ( + dict( + event_timestamp_column=TIMESTAMP_COL, + observation_time_range=AbsoluteTimeRange( + start_time=mock_df[TIMESTAMP_COL].max().date().isoformat(), + end_time=mock_df[TIMESTAMP_COL].max().date().isoformat(), + time_format="yyyy-MM-dd", + ), + ), + # TODO BUG -
AbsoluteTimeRange doesn't have any effect on the result + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], ), - # RelativeTimeRange(offset="10h", window="1d"), - # None, ] - # event_timestamp_column_values = [ - # "lpep_pickup_datetime", - # None, - # ] - - for obs_time_range in observation_time_range_values: + for test_params, expected_values in test_parameters__expected_values: settings = ObservationSettings( observation_path=mock_data_path, - event_timestamp_column="lpep_pickup_datetime",# TODOevent_timestamp_column, - timestamp_format="yyyyMMdd", # TODO check -- We only support yyyyMMdd format for this. In future, if there is a request, we can - # use_latest_feature_data=True, #use_latest_feature_data, - observation_time_range=obs_time_range, + timestamp_format="yyyy-MM-dd", + **test_params, ) output_path = str(Path(Path(mock_data_path).parent, "output.avro")) @@ -141,4 +147,6 @@ def test__observation_settings(feathr_client, mock_data_path): # download result and assert the returned result res_df = get_result_df(feathr_client) - print(res_df) #[["lpep_pickup_datetime", "f_location_max_fare", "f_trip_distance"]]) + res_df.sort_values(by=[KEY_COL, TIMESTAMP_COL], inplace=True) + assert res_df[f"f_{FEATURE_COL}"].tolist() == expected_values + # print(res_df) From 5f3dd149ec3a2c5fbf73b946bad73a0d4b6ea1b1 Mon Sep 17 00:00:00 2001 From: Jun Ki Min <42475935+loomlike@users.noreply.github.com> Date: Sat, 3 Dec 2022 01:30:03 +0000 Subject: [PATCH 5/5] change integration test name to be more meaningful Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com> --- .../{test_settings.py => test_observation_settings.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename feathr_project/test/integration/{test_settings.py => test_observation_settings.py} (100%) diff --git a/feathr_project/test/integration/test_settings.py b/feathr_project/test/integration/test_observation_settings.py similarity index 100% rename from 
feathr_project/test/integration/test_settings.py rename to feathr_project/test/integration/test_observation_settings.py