diff --git a/karapace/backup/api.py b/karapace/backup/api.py
index 0d9248fbd..10ae7d6c1 100644
--- a/karapace/backup/api.py
+++ b/karapace/backup/api.py
@@ -530,7 +530,6 @@ def create_backup(
         with _consumer(config, topic_name) as consumer:
             (partition,) = consumer.partitions_for_topic(topic_name)
             topic_partition = TopicPartition(topic_name, partition)
-            consumer.assign([topic_partition])
 
             try:
                 data_file = _write_partition(
diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py
index 054be6c7a..92db2aaa2 100644
--- a/tests/integration/backup/test_v3_backup.py
+++ b/tests/integration/backup/test_v3_backup.py
@@ -9,7 +9,8 @@
 from dataclasses import fields
 from kafka.errors import UnknownTopicOrPartitionError
 from karapace.backup import api
-from karapace.backup.api import _consume_records, TopicName
+from karapace.backup.api import _consume_records, BackupVersion, TopicName
+from karapace.backup.backends.v3.errors import InconsistentOffset
 from karapace.backup.backends.v3.readers import read_metadata
 from karapace.backup.backends.v3.schema import Metadata
 from karapace.backup.errors import BackupDataRestorationError, EmptyPartition
@@ -17,6 +18,7 @@
 from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations
 from karapace.config import Config, set_config_defaults
 from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.kafka.producer import KafkaProducer
 from karapace.kafka.types import Timestamp
 from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config
@@ -36,6 +38,7 @@
 import shutil
 import subprocess
 import textwrap
+import time
 
 
 logger = logging.getLogger(__name__)
@@ -1043,3 +1046,75 @@ def test_gives_non_successful_exit_code_for_legacy_backup_format(
     assert cp.returncode == 1
     assert cp.stderr.decode() == error_message
     assert cp.stdout == b""
+
+
+def test_backup_creation_succeeds_no_duplicate_offsets(
+    kafka_servers: KafkaServers,
+    producer: KafkaProducer,
+    new_topic: NewTopic,
+    tmp_path: Path,
+) -> None:
+    """This test was added to prevent a regression scenario of duplicate offsets.
+
+    After introducing the confluent-kafka based consumer in the backups code,
+    backing up large topics would result in duplicate offsets, which would fail
+    the backup creation. The original code called `assign(TopicPartition(...))`
+    after `subscribe`, resulting in an eventual reset of the consumer.
+
+    The issue occurred only for large topics, as these had a sufficient amount
+    of data for the consumer to reset before the backup was completed.
+
+    In this test a "large" topic is simulated by slowing down the consumer.
+    """
+    for i in range(1000):
+        producer.send(
+            new_topic.topic,
+            key=f"message-key-{i}",
+            value=f"message-value-{i}-" + 1000 * "X",
+        )
+    producer.flush()
+
+    backup_location = tmp_path / "fails.log"
+    config = set_config_defaults(
+        {
+            "bootstrap_uri": kafka_servers.bootstrap_servers,
+            "topic_name": new_topic.topic,
+        }
+    )
+
+    class SlowConsumer(KafkaConsumer):
+        def poll(self, *args, **kwargs):
+            # Slow down the consumer so more time is given for `assign` to kick in.
+            # This simulates a backup of a _large_ topic.
+            time.sleep(0.01)
+            return super().poll(*args, **kwargs)
+
+    class ConsumerContext:
+        def __init__(self):
+            self._consumer = SlowConsumer(
+                bootstrap_servers=kafka_servers.bootstrap_servers,
+                topic=new_topic.topic,
+                enable_auto_commit=False,
+                auto_offset_reset="earliest",
+            )
+            # Compare with `test_backup_creation_fails_with_consumer_subscribe_assign`:
+            # we do not call `assign` on the consumer here.
+
+        def __enter__(self):
+            return self._consumer
+
+        def __exit__(self, exc_type, exc_value, exc_traceback):
+            self._consumer.close()
+
+    with patch("karapace.backup.api._consumer") as consumer_patch:
+        consumer_patch.return_value = ConsumerContext()
+        try:
+            api.create_backup(
+                config=config,
+                backup_location=backup_location,
+                topic_name=api.normalize_topic_name(None, config),
+                version=BackupVersion.V3,
+                replication_factor=1,
+            )
+        except InconsistentOffset as exc:
+            pytest.fail(f"Unexpected InconsistentOffset error {exc}")