From eb3177fa25b629f7e08a856b6056edd0b093aadd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 12 Nov 2024 14:14:19 +0000 Subject: [PATCH] [DOP-21408] Allow using etl-entities==2.4.0 --- docs/changelog/next_release/+.dependency.rst | 1 + onetl/connection/db_connection/kafka/connection.py | 7 ++++--- requirements/core.txt | 2 +- .../test_strategy_increment_kafka.py | 3 +-- 4 files changed, 7 insertions(+), 6 deletions(-) create mode 100644 docs/changelog/next_release/+.dependency.rst diff --git a/docs/changelog/next_release/+.dependency.rst b/docs/changelog/next_release/+.dependency.rst new file mode 100644 index 000000000..76811d78a --- /dev/null +++ b/docs/changelog/next_release/+.dependency.rst @@ -0,0 +1 @@ +Allow using ``etl-entities==2.4.0``. diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 71fa82bd3..7765936c5 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -527,13 +527,14 @@ def get_min_max_values( max_offsets[partition_id] = end_offset log.info("|%s| Received min and max offset values for each partition.", self.__class__.__name__) - for partition_id in sorted(min_offsets.keys()): + partitions = sorted(set(min_offsets.keys() | max_offsets.keys())) + for partition_id in partitions: log.debug( "|%s| Partition %d: Min Offset = %d, Max Offset = %d", self.__class__.__name__, partition_id, - min_offsets[partition_id], - max_offsets[partition_id], + min_offsets.get(partition_id), + max_offsets.get(partition_id), ) return min_offsets, max_offsets diff --git a/requirements/core.txt b/requirements/core.txt index c8e245a4a..5d915ebea 100644 --- a/requirements/core.txt +++ b/requirements/core.txt @@ -1,4 +1,4 @@ -etl-entities>=2.2,<2.4 +etl-entities>=2.2,<2.5 evacuator>=1.0,<1.1 frozendict humanize diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py index 7ae146cca..d506394d0 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py @@ -28,7 +28,6 @@ def test_kafka_strategy_incremental( ): from pyspark.sql.functions import max as spark_max - hwm_type = KeyValueIntHWM hwm_name = secrets.token_hex(5) store = HWMStoreStackManager.get_current() @@ -77,7 +76,7 @@ def test_kafka_strategy_incremental( hwm = store.get_hwm(hwm_name) assert hwm is not None - assert isinstance(hwm, hwm_type) + assert isinstance(hwm, KeyValueIntHWM) # HWM contains mapping `partition: max offset + 1` partition_offsets_initial = dict.fromkeys(range(num_partitions or 1), 0)