-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DOP-18570] Implement SparkMetricsRecorder
- Loading branch information
Showing
32 changed files
with
1,952 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,4 @@ | ||
.github/workflows/data/file-df/** | ||
onetl/file_df_connection/spark_file_df_connection.py | ||
onetl/file/file_df_reader/** | ||
onetl/file/file_df_writer/** | ||
onetl/file/__init__.py | ||
tests/resources/file_df_connection/** | ||
**/*file_df* | ||
**/*file_df*/** |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from onetl._metrics.command import SparkCommandMetrics | ||
from onetl._metrics.driver import SparkDriverMetrics | ||
from onetl._metrics.executor import SparkExecutorMetrics | ||
from onetl._metrics.input import SparkInputMetrics | ||
from onetl._metrics.output import SparkOutputMetrics | ||
from onetl._metrics.recorder import SparkMetricsRecorder | ||
|
||
__all__ = [ | ||
"SparkCommandMetrics", | ||
"SparkDriverMetrics", | ||
"SparkMetricsRecorder", | ||
"SparkExecutorMetrics", | ||
"SparkInputMetrics", | ||
"SparkOutputMetrics", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import os | ||
import textwrap | ||
|
||
try: | ||
from pydantic.v1 import Field | ||
except (ImportError, AttributeError): | ||
from pydantic import Field # type: ignore[no-redef, assignment] | ||
|
||
from onetl._metrics.driver import SparkDriverMetrics | ||
from onetl._metrics.executor import SparkExecutorMetrics | ||
from onetl._metrics.input import SparkInputMetrics | ||
from onetl._metrics.output import SparkOutputMetrics | ||
from onetl.impl import BaseModel | ||
|
||
INDENT = " " * 4 | ||
|
||
|
||
class SparkCommandMetrics(BaseModel): | ||
input: SparkInputMetrics = Field(default_factory=SparkInputMetrics) | ||
output: SparkOutputMetrics = Field(default_factory=SparkOutputMetrics) | ||
driver: SparkDriverMetrics = Field(default_factory=SparkDriverMetrics) | ||
executor: SparkExecutorMetrics = Field(default_factory=SparkExecutorMetrics) | ||
|
||
@property | ||
def is_empty(self) -> bool: | ||
return all([self.input.is_empty, self.output.is_empty]) | ||
|
||
def update(self, other: SparkCommandMetrics) -> SparkCommandMetrics: | ||
self.input.update(other.input) | ||
self.output.update(other.output) | ||
self.driver.update(other.driver) | ||
self.executor.update(other.executor) | ||
return self | ||
|
||
@property | ||
def details(self) -> str: | ||
if self.is_empty: | ||
return "No data" | ||
|
||
result = [] | ||
if not self.input.is_empty: | ||
result.append(f"Input:{os.linesep}{textwrap.indent(self.input.details, INDENT)}") | ||
if not self.output.is_empty: | ||
result.append(f"Output:{os.linesep}{textwrap.indent(self.output.details, INDENT)}") | ||
if not self.driver.is_empty: | ||
result.append(f"Driver:{os.linesep}{textwrap.indent(self.driver.details, INDENT)}") | ||
if not self.executor.is_empty: | ||
result.append(f"Executor:{os.linesep}{textwrap.indent(self.executor.details, INDENT)}") | ||
|
||
return os.linesep.join(result) | ||
|
||
def __str__(self): | ||
return self.details |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import os | ||
|
||
from humanize import naturalsize | ||
|
||
from onetl.impl import BaseModel | ||
|
||
# Metrics themselves are considered a part of driver result, | ||
# ignore if result is smaller than 1MB | ||
MIN_DRIVER_BYTES = 1_000_000 | ||
|
||
|
||
class SparkDriverMetrics(BaseModel): | ||
in_memory_bytes: int = 0 | ||
|
||
@property | ||
def is_empty(self) -> bool: | ||
return self.in_memory_bytes < MIN_DRIVER_BYTES | ||
|
||
def update(self, other: SparkDriverMetrics) -> SparkDriverMetrics: | ||
self.in_memory_bytes += other.in_memory_bytes | ||
return self | ||
|
||
@property | ||
def details(self) -> str: | ||
if self.is_empty: | ||
return "No data" | ||
|
||
result = [] | ||
if self.in_memory_bytes >= MIN_DRIVER_BYTES: | ||
result.append(f"In-memory data (approximate): {naturalsize(self.in_memory_bytes)}") | ||
|
||
return os.linesep.join(result) | ||
|
||
def __str__(self): | ||
return self.details |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import os | ||
from datetime import timedelta | ||
|
||
from humanize import naturalsize, precisedelta | ||
|
||
from onetl.impl import BaseModel | ||
|
||
|
||
class SparkExecutorMetrics(BaseModel): | ||
total_run_time: timedelta = timedelta() | ||
total_cpu_time: timedelta = timedelta() | ||
peak_memory_bytes: int = 0 | ||
memory_spilled_bytes: int = 0 | ||
disk_spilled_bytes: int = 0 | ||
|
||
@property | ||
def is_empty(self) -> bool: | ||
return not self.total_run_time | ||
|
||
def update(self, other: SparkExecutorMetrics) -> SparkExecutorMetrics: | ||
self.total_run_time += other.total_run_time | ||
self.total_cpu_time += other.total_cpu_time | ||
self.peak_memory_bytes += other.peak_memory_bytes | ||
self.memory_spilled_bytes += other.memory_spilled_bytes | ||
self.disk_spilled_bytes += other.disk_spilled_bytes | ||
return self | ||
|
||
@property | ||
def details(self) -> str: | ||
if self.is_empty: | ||
return "No data" | ||
|
||
result = [ | ||
f"Total run time: {precisedelta(self.total_run_time)}", | ||
f"Total CPU time: {precisedelta(self.total_cpu_time)}", | ||
] | ||
|
||
if self.peak_memory_bytes: | ||
result.append(f"Peak memory: {naturalsize(self.peak_memory_bytes)}") | ||
|
||
if self.memory_spilled_bytes: | ||
result.append(f"Memory spilled: {naturalsize(self.memory_spilled_bytes)}") | ||
|
||
if self.disk_spilled_bytes: | ||
result.append(f"Disk spilled: {naturalsize(self.disk_spilled_bytes)}") | ||
|
||
return os.linesep.join(result) | ||
|
||
def __str__(self): | ||
return self.details |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import re | ||
from datetime import timedelta | ||
from typing import Any | ||
|
||
try: | ||
from pydantic.v1 import ByteSize | ||
except (ImportError, AttributeError): | ||
from pydantic import ByteSize # type: ignore[no-redef, assignment] | ||
|
||
from onetl._metrics.command import SparkCommandMetrics | ||
from onetl._metrics.driver import SparkDriverMetrics | ||
from onetl._metrics.executor import SparkExecutorMetrics | ||
from onetl._metrics.input import SparkInputMetrics | ||
from onetl._metrics.listener.execution import ( | ||
SparkListenerExecution, | ||
SparkSQLMetricNames, | ||
) | ||
from onetl._metrics.output import SparkOutputMetrics | ||
|
||
# in some cases byte metrics have format "7.6 MiB", but sometimes it is: | ||
# total (min, med, max (stageId: taskId))\n7.6 MiB (0.0 B, 7.6 MiB, 7.6 MiB (driver)) | ||
NON_DIGIT = re.compile(r"^[^\d.]+|\(.*\)", flags=re.DOTALL) | ||
|
||
|
||
def _get_int(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None: | ||
try: | ||
return int(data[key][0]) | ||
except Exception: | ||
return None | ||
|
||
|
||
def _get_bytes(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None: | ||
try: | ||
raw_value = data[key][0] | ||
normalized_value = NON_DIGIT.sub("", raw_value) | ||
return int(ByteSize.validate(normalized_value)) | ||
except Exception: | ||
return None | ||
|
||
|
||
def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCommandMetrics: | ||
input_read_bytes: int = 0 | ||
input_read_rows: int = 0 | ||
output_bytes: int = 0 | ||
output_rows: int = 0 | ||
|
||
run_time_milliseconds: int = 0 | ||
cpu_time_nanoseconds: int = 0 | ||
peak_memory_bytes: int = 0 | ||
memory_spilled_bytes: int = 0 | ||
disk_spilled_bytes: int = 0 | ||
result_size_bytes: int = 0 | ||
|
||
# some metrics are per-stage, and have to be summed, others are per-execution | ||
for job in execution.jobs: | ||
for stage in job.stages: | ||
input_read_bytes += stage.metrics.input_metrics.bytes_read | ||
input_read_rows += stage.metrics.input_metrics.records_read | ||
output_bytes += stage.metrics.output_metrics.bytes_written | ||
output_rows += stage.metrics.output_metrics.records_written | ||
|
||
run_time_milliseconds += stage.metrics.executor_run_time_milliseconds | ||
cpu_time_nanoseconds += stage.metrics.executor_cpu_time_nanoseconds | ||
peak_memory_bytes += stage.metrics.peak_execution_memory_bytes | ||
memory_spilled_bytes += stage.metrics.memory_spilled_bytes | ||
disk_spilled_bytes += stage.metrics.disk_spilled_bytes | ||
result_size_bytes += stage.metrics.result_size_bytes | ||
|
||
# https://github.com/apache/spark/blob/v3.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473 | ||
input_file_count = ( | ||
_get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ) | ||
or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ) | ||
or 0 | ||
) | ||
input_raw_file_bytes = ( | ||
_get_bytes(execution.metrics, SparkSQLMetricNames.SIZE_OF_FILES_READ) | ||
or _get_bytes(execution.metrics, SparkSQLMetricNames.STATIC_SIZE_OF_FILES_READ) | ||
or 0 | ||
) | ||
input_read_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_PARTITIONS_READ) or 0 | ||
|
||
output_files = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_WRITTEN_FILES) or 0 | ||
output_dynamic_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_DYNAMIC_PART) or 0 | ||
|
||
return SparkCommandMetrics( | ||
input=SparkInputMetrics( | ||
read_rows=input_read_rows, | ||
read_files=input_file_count, | ||
read_bytes=input_read_bytes, | ||
raw_file_bytes=input_raw_file_bytes, | ||
read_partitions=input_read_partitions, | ||
), | ||
output=SparkOutputMetrics( | ||
written_rows=output_rows, | ||
written_bytes=output_bytes, | ||
created_files=output_files, | ||
created_partitions=output_dynamic_partitions, | ||
), | ||
driver=SparkDriverMetrics( | ||
in_memory_bytes=result_size_bytes, | ||
), | ||
executor=SparkExecutorMetrics( | ||
total_run_time=timedelta(milliseconds=run_time_milliseconds), | ||
total_cpu_time=timedelta(microseconds=cpu_time_nanoseconds / 1000), | ||
peak_memory_bytes=peak_memory_bytes, | ||
memory_spilled_bytes=memory_spilled_bytes, | ||
disk_spilled_bytes=disk_spilled_bytes, | ||
), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import os | ||
from pprint import pformat | ||
|
||
from humanize import naturalsize | ||
|
||
from onetl.impl import BaseModel | ||
|
||
|
||
class SparkInputMetrics(BaseModel): | ||
read_rows: int = 0 | ||
read_files: int = 0 | ||
read_partitions: int = 0 | ||
read_bytes: int = 0 | ||
raw_file_bytes: int = 0 | ||
|
||
@property | ||
def is_empty(self) -> bool: | ||
return not any([self.read_bytes, self.read_files, self.read_rows]) | ||
|
||
def update(self, other: SparkInputMetrics) -> SparkInputMetrics: | ||
self.read_rows += other.read_rows | ||
self.read_files += other.read_files | ||
self.read_partitions += other.read_partitions | ||
self.read_bytes += other.read_bytes | ||
self.raw_file_bytes += other.raw_file_bytes | ||
return self | ||
|
||
@property | ||
def details(self) -> str: | ||
if self.is_empty: | ||
return "No data" | ||
|
||
result = [] | ||
result.append(f"Read rows: {pformat(self.read_rows)}") | ||
|
||
if self.read_partitions: | ||
result.append(f"Read partitions: {pformat(self.read_partitions)}") | ||
|
||
if self.read_files: | ||
result.append(f"Read files: {pformat(self.read_files)}") | ||
|
||
if self.read_bytes: | ||
result.append(f"Read size: {naturalsize(self.read_bytes)}") | ||
|
||
if self.raw_file_bytes and self.read_bytes != self.raw_file_bytes: | ||
result.append(f"Raw files size: {naturalsize(self.raw_file_bytes)}") | ||
|
||
return os.linesep.join(result) | ||
|
||
def __str__(self): | ||
return self.details |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from onetl._metrics.listener.execution import ( | ||
SparkListenerExecution, | ||
SparkListenerExecutionStatus, | ||
SparkSQLMetricNames, | ||
) | ||
from onetl._metrics.listener.job import SparkListenerJob, SparkListenerJobStatus | ||
from onetl._metrics.listener.listener import SparkMetricsListener | ||
from onetl._metrics.listener.stage import SparkListenerStage, SparkListenerStageStatus | ||
from onetl._metrics.listener.task import ( | ||
SparkListenerTask, | ||
SparkListenerTaskMetrics, | ||
SparkListenerTaskStatus, | ||
) | ||
|
||
__all__ = [ | ||
"SparkListenerTask", | ||
"SparkListenerTaskStatus", | ||
"SparkListenerTaskMetrics", | ||
"SparkListenerStage", | ||
"SparkListenerStageStatus", | ||
"SparkListenerJob", | ||
"SparkListenerJobStatus", | ||
"SparkListenerExecution", | ||
"SparkListenerExecutionStatus", | ||
"SparkSQLMetricNames", | ||
"SparkMetricsListener", | ||
] |
Oops, something went wrong.