diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 25258b61..0ad90043 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -2,6 +2,7 @@ import pytest +import ydb from ydb import issues @@ -56,6 +57,39 @@ async def test_alter_existed_topic(self, driver, topic_path): topic_after = await client.describe_topic(topic_path) assert topic_after.min_active_partitions == target_min_active_partitions + async def test_alter_auto_partitioning_settings(self, driver, topic_path): + client = driver.topic_client + + topic_before = await client.describe_topic(topic_path) + + expected = topic_before.auto_partitioning_settings + + expected.strategy = ydb.TopicAutoPartitioningStrategy.SCALE_UP + + await client.alter_topic( + topic_path, + alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings( + set_strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP, + ) + ) + + topic_after = await client.describe_topic(topic_path) + + assert topic_after.auto_partitioning_settings == expected + + expected.up_utilization_percent = 88 + + await client.alter_topic( + topic_path, + alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings( + set_up_utilization_percent=88, + ) + ) + + topic_after = await client.describe_topic(topic_path) + + assert topic_after.auto_partitioning_settings == expected + class TestTopicClientControlPlane: def test_create_topic(self, driver_sync, database): diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 570bbb75..4e23d370 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -1017,7 +1017,7 @@ def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]: def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy: try: - ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) + return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) except KeyError: return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED @@ -1183,7 +1183,7 @@ def from_proto(code: Optional[int]) -> Optional["MeteringMode"]: def to_public(self) -> ydb_topic_public_types.PublicMeteringMode: try: - ydb_topic_public_types.PublicMeteringMode(int(self)) + return ydb_topic_public_types.PublicMeteringMode(int(self)) except KeyError: return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index 58e1a05b..37528a2f 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -82,18 +82,18 @@ class PublicAutoPartitioningStrategy(IntEnum): @dataclass class PublicAutoPartitioningSettings: - strategy: Optional["PublicAutoPartitioningStrategy"] = 0 + strategy: Optional["PublicAutoPartitioningStrategy"] = None stabilization_window: Optional[datetime.timedelta] = None - up_utilization_percent: Optional[int] = None down_utilization_percent: Optional[int] = None + up_utilization_percent: Optional[int] = None @dataclass class PublicAlterAutoPartitioningSettings: - set_strategy: Optional["PublicAutoPartitioningStrategy"] = 0 + set_strategy: Optional["PublicAutoPartitioningStrategy"] = None set_stabilization_window: Optional[datetime.timedelta] = None - set_up_utilization_percent: Optional[int] = None set_down_utilization_percent: Optional[int] = None + set_up_utilization_percent: Optional[int] = None @dataclass diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index b42c1db2..76461959 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -335,6 +335,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._started = True self._stream = stream + print(init_message) + stream.write(StreamReadMessage.FromClient(client_message=init_message)) init_response = await stream.receive() # type: StreamReadMessage.FromServer if isinstance(init_response.server_message, StreamReadMessage.InitResponse): @@ -586,6 +588,10 @@ def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSessi f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" ) + print( + f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" + ) + def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) diff --git a/ydb/topic.py b/ydb/topic.py index 042d5fe9..f0607aca 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -7,6 +7,9 @@ "TopicCodec", "TopicConsumer", "TopicAlterConsumer", + "TopicAlterAutoPartitioningSettings", + "TopicAutoPartitioningSettings", + "TopicAutoPartitioningStrategy", "TopicDescription", "TopicError", "TopicMeteringMode",