From 040c40fa0dd9d6b83e2c565090e9eb2720f920b4 Mon Sep 17 00:00:00 2001
From: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com>
Date: Thu, 13 Feb 2025 09:35:18 -0800
Subject: [PATCH] fix: Improve logging for spark materialization engine (#172)

Co-authored-by: Bhargav Dodla
---
 .../spark/spark_materialization_engine.py    | 103 ++++++++++++++++--
 1 file changed, 96 insertions(+), 7 deletions(-)

diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
index 320ccc6161d..d046589861d 100644
--- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
+++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
@@ -234,15 +234,25 @@ def _map_by_partition(
     spark_serialized_artifacts: _SparkSerializedArtifacts,
 ):
     feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
+
+    total_batches = 0
+    total_time = 0.0
+    min_time = float("inf")
+    max_time = float("-inf")
+
+    total_rows = 0
+    min_batch_size = float("inf")
+    max_batch_size = float("-inf")
+
     """Load pandas df to online store"""
     for pdf in iterator:
+        start_time = time.perf_counter()
         pdf_row_count = pdf.shape[0]
-        start_time = time.time()
-        # convert to pyarrow table
         if pdf_row_count == 0:
-            print("INFO!!! Dataframe has 0 records to process")
-            return
+            print("INFO: Dataframe has 0 records to process")
+            break
 
+        # convert to pyarrow table
         table = pyarrow.Table.from_pandas(pdf)
 
         if feature_view.batch_source.field_mapping is not None:
@@ -266,10 +276,89 @@ def _map_by_partition(
             rows_to_write,
             lambda x: None,
         )
-        end_time = time.time()
-        print(
-            f"INFO!!! Processed batch with size {pdf_row_count} in {int((end_time - start_time) * 1000)} milliseconds"
+
+        batch_time = time.perf_counter() - start_time
+
+        (
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+        ) = update_exec_stats(
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+            batch_time,
+            pdf_row_count,
         )
+
+    if total_batches > 0:
+        print_exec_stats(
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+        )
+
     yield pd.DataFrame(
         [pd.Series(range(1, 2))]
     )  # dummy result because mapInPandas needs to return something
+
+
+def update_exec_stats(
+    total_batches,
+    total_time,
+    min_time,
+    max_time,
+    total_rows,
+    min_batch_size,
+    max_batch_size,
+    batch_time,
+    current_batch_size,
+):
+    total_batches += 1
+    total_time += batch_time
+    min_time = min(min_time, batch_time)
+    max_time = max(max_time, batch_time)
+
+    total_rows += current_batch_size
+    min_batch_size = min(min_batch_size, current_batch_size)
+    max_batch_size = max(max_batch_size, current_batch_size)
+
+    return (
+        total_batches,
+        total_time,
+        min_time,
+        max_time,
+        total_rows,
+        min_batch_size,
+        max_batch_size,
+    )
+
+
+def print_exec_stats(
+    total_batches,
+    total_time,
+    min_time,
+    max_time,
+    total_rows,
+    min_batch_size,
+    max_batch_size,
+):
+    # TODO: Investigate why the logger is not working in Spark Executors
+    avg_time = total_time / total_batches
+    avg_batch_size = total_rows / total_batches
+    print(
+        f"Time - Total: {total_time:.6f}s, Avg: {avg_time:.6f}s, Min: {min_time:.6f}s, Max: {max_time:.6f}s | "
+        f"Batch Size - Total: {total_rows}, Avg: {avg_batch_size:.2f}, Min: {min_batch_size}, Max: {max_batch_size}"
+    )
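
Reviewer note (not part of the commit): below is a minimal standalone sketch of
how the new helpers aggregate per-batch stats and emit the single summary line.
The batch timings and row counts are fabricated for illustration, and it assumes
update_exec_stats and print_exec_stats are importable from the patched module.

    from feast.infra.materialization.contrib.spark.spark_materialization_engine import (
        print_exec_stats,
        update_exec_stats,
    )

    # Fabricated (seconds, row_count) pairs standing in for the per-batch
    # measurements taken inside _map_by_partition on a Spark executor.
    batches = [(0.120, 1000), (0.095, 800), (0.210, 1500)]

    # Same initial values the patch uses before the first batch arrives.
    total_batches = 0
    total_time = 0.0
    min_time = float("inf")
    max_time = float("-inf")
    total_rows = 0
    min_batch_size = float("inf")
    max_batch_size = float("-inf")

    for batch_time, batch_size in batches:
        (
            total_batches,
            total_time,
            min_time,
            max_time,
            total_rows,
            min_batch_size,
            max_batch_size,
        ) = update_exec_stats(
            total_batches,
            total_time,
            min_time,
            max_time,
            total_rows,
            min_batch_size,
            max_batch_size,
            batch_time,
            batch_size,
        )

    # Prints one summary line for the partition, e.g.:
    # Time - Total: 0.425000s, Avg: 0.141667s, Min: 0.095000s, Max: 0.210000s | Batch Size - Total: 3300, Avg: 1100.00, Min: 800, Max: 1500
    if total_batches > 0:
        print_exec_stats(
            total_batches,
            total_time,
            min_time,
            max_time,
            total_rows,
            min_batch_size,
            max_batch_size,
        )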