Skip to content

Commit

Permalink
Confluent-kafka consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Mátyás Kuti committed Nov 30, 2023
1 parent dd9e87e commit a434d3f
Show file tree
Hide file tree
Showing 21 changed files with 247 additions and 200 deletions.
25 changes: 11 additions & 14 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,22 @@
from concurrent.futures import Future
from enum import Enum
from functools import partial
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from karapace.kafka.admin import KafkaAdminClient, TopicPartition
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer, Message
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar
from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar

import contextlib
import datetime
Expand Down Expand Up @@ -282,9 +280,8 @@ def _consume_records(
consumer: KafkaConsumer,
topic_partition: TopicPartition,
poll_timeout: PollTimeout,
) -> Iterator[ConsumerRecord]:
start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition]
end_offset: int = consumer.end_offsets([topic_partition])[topic_partition]
) -> Iterator[Message]:
start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
last_offset = start_offset

LOG.info(
Expand All @@ -301,12 +298,11 @@ def _consume_records(
end_offset -= 1 # high watermark to actual end offset

while True:
records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, [])
if len(records) == 0:
record: Message | None = consumer.poll(timeout=poll_timeout.seconds)
if record is None:
raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout)
for record in records:
yield record
last_offset = record.offset # pylint: disable=undefined-loop-variable
yield record
last_offset = record.offset()
if last_offset >= end_offset:
break

Expand Down Expand Up @@ -528,6 +524,7 @@ def create_backup(
with _consumer(config, topic_name) as consumer:
(partition,) = consumer.partitions_for_topic(topic_name)
topic_partition = TopicPartition(topic_name, partition)
consumer.assign([topic_partition])

try:
data_file = _write_partition(
Expand Down
18 changes: 9 additions & 9 deletions karapace/backup/backends/v3/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
from .writers import write_metadata, write_record
from dataclasses import dataclass
from kafka.consumer.fetcher import ConsumerRecord
from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic
from karapace.backup.backends.writer import BytesBackupWriter, StdOut
from karapace.backup.safe_writer import bytes_writer, staging_directory
from karapace.dataclasses import default_dataclass
from karapace.kafka.producer import Message
from karapace.utils import assert_never
from karapace.version import __version__
from pathlib import Path
Expand Down Expand Up @@ -334,9 +334,9 @@ def store_metadata(
def store_record(
self,
buffer: IO[bytes],
record: ConsumerRecord,
record: Message,
) -> None:
stats: Final = self._partition_stats[record.partition]
stats: Final = self._partition_stats[record.partition()]
checksum_checkpoint: Final = stats.get_checkpoint(
records_threshold=self._max_records_per_checkpoint,
bytes_threshold=self._max_bytes_per_checkpoint,
Expand All @@ -345,16 +345,16 @@ def store_record(
write_record(
buffer,
record=Record(
key=record.key,
value=record.value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers),
offset=record.offset,
timestamp=record.timestamp,
key=record.key(),
value=record.value(),
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []),
offset=record.offset(),
timestamp=record.timestamp()[1],
checksum_checkpoint=checksum_checkpoint,
),
running_checksum=stats.running_checksum,
)
stats.update(
bytes_offset=buffer.tell() - offset_start,
record_offset=record.offset,
record_offset=record.offset(),
)
8 changes: 4 additions & 4 deletions karapace/backup/backends/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"""
from __future__ import annotations

from kafka.consumer.fetcher import ConsumerRecord
from karapace.backup.safe_writer import bytes_writer, str_writer
from karapace.kafka.producer import Message
from pathlib import Path
from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar
from typing_extensions import TypeAlias
Expand Down Expand Up @@ -98,7 +98,7 @@ def store_metadata(
def store_record(
self,
buffer: IO[B],
record: ConsumerRecord,
record: Message,
) -> None:
"""
Called in order for each record read from a topic to be backed up. It's safe to
Expand Down Expand Up @@ -154,9 +154,9 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC):
def store_record(
self,
buffer: IO[str],
record: ConsumerRecord,
record: Message,
) -> None:
buffer.write(self.serialize_record(record.key, record.value))
buffer.write(self.serialize_record(record.key(), record.value()))

@staticmethod
@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion karapace/backup/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka.structs import TopicPartition
from karapace.backup.poll_timeout import PollTimeout
from karapace.kafka.admin import TopicPartition

__all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"]

Expand Down
5 changes: 5 additions & 0 deletions karapace/backup/poll_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ def __repr__(self) -> str:
def milliseconds(self) -> int:
"""Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding)."""
return self.__value // timedelta(milliseconds=1)

@cached_property
def seconds(self) -> float:
"""Returns this poll timeout in seconds."""
return self.__value / timedelta(seconds=1)
16 changes: 15 additions & 1 deletion karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

import logging
Expand Down Expand Up @@ -85,6 +85,13 @@ class KafkaClientParams(TypedDict, total=False):
ssl_certfile: str | None
ssl_keyfile: str | None
sasl_oauth_token_provider: TokenWithExpiryProvider
# Consumer-only
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
fetch_max_wait_ms: int
enable_auto_commit: bool
group_id: str
request_timeout_ms: int
session_timeout_ms: int


class _KafkaConfigMixin:
Expand Down Expand Up @@ -128,6 +135,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
"ssl.certificate.location": params.get("ssl_certfile"),
"ssl.key.location": params.get("ssl_keyfile"),
"error_cb": self._error_callback,
# Consumer-only
"auto.offset.reset": params.get("auto_offset_reset"),
"fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
"enable.auto.commit": params.get("enable_auto_commit"),
"request.timeout.ms": params.get("request_timeout_ms"),
"session.timeout.ms": params.get("session_timeout_ms"),
"group.id": params.get("group_id"),
}
config = {key: value for key, value in config.items() if value is not None}

Expand Down
40 changes: 40 additions & 0 deletions karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from confluent_kafka import Consumer
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaException
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from typing import Iterable
from typing_extensions import Unpack

import secrets


class KafkaConsumer(_KafkaConfigMixin, Consumer):
def __init__(
self,
topic: str,
bootstrap_servers: Iterable[str] | str,
verify_connection: bool = True,
**params: Unpack[KafkaClientParams],
) -> None:
# The `confluent_kafka.Consumer` does not allow for a missing group id
# if the client of this class does not provide one, we'll generate a
# unique group id to achieve the groupless behaviour
if "group_id" not in params:
params["group_id"] = self._create_group_id()

super().__init__(bootstrap_servers, verify_connection, **params)

self.subscribe([topic])

@staticmethod
def _create_group_id() -> str:
return f"karapace-{secrets.token_hex(6)}"

def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]:
"""Returns all partition metadata for the given topic."""
try:
return self.list_topics(topic).topics[topic].partitions
except KafkaException as exc:
raise_from_kafkaexception(exc)
4 changes: 1 addition & 3 deletions karapace/kafka_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
See LICENSE for details
"""
from .config import Config
from .utils import KarapaceKafkaClient
from kafka import KafkaConsumer
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from typing import Iterator

Expand Down Expand Up @@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
sasl_plain_password=config["sasl_plain_password"],
auto_offset_reset="earliest",
metadata_max_age_ms=config["metadata_max_age_ms"],
kafka_client=KarapaceKafkaClient,
)
try:
yield consumer
Expand Down
3 changes: 1 addition & 2 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
See LICENSE for details
"""
from dataclasses import dataclass
from kafka import KafkaConsumer
from kafka.coordinator.base import BaseCoordinator
from kafka.errors import NoBrokersAvailable, NodeNotReadyError
from kafka.metrics import MetricConfig, Metrics
Expand Down Expand Up @@ -238,7 +237,7 @@ def init_schema_coordinator(self) -> None:
election_strategy=self.config.get("master_election_strategy", "lowest"),
group_id=self.config["group_id"],
session_timeout_ms=session_timeout_ms,
request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]),
request_timeout_ms=max(session_timeout_ms, 30000),
)
self.schema_coordinator_ready.set()

Expand Down
Loading

0 comments on commit a434d3f

Please sign in to comment.