Skip to content

Commit

Permalink
Remove consumer assignment for backups
Browse files Browse the repository at this point in the history
When instantiating the consumer we already subscribe to the topic that
is to be backed up (i.e. __schemas). Using assign on top of that causes
problems - once the assignment actually takes effect, the consumer
starts to consume from the beginning, ultimately resulting in an
`InconsistentOffset` error.

The default subscribe is enough at the moment, as we do not support
backing up a multi-partition topic.
  • Loading branch information
Mátyás Kuti committed Jan 10, 2024
1 parent 7fde318 commit 480822a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
1 change: 0 additions & 1 deletion karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,6 @@ def create_backup(
with _consumer(config, topic_name) as consumer:
(partition,) = consumer.partitions_for_topic(topic_name)
topic_partition = TopicPartition(topic_name, partition)
consumer.assign([topic_partition])

try:
data_file = _write_partition(
Expand Down
44 changes: 44 additions & 0 deletions tests/integration/kafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from tests.integration.utils.kafka_server import KafkaServers

import pytest
import time


class TestPartitionsForTopic:
Expand Down Expand Up @@ -77,3 +78,46 @@ def test_get_watermark_offsets_topic_with_one_message(

assert beginning == 0
assert end == 1


def test_assign_after_subscribe_consumes_from_beginning(
    producer: KafkaProducer,
    kafka_servers: KafkaServers,
    new_topic: NewTopic,
) -> None:
    """This test is an example of how **not** to use a consumer.

    Instantiating the consumer subscribes to the given topic. However, having
    assign called afterwards with a `TopicPartition` with the default offset
    will make the consumer (eventually) reset and start consuming from the
    beginning, which manifests as a message whose offset is not greater than
    the previously seen one.
    """
    # Produce enough reasonably large messages that the consumer is still
    # mid-stream when the late `assign` takes effect and resets the position.
    number_of_messages = 1000
    for i in range(number_of_messages):
        producer.send(
            new_topic.topic,
            key=f"message-key-{i}",
            value=f"message-value-{i}-" + 1000 * "X",
        )
    producer.flush()

    consumer = KafkaConsumer(
        bootstrap_servers=kafka_servers.bootstrap_servers,
        topic=new_topic.topic,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    # The anti-pattern under test: assign after the constructor has already
    # subscribed to the topic.
    consumer.assign([TopicPartition(new_topic.topic, partition=0)])

    out_of_order_message_seen = False
    last_offset = -1

    for _ in range(number_of_messages):
        time.sleep(0.01)  # To simulate processing of message
        message = consumer.poll(60)
        # Guard against a poll timeout: `poll` returns None when no record
        # arrives within the timeout, and calling `.offset()` on None would
        # fail with an unhelpful AttributeError instead of a clear message.
        assert message is not None, "poll timed out before a reset was observed"
        out_of_order_message_seen = message.offset() <= last_offset
        if out_of_order_message_seen:
            break
        last_offset = message.offset()

    assert out_of_order_message_seen

0 comments on commit 480822a

Please sign in to comment.