Skip to content

Commit

Permalink
[DOP-21408] Allow using etl-entities==2.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Nov 12, 2024
1 parent 91b83b1 commit eb3177f
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/+.dependency.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow using ``etl-entities==2.4.0``.
7 changes: 4 additions & 3 deletions onetl/connection/db_connection/kafka/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,14 @@ def get_min_max_values(
max_offsets[partition_id] = end_offset

log.info("|%s| Received min and max offset values for each partition.", self.__class__.__name__)
for partition_id in sorted(min_offsets.keys()):
partitions = sorted(set(min_offsets.keys() | max_offsets.keys()))
for partition_id in partitions:
log.debug(
"|%s| Partition %d: Min Offset = %d, Max Offset = %d",
self.__class__.__name__,
partition_id,
min_offsets[partition_id],
max_offsets[partition_id],
min_offsets.get(partition_id),
max_offsets.get(partition_id),
)

return min_offsets, max_offsets
Expand Down
2 changes: 1 addition & 1 deletion requirements/core.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
etl-entities>=2.2,<2.4
etl-entities>=2.2,<2.5
evacuator>=1.0,<1.1
frozendict
humanize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def test_kafka_strategy_incremental(
):
from pyspark.sql.functions import max as spark_max

hwm_type = KeyValueIntHWM
hwm_name = secrets.token_hex(5)
store = HWMStoreStackManager.get_current()

Expand Down Expand Up @@ -77,7 +76,7 @@ def test_kafka_strategy_incremental(

hwm = store.get_hwm(hwm_name)
assert hwm is not None
assert isinstance(hwm, hwm_type)
assert isinstance(hwm, KeyValueIntHWM)

# HWM contains mapping `partition: max offset + 1`
partition_offsets_initial = dict.fromkeys(range(num_partitions or 1), 0)
Expand Down

0 comments on commit eb3177f

Please sign in to comment.