-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DOP-18570] Collect Spark metrics in DBWriter and FileDFWriter
- Loading branch information
Showing
25 changed files
with
1,250 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from onetl.db.db_writer.db_writer import DBWriter | ||
from onetl.db.db_writer.result import DBWriterResult |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import os | ||
import textwrap | ||
|
||
from onetl.impl import BaseModel | ||
from onetl.metrics.metrics import SparkMetrics | ||
|
||
# One indentation level (four spaces); used to nest the metrics details
# under the "Metrics:" header.
INDENT = 4 * " "
|
||
|
||
class DBWriterResult(BaseModel):
    """
    Representation of DBWriter result.

    .. versionadded:: 0.12.0

    Examples
    --------

    >>> from onetl.db import DBWriter
    >>> writer = DBWriter(...)
    >>> result = writer.run(df)
    >>> result
    DBWriterResult(
        metrics=SparkMetrics(
            input=SparkInputMetrics(
                read_rows=1_000,
                read_files=10,
                read_bytes=1_000_000,
                scan_bytes=2_000_000,
                read_partitions=3,
                dynamic_partition_pruning=True,
            ),
            output=SparkOutputMetrics(
                written_rows=1_000,
                written_bytes=1_000_000,
                created_files=10,
                created_dynamic_partitions=1,
            ),
            executor=SparkExecutorMetrics(
                run_time_milliseconds=1_000,
                cpu_time_nanoseconds=2_000_000_000,
                peak_memory_bytes=1_000_000_000,
            ),
        )
    )
    """

    # Metrics collected while writing. Defaults to an empty ``SparkMetrics`` so
    # that ``DBWriterResult()`` (used in the ``details`` example below) is valid.
    # NOTE(review): assumes ``SparkMetrics()`` needs no arguments - confirm.
    metrics: SparkMetrics = SparkMetrics()

    @property
    def details(self) -> str:
        """
        Return summarized information about the result object.

        Returns
        -------
        str
            ``"Metrics: No data"`` if ``metrics.is_empty`` is true, otherwise
            a ``"Metrics:"`` header followed by ``metrics.details`` indented
            by one level.

        Examples
        --------

        >>> from onetl.db.db_writer.result import DBWriterResult
        >>> from onetl.metrics import SparkMetrics, SparkOutputMetrics, SparkInputMetrics, SparkExecutorMetrics
        >>> result1 = DBWriterResult(
        ...     metrics=SparkMetrics(
        ...         input=SparkInputMetrics(
        ...             read_rows=1_000,
        ...             read_files=10,
        ...             read_bytes=1_000_000,
        ...             scan_bytes=2_000_000,
        ...             read_partitions=3,
        ...             dynamic_partition_pruning=True,
        ...         ),
        ...         output=SparkOutputMetrics(
        ...             written_rows=1_000,
        ...             written_bytes=1_000_000,
        ...             created_files=10,
        ...             created_dynamic_partitions=1,
        ...         ),
        ...         executor=SparkExecutorMetrics(
        ...             run_time_milliseconds=1_000,
        ...             cpu_time_nanoseconds=2_000_000_000,
        ...             peak_memory_bytes=1_000_000_000,
        ...         ),
        ...     )
        ... )
        >>> print(result1.details)
        Metrics:
            Input:
                Read rows: 1000
                Read files: 10
                Read size: 1.0 MB
                Scan size: 2.0 MB
                Dynamic partition pruning: True
                Read partitions: 3
            Output:
                Written rows: 1000
                Written size: 1.0 MB
                Created files: 10
                Created dynamic partitions: 1
            Executor:
                Run time: 1.0 ms
                CPU time: 2.0 ms
                Peak memory: 1.0 MB
        >>> result2 = DBWriterResult()
        >>> print(result2.details)
        Metrics: No data
        """
        # Nothing was collected: emit a single-line placeholder instead of an
        # empty nested section.
        if self.metrics.is_empty:
            return "Metrics: No data"

        return "Metrics:" + os.linesep + textwrap.indent(self.metrics.details, INDENT)

    def __str__(self) -> str:
        """Same as :obj:`onetl.db.db_writer.result.DBWriterResult.details`"""
        return self.details
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from onetl.metrics.collector import SparkMetricsCollector | ||
from onetl.metrics.executor import SparkExecutorMetrics | ||
from onetl.metrics.input import SparkInputMetrics | ||
from onetl.metrics.metrics import SparkMetrics | ||
from onetl.metrics.output import SparkOutputMetrics | ||
|
||
# Names re-exported by this package; each one is imported above from its
# defining submodule under ``onetl.metrics``.
__all__ = [
    "SparkMetrics",
    "SparkMetricsCollector",
    "SparkExecutorMetrics",
    "SparkInputMetrics",
    "SparkOutputMetrics",
]
Oops, something went wrong.