From 480822a69723b9bc165b8ca89178a0a68df16706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Wed, 10 Jan 2024 15:06:49 +0100 Subject: [PATCH] Remove consumer assignment for backups When instantiating the consumer we already subscribe to the topic that is to be backed up (i.e. __schemas). Using assign on top of that causes problems - once the assignment actually takes effect, the consumer starts to consume from the beginning, ultimately resulting in an `InconsistentOffset` error. The default subscribe is enough at the moment, as we do not support backing up a multi-partition topic. --- karapace/backup/api.py | 1 - tests/integration/kafka/test_consumer.py | 44 ++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 0d9248fbd..10ae7d6c1 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -530,7 +530,6 @@ def create_backup( with _consumer(config, topic_name) as consumer: (partition,) = consumer.partitions_for_topic(topic_name) topic_partition = TopicPartition(topic_name, partition) - consumer.assign([topic_partition]) try: data_file = _write_partition( diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py index 21c547415..e009a8031 100644 --- a/tests/integration/kafka/test_consumer.py +++ b/tests/integration/kafka/test_consumer.py @@ -12,6 +12,7 @@ from tests.integration.utils.kafka_server import KafkaServers import pytest +import time class TestPartitionsForTopic: @@ -77,3 +78,46 @@ def test_get_watermark_offsets_topic_with_one_message( assert beginning == 0 assert end == 1 + + +def test_assign_after_subscribe_consumes_from_beginning( + producer: KafkaProducer, + kafka_servers: KafkaServers, + new_topic: NewTopic, +) -> None: + """This test is an example of how **not** to use a consumer. + + Instantiating the consumer subscribes to the given topic. 
However, having + assign called afterwards with a `TopicPartition` with the default offset + will make the consumer (eventually) reset and start consuming from the + beginning. + """ + number_of_messages = 1000 + for i in range(number_of_messages): + producer.send( + new_topic.topic, + key=f"message-key-{i}", + value=f"message-value-{i}-" + 1000 * "X", + ) + producer.flush() + + consumer = KafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + topic=new_topic.topic, + enable_auto_commit=False, + auto_offset_reset="earliest", + ) + consumer.assign([TopicPartition(new_topic.topic, partition=0)]) + + out_of_order_message_seen = False + last_offset = -1 + + for i in range(number_of_messages): + time.sleep(0.01) # To simulate processing of message + message = consumer.poll(60) + out_of_order_message_seen = message.offset() <= last_offset + if out_of_order_message_seen: + break + last_offset = message.offset() + + assert out_of_order_message_seen