fix: Improve logging for spark materialization engine (#172)
Co-authored-by: Bhargav Dodla <[email protected]>
EXPEbdodla and Bhargav Dodla authored Feb 13, 2025
1 parent 402a383 commit 040c40f
Showing 1 changed file with 96 additions and 7 deletions.
@@ -234,15 +234,25 @@ def _map_by_partition(
     spark_serialized_artifacts: _SparkSerializedArtifacts,
 ):
     feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
 
+    total_batches = 0
+    total_time = 0.0
+    min_time = float("inf")
+    max_time = float("-inf")
+
+    total_rows = 0
+    min_batch_size = float("inf")
+    max_batch_size = float("-inf")
+
     """Load pandas df to online store"""
     for pdf in iterator:
+        start_time = time.perf_counter()
         pdf_row_count = pdf.shape[0]
-        start_time = time.time()
-        # convert to pyarrow table
         if pdf_row_count == 0:
-            print("INFO!!! Dataframe has 0 records to process")
-            return
+            print("INFO: Dataframe has 0 records to process")
+            break
+
+        # convert to pyarrow table
         table = pyarrow.Table.from_pandas(pdf)
 
         if feature_view.batch_source.field_mapping is not None:
@@ -266,10 +276,89 @@ def _map_by_partition(
             rows_to_write,
             lambda x: None,
         )
-        end_time = time.time()
-        print(
-            f"INFO!!! Processed batch with size {pdf_row_count} in {int((end_time - start_time) * 1000)} milliseconds"
+
+        batch_time = time.perf_counter() - start_time
+
+        (
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+        ) = update_exec_stats(
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+            batch_time,
+            pdf_row_count,
         )
+
+    if total_batches > 0:
+        print_exec_stats(
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+        )
+
     yield pd.DataFrame(
         [pd.Series(range(1, 2))]
     )  # dummy result because mapInPandas needs to return something
+
+
+def update_exec_stats(
+    total_batches,
+    total_time,
+    min_time,
+    max_time,
+    total_rows,
+    min_batch_size,
+    max_batch_size,
+    batch_time,
+    current_batch_size,
+):
+    total_batches += 1
+    total_time += batch_time
+    min_time = min(min_time, batch_time)
+    max_time = max(max_time, batch_time)
+
+    total_rows += current_batch_size
+    min_batch_size = min(min_batch_size, current_batch_size)
+    max_batch_size = max(max_batch_size, current_batch_size)
+
+    return (
+        total_batches,
+        total_time,
+        min_time,
+        max_time,
+        total_rows,
+        min_batch_size,
+        max_batch_size,
+    )
+
+
+def print_exec_stats(
+    total_batches,
+    total_time,
+    min_time,
+    max_time,
+    total_rows,
+    min_batch_size,
+    max_batch_size,
+):
+    # TODO: Investigate why the logger is not working in Spark Executors
+    avg_time = total_time / total_batches
+    avg_batch_size = total_rows / total_batches
+    print(
+        f"Time - Total: {total_time:.6f}s, Avg: {avg_time:.6f}s, Min: {min_time:.6f}s, Max: {max_time:.6f}s | "
+        f"Batch Size - Total: {total_rows}, Avg: {avg_batch_size:.2f}, Min: {min_batch_size}, Max: {max_batch_size}"
+    )
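Taken together, update_exec_stats threads the running counters through each batch and print_exec_stats emits one summary line per partition. As a quick illustration, a minimal standalone sketch (not part of the commit; it assumes the two helpers above are in scope, and the timing/size pairs are made up):

    # Sketch only: helpers from the diff above assumed in scope;
    # (batch_time_seconds, batch_size) pairs are hypothetical.
    stats = (0, 0.0, float("inf"), float("-inf"), 0, float("inf"), float("-inf"))
    for batch_time, batch_size in [(0.120, 1000), (0.080, 800), (0.205, 1500)]:
        stats = update_exec_stats(*stats, batch_time, batch_size)
    if stats[0] > 0:  # stats[0] is total_batches
        print_exec_stats(*stats)
    # Time - Total: 0.405000s, Avg: 0.135000s, Min: 0.080000s, Max: 0.205000s | Batch Size - Total: 3300, Avg: 1100.00, Min: 800, Max: 1500

The switch from time.time() to time.perf_counter() also makes the per-batch timings robust to system clock adjustments, since perf_counter is a monotonic, high-resolution timer intended for measuring intervals.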

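The trailing dummy yield survives this commit because DataFrame.mapInPandas requires the mapped function to return an iterator of pandas DataFrames matching a declared schema, even when the work is purely a side effect (writing batches to the online store). A self-contained PySpark sketch of that contract, independent of Feast and with illustrative names:

    # Standalone sketch of the mapInPandas contract; write_partition is a
    # hypothetical stand-in for _map_by_partition.
    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    source_df = spark.range(100)

    def write_partition(iterator):
        # Consume each pandas batch for its side effect, then yield a
        # one-row DataFrame so mapInPandas has something to return.
        for pdf in iterator:
            _ = len(pdf)  # imagine writing these rows to the online store
        yield pd.DataFrame({"status": [1]})

    # An action (count) is what actually drives the per-partition writes.
    source_df.mapInPandas(write_partition, schema="status long").count()

Each Spark task invokes the function once per partition, so the statistics printed by print_exec_stats describe a single partition's batches and land in that executor's stdout, which is why the commit uses print() while the logger issue noted in the TODO remains open.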