Remove consumer assignment for backups
When instantiating the consumer we already subscribe to the topic that
is to be backed up (i.e. __schemas). Calling assign on top of that causes
problems: once the assignment actually takes effect, the consumer starts
to consume from the beginning again, ultimately resulting in an
`InconsistentOffset` error.

The default subscribe is enough at the moment, as we do not support
backing up a multi-partition topic.
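
To illustrate the conflicting call pattern, here is a minimal sketch written
against the plain confluent-kafka Consumer API rather than karapace's wrapper
(the broker address, group id, and partition number are placeholders):

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # placeholder broker
            "group.id": "schema-backup-example",  # placeholder group id
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
        }
    )

    # Group-managed subscription: the broker assigns the partitions.
    consumer.subscribe(["__schemas"])

    # Manual assignment layered on top of the subscription. Per the commit
    # message above, once this takes effect the consumer's position is
    # re-resolved (here to "earliest"), so already-consumed records are
    # delivered again and the backup writer raises InconsistentOffset.
    consumer.assign([TopicPartition("__schemas", 0)])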
Mátyás Kuti committed Jan 11, 2024
1 parent 7fde318 commit a30608f
Showing 2 changed files with 76 additions and 2 deletions.
1 change: 0 additions & 1 deletion karapace/backup/api.py
@@ -530,7 +530,6 @@ def create_backup(
     with _consumer(config, topic_name) as consumer:
         (partition,) = consumer.partitions_for_topic(topic_name)
         topic_partition = TopicPartition(topic_name, partition)
-        consumer.assign([topic_partition])
 
         try:
             data_file = _write_partition(
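
As an aside, the `(partition,) = consumer.partitions_for_topic(topic_name)`
line kept above doubles as the single-partition guard the commit message
mentions; a tiny illustrative snippet of the unpacking idiom (the set literal
stands in for the real return value):

    partitions = {0}  # e.g. what partitions_for_topic("__schemas") returns
    (partition,) = partitions  # ValueError unless there is exactly one partition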
77 changes: 76 additions & 1 deletion tests/integration/backup/test_v3_backup.py
@@ -9,14 +9,16 @@
 from dataclasses import fields
 from kafka.errors import UnknownTopicOrPartitionError
 from karapace.backup import api
-from karapace.backup.api import _consume_records, TopicName
+from karapace.backup.api import _consume_records, BackupVersion, TopicName
+from karapace.backup.backends.v3.errors import InconsistentOffset
 from karapace.backup.backends.v3.readers import read_metadata
 from karapace.backup.backends.v3.schema import Metadata
 from karapace.backup.errors import BackupDataRestorationError, EmptyPartition
 from karapace.backup.poll_timeout import PollTimeout
 from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations
 from karapace.config import Config, set_config_defaults
 from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.kafka.producer import KafkaProducer
 from karapace.kafka.types import Timestamp
 from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config
@@ -36,6 +38,7 @@
 import shutil
 import subprocess
 import textwrap
+import time
 
 logger = logging.getLogger(__name__)

@@ -1043,3 +1046,75 @@ def test_gives_non_successful_exit_code_for_legacy_backup_format(
     assert cp.returncode == 1
     assert cp.stderr.decode() == error_message
     assert cp.stdout == b""
+
+
+def test_backup_creation_succeeds_no_duplicate_offsets(
+    kafka_servers: KafkaServers,
+    producer: KafkaProducer,
+    new_topic: NewTopic,
+    tmp_path: Path,
+) -> None:
+    """This test was added to prevent a regression scenario of duplicate offsets.
+
+    After introducing the confluent-kafka based consumer in the backup code,
+    backing up large topics would result in duplicate offsets, which would fail
+    the backup creation. The original code called `assign(TopicPartition(...))`
+    after `subscribe`, resulting in an eventual reset of the consumer.
+    The issue occurred only for large topics, as these had a sufficient amount
+    of data for the consumer to reset before the backup was completed.
+    In this test a "large" topic is simulated by slowing down the consumer.
+    """
+    for i in range(1000):
+        producer.send(
+            new_topic.topic,
+            key=f"message-key-{i}",
+            value=f"message-value-{i}-" + 1000 * "X",
+        )
+    producer.flush()
+
+    backup_location = tmp_path / "fails.log"
+    config = set_config_defaults(
+        {
+            "bootstrap_uri": kafka_servers.bootstrap_servers,
+            "topic_name": new_topic.topic,
+        }
+    )
+
+    class SlowConsumer(KafkaConsumer):
+        def poll(self, *args, **kwargs):
+            # Slow down the consumer so more time is given for `assign` to
+            # kick in. This simulates a backup of a _large_ topic.
+            time.sleep(0.01)
+            return super().poll(*args, **kwargs)
+
+    class ConsumerContext:
+        def __init__(self):
+            self._consumer = SlowConsumer(
+                bootstrap_servers=kafka_servers.bootstrap_servers,
+                topic=new_topic.topic,
+                enable_auto_commit=False,
+                auto_offset_reset="earliest",
+            )
+            # Compare with `test_backup_creation_fails_with_consumer_subscribe_assign`:
+            # we do not call `assign` on the consumer here.
+
+        def __enter__(self):
+            return self._consumer
+
+        def __exit__(self, exc_type, exc_value, exc_traceback):
+            self._consumer.close()
+
+    with patch("karapace.backup.api._consumer") as consumer_patch:
+        consumer_patch.return_value = ConsumerContext()
+        try:
+            api.create_backup(
+                config=config,
+                backup_location=backup_location,
+                topic_name=api.normalize_topic_name(None, config),
+                version=BackupVersion.V3,
+                replication_factor=1,
+            )
+        except InconsistentOffset as exc:
+            pytest.fail(f"Unexpected InconsistentOffset error {exc}")
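
For reference, assuming a typical pytest invocation for this repository's
integration suite (the test relies on the Kafka-backed fixtures in its
signature, so a running Kafka setup is required), the new test could be run
on its own with something like:

    pytest tests/integration/backup/test_v3_backup.py::test_backup_creation_succeeds_no_duplicate_offsets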
