From ac2efa48a410ae9e7d642d095b50a92f92e44ad4 Mon Sep 17 00:00:00 2001
From: tanlocnguyen
Date: Sat, 2 Mar 2024 14:19:07 +0700
Subject: [PATCH 1/4] remove unused parameter when initializing SparkSource

Signed-off-by: tanlocnguyen
---
 .../offline_stores/contrib/spark_offline_store/spark_source.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
index a27065fb5e..1ff7e6de58 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
@@ -39,7 +39,6 @@ def __init__(
         query: Optional[str] = None,
         path: Optional[str] = None,
         file_format: Optional[str] = None,
-        event_timestamp_column: Optional[str] = None,
         created_timestamp_column: Optional[str] = None,
         field_mapping: Optional[Dict[str, str]] = None,
         description: Optional[str] = "",

From 1ef7de92c91235c49d1fc4935131ba9be8740cef Mon Sep 17 00:00:00 2001
From: tanlocnguyen
Date: Wed, 6 Mar 2024 17:03:33 +0700
Subject: [PATCH 2/4] feat: add entity df support to SparkOfflineStore
 get_historical_features

Signed-off-by: tanlocnguyen
---
 .../contrib/spark_offline_store/spark.py      | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index c9591b7c3f..63b03fbdf9 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -125,7 +125,7 @@ def get_historical_features(
         config: RepoConfig,
         feature_views: List[FeatureView],
         feature_refs: List[str],
-        entity_df: Union[pandas.DataFrame, str],
+        entity_df: Union[pandas.DataFrame, str, pyspark.sql.DataFrame],
         registry: Registry,
         project: str,
         full_feature_names: bool = False,
@@ -473,15 +473,16 @@ def _get_entity_df_event_timestamp_range(
             entity_df_event_timestamp.min().to_pydatetime(),
             entity_df_event_timestamp.max().to_pydatetime(),
         )
-    elif isinstance(entity_df, str):
+    elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame):
         # If the entity_df is a string (SQL query), determine range
         # from table
-        df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col)
-
-        # Checks if executing entity sql resulted in any data
-        if df.rdd.isEmpty():
-            raise EntitySQLEmptyResults(entity_df)
-
+        if isinstance(entity_df, str):
+            df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col)
+            # Checks if executing entity sql resulted in any data
+            if df.rdd.isEmpty():
+                raise EntitySQLEmptyResults(entity_df)
+        else:
+            df = entity_df
         # TODO(kzhang132): need utc conversion here.
         entity_df_event_timestamp_range = (
@@ -499,8 +500,11 @@ def _get_entity_schema(
 ) -> Dict[str, np.dtype]:
     if isinstance(entity_df, pd.DataFrame):
         return dict(zip(entity_df.columns, entity_df.dtypes))
-    elif isinstance(entity_df, str):
-        entity_spark_df = spark_session.sql(entity_df)
+    elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame):
+        if isinstance(entity_df, str):
+            entity_spark_df = spark_session.sql(entity_df)
+        else:
+            entity_spark_df = entity_df
         return dict(
             zip(
                 entity_spark_df.columns,
@@ -525,6 +529,8 @@ def _upload_entity_df(
         return
     elif isinstance(entity_df, str):
         spark_session.sql(entity_df).createOrReplaceTempView(table_name)
+    elif isinstance(entity_df, pyspark.sql.DataFrame):
+        entity_df.createOrReplaceTempView(table_name)
         return
     else:
         raise InvalidEntityType(type(entity_df))

From a5313b5738912bc7018a93d56c8fddb7a4aba509 Mon Sep 17 00:00:00 2001
From: tanlocnguyen
Date: Wed, 6 Mar 2024 20:33:12 +0700
Subject: [PATCH 3/4] fix: lint error

Signed-off-by: tanlocnguyen
---
 .../infra/offline_stores/contrib/spark_offline_store/spark.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index 63b03fbdf9..b1b1c04c7d 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -529,6 +529,7 @@ def _upload_entity_df(
         return
     elif isinstance(entity_df, str):
         spark_session.sql(entity_df).createOrReplaceTempView(table_name)
+        return
     elif isinstance(entity_df, pyspark.sql.DataFrame):
         entity_df.createOrReplaceTempView(table_name)
         return
     else:
         raise InvalidEntityType(type(entity_df))

From 76ab7bff354452df76291bb066178eb52d53bb3a Mon Sep 17 00:00:00 2001
From: tanlocnguyen
Date: Thu, 7 Mar 2024 06:54:16 +0700
Subject: [PATCH 4/4] docs: update spark.md

Signed-off-by: tanlocnguyen
---
 docs/reference/offline-stores/spark.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/reference/offline-stores/spark.md b/docs/reference/offline-stores/spark.md
index ae5ea78071..3cca2aab1a 100644
--- a/docs/reference/offline-stores/spark.md
+++ b/docs/reference/offline-stores/spark.md
@@ -4,7 +4,7 @@

 The Spark offline store provides support for reading [SparkSources](../data-sources/spark.md).

-* Entity dataframes can be provided as a SQL query or can be provided as a Pandas dataframe. A Pandas dataframes will be converted to a Spark dataframe and processed as a temporary view.
+* Entity dataframes can be provided as a SQL query, a Pandas dataframe, or a PySpark dataframe. A Pandas dataframe will be converted to a Spark dataframe and processed as a temporary view.

 ## Disclaimer
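
For reference, a minimal sketch of how the new code path added by this series would be exercised. This is illustrative only: the feature repo path, the `driver_entities` table, and the `driver_hourly_stats:conv_rate` feature reference are hypothetical names, not part of the patches.

```python
# Illustrative usage of the PySpark entity dataframe support added above.
# Assumes a feature repo configured with the Spark offline store; the table
# and feature names below are hypothetical.
from feast import FeatureStore
from pyspark.sql import SparkSession

store = FeatureStore(repo_path=".")
spark = SparkSession.builder.getOrCreate()

# Build the entity dataframe directly in Spark -- no round-trip through
# pandas is needed anymore.
entity_df = spark.sql("SELECT driver_id, event_timestamp FROM driver_entities")

# entity_df is now accepted as a pyspark.sql.DataFrame; internally the store
# registers it as a temporary view via createOrReplaceTempView.
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
).to_df()
```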