diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py index 0ca678e16..4941e2871 100644 --- a/karapace/kafka/common.py +++ b/karapace/kafka/common.py @@ -8,7 +8,14 @@ from collections.abc import Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException -from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError +from kafka.errors import ( + AuthenticationFailedError, + for_code, + IllegalStateError, + KafkaTimeoutError, + NoBrokersAvailable, + UnknownTopicOrPartitionError, +) from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack @@ -47,6 +54,10 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception: KafkaError._UNKNOWN_TOPIC, # pylint: disable=protected-access ): return UnknownTopicOrPartitionError() + if code == KafkaError._TIMED_OUT: # pylint: disable=protected-access + return KafkaTimeoutError() + if code == KafkaError._STATE: # pylint: disable=protected-access + return IllegalStateError() return for_code(code) @@ -89,12 +100,15 @@ class KafkaClientParams(TypedDict, total=False): ssl_crlfile: str | None ssl_keyfile: str | None sasl_oauth_token_provider: TokenWithExpiryProvider + topic_metadata_refresh_interval_ms: int | None # Consumer-only - auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] - enable_auto_commit: bool - fetch_max_wait_ms: int - group_id: str - session_timeout_ms: int + auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] | None + enable_auto_commit: bool | None + fetch_min_bytes: int | None + fetch_max_bytes: int | None + fetch_max_wait_ms: int | None + group_id: str | None + session_timeout_ms: int | None class _KafkaConfigMixin: @@ -142,10 +156,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para "ssl.certificate.location": params.get("ssl_certfile"), "ssl.crl.location": params.get("ssl_crlfile"), "ssl.key.location": params.get("ssl_keyfile"), + "topic.metadata.refresh.interval.ms": params.get("topic_metadata_refresh_interval_ms"), "error_cb": self._error_callback, # Consumer-only "auto.offset.reset": params.get("auto_offset_reset"), "enable.auto.commit": params.get("enable_auto_commit"), + "fetch.min.bytes": params.get("fetch_min_bytes"), + "fetch.max.bytes": params.get("fetch_max_bytes"), "fetch.wait.max.ms": params.get("fetch_max_wait_ms"), "group.id": params.get("group_id"), "session.timeout.ms": params.get("session_timeout_ms"), diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py index d7315b378..745433bee 100644 --- a/karapace/kafka/consumer.py +++ b/karapace/kafka/consumer.py @@ -5,14 +5,15 @@ from __future__ import annotations -from confluent_kafka import Consumer, TopicPartition +from confluent_kafka import Consumer, Message, TopicPartition from confluent_kafka.admin import PartitionMetadata from confluent_kafka.error import KafkaException -from kafka.errors import KafkaTimeoutError +from kafka.errors import IllegalStateError, KafkaTimeoutError from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception -from typing import Iterable +from typing import Any, Callable, Iterable, TypeVar from typing_extensions import Unpack +import asyncio import secrets @@ -32,6 +33,7 @@ def __init__( super().__init__(bootstrap_servers, verify_connection, **params) + self._subscription: frozenset[str] = 
frozenset() if topic is not None: self.subscribe([topic]) @@ -67,3 +69,165 @@ def get_watermark_offsets( return result except KafkaException as exc: raise_from_kafkaexception(exc) + + def commit( + self, + message: Message | None = None, + offsets: list[TopicPartition] | None = None, + _asynchronous: bool = False, + ) -> list[TopicPartition] | None: + """Commit offsets based on a message or offsets (topic partitions). + + The `message` and `offsets` parameters are mutually exclusive; providing + both raises a `ValueError`. + """ + if message is not None and offsets is not None: + raise ValueError("Parameters message and offsets are mutually exclusive.") + + try: + if message is not None: + return super().commit(message=message, asynchronous=False) + + return super().commit(offsets=offsets, asynchronous=False) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]: + try: + if timeout is not None: + return super().committed(partitions, timeout) + + return super().committed(partitions) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def subscribe( # type: ignore[override] + self, + topics: list[str] | None = None, + patterns: list[str] | None = None, + ) -> None: + """Subscribe to a list of topics and/or topic patterns. + + Subscriptions are not incremental. + `Consumer.subscribe` requires topic patterns to start with "^", e.g. + "^this-is-a-regex-[0-9]", so every string in the `patterns` list is + prefixed with "^". + + The `on_assign` and `on_revoke` callbacks are set to keep track of the + subscribed topics. + + See the confluent-kafka documentation for details: + https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.subscribe + """ + topics = topics or [] + patterns = patterns or [] + self.log.info("Subscribing to topics %s and patterns %s", topics, patterns) + try: + super().subscribe( + topics + [f"^{pattern}" for pattern in patterns], + on_assign=self._on_assign, + on_revoke=self._on_revoke, + ) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def _on_assign(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None: + topics = frozenset(partition.topic for partition in partitions) + self._subscription = self._subscription.union(topics) + + def _on_revoke(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None: + topics = frozenset(partition.topic for partition in partitions) + self._subscription = self._subscription.difference(topics) + + def subscription(self) -> frozenset[str]: + """Returns the set of topic names the consumer is subscribed to. + + The topic set is maintained by the `_on_assign` and `_on_revoke` callback + methods, which are registered in `subscribe`. Note that these callbacks + are only invoked when `poll` is called. + """ + return self._subscription + + def unsubscribe(self) -> None: + try: + super().unsubscribe() + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def assign(self, partitions: list[TopicPartition]) -> None: + """Assign a list of topic partitions to the consumer. + + Raises an `IllegalStateError` if `subscribe` has previously been called. + This matches the previous behaviour of `aiokafka`.
+ """ + if self._subscription: + raise IllegalStateError + + try: + super().assign(partitions) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def assignment(self) -> list[TopicPartition]: + try: + return super().assignment() + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def seek(self, partition: TopicPartition) -> None: + try: + super().seek(partition) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + +T = TypeVar("T") + + +class AIOKafkaConsumer: + def __init__( + self, + bootstrap_servers: Iterable[str] | str, + topic: str | None = None, + loop: asyncio.AbstractEventLoop | None = None, + **params: Unpack[KafkaClientParams], + ) -> None: + self.loop = loop or asyncio.get_running_loop() + self.consumer = KafkaConsumer(bootstrap_servers, topic=topic, **params) + + async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T: + return await self.loop.run_in_executor(None, func, *args) + + async def poll(self, timeout: float) -> Message | None: + return await self._run_in_executor(self.consumer.poll, timeout) + + async def commit( + self, + message: Message | None = None, + offsets: list[TopicPartition] | None = None, + ) -> list[TopicPartition] | None: + return await self._run_in_executor(self.consumer.commit, message, offsets) + + async def close(self) -> None: + return await self._run_in_executor(self.consumer.close) + + async def subscribe(self, topics: list[str] | None = None, patterns: list[str] | None = None) -> None: + return await self._run_in_executor(self.consumer.subscribe, topics, patterns) + + async def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]: + return await self._run_in_executor(self.consumer.committed, partitions, timeout) + + def subscription(self) -> frozenset[str]: + return self.consumer.subscription() + + async def unsubscribe(self) -> None: + return await self._run_in_executor(self.consumer.unsubscribe) + + async def assign(self, partitions: list[TopicPartition]) -> None: + return await self._run_in_executor(self.consumer.assign, partitions) + + async def assignment(self) -> list[TopicPartition]: + return await self._run_in_executor(self.consumer.assignment) + + async def seek(self, partition: TopicPartition) -> None: + return await self._run_in_executor(self.consumer.seek, partition) diff --git a/karapace/kafka/types.py b/karapace/kafka/types.py index a844e74ba..ae1e7b476 100644 --- a/karapace/kafka/types.py +++ b/karapace/kafka/types.py @@ -8,10 +8,15 @@ import enum -# A constant that corresponds to the default value of request.timeout.ms in -# the librdkafka C library +# request.timeout.ms default from librdkafka DEFAULT_REQUEST_TIMEOUT_MS: Final = 30000 +# fetch.wait.max.ms default from librdkafka +FETCH_WAIT_MAX_MS: Final = 500 + +# fetch.max.bytes default from librdkafka +FETCH_MAX_BYTES: Final = 52428800 + class Timestamp(enum.IntEnum): NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index 26b927b8b..fd07aa502 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -2,20 +2,25 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from aiokafka import AIOKafkaConsumer from asyncio import Lock from collections import defaultdict, namedtuple +from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition from functools import partial from http import HTTPStatus 
from kafka.errors import ( + AuthenticationFailedError, GroupAuthorizationFailedError, IllegalStateError, KafkaConfigurationError, KafkaError, + NoBrokersAvailable, TopicAuthorizationFailedError, + UnknownTopicOrPartitionError, ) -from kafka.structs import TopicPartition -from karapace.config import Config, create_client_ssl_context +from karapace.config import Config +from karapace.kafka.common import translate_from_kafkaerror +from karapace.kafka.consumer import AIOKafkaConsumer +from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS, FETCH_MAX_BYTES, FETCH_WAIT_MAX_MS from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config from karapace.kafka_rest_apis.error_codes import RESTErrorCodes from karapace.karapace import empty_response, KarapaceBase @@ -198,35 +203,34 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ KarapaceBase.r(content_type=content_type, body={"base_uri": consumer_base_uri, "instance_id": consumer_name}) async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data): - ssl_context = create_client_ssl_context(self.config) for retry in [True, True, False]: try: session_timeout_ms = self.config["session_timeout_ms"] request_timeout_ms = max( session_timeout_ms, - 305000, # Copy of old default from kafka-python's request_timeout_ms (not exposed by aiokafka) + DEFAULT_REQUEST_TIMEOUT_MS, request_data["consumer.request.timeout.ms"], ) - c = AIOKafkaConsumer( + return AIOKafkaConsumer( bootstrap_servers=self.config["bootstrap_uri"], + auto_offset_reset=request_data["auto.offset.reset"], client_id=internal_name, - security_protocol=self.config["security_protocol"], - ssl_context=ssl_context, - group_id=group_name, - fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values - fetch_max_bytes=self.config["consumer_request_max_bytes"], - fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms", 500), # Copy aiokafka default 500 ms - # This will cause delay if subscription is changed. 
- consumer_timeout_ms=self.config.get("consumer_timeout_ms", 200), # Copy aiokafka default 200 ms - request_timeout_ms=request_timeout_ms, enable_auto_commit=request_data["auto.commit.enable"], - auto_offset_reset=request_data["auto.offset.reset"], + fetch_max_bytes=self.config["consumer_request_max_bytes"], + fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms", FETCH_WAIT_MAX_MS), + fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values + group_id=group_name, + security_protocol=self.config["security_protocol"], session_timeout_ms=session_timeout_ms, + socket_timeout_ms=request_timeout_ms, + ssl_cafile=self.config["ssl_cafile"], + ssl_certfile=self.config["ssl_certfile"], + ssl_crlfile=self.config["ssl_crlfile"], + ssl_keyfile=self.config["ssl_keyfile"], + topic_metadata_refresh_interval_ms=request_data.get("topic.metadata.refresh.interval.ms"), **get_kafka_client_auth_parameters_from_config(self.config), ) - await c.start() - return c - except: # pylint: disable=bare-except + except (NoBrokersAvailable, AuthenticationFailedError): if retry: LOG.exception("Unable to create consumer, retrying") else: @@ -255,14 +259,14 @@ async def commit_offsets( self._assert_consumer_exists(internal_name, content_type) if request_data: self._assert_has_key(request_data, "offsets", content_type) - payload = {} + payload = [] for el in request_data.get("offsets", []): for k in ["partition", "offset"]: convert_to_int(el, k, content_type) # If we commit for a partition that does not belong to this consumer, then the internal error raised # is marked as retriable, and thus the commit method will remain blocked in what looks like an infinite loop self._topic_and_partition_valid(cluster_metadata, el, content_type) - payload[TopicPartition(el["topic"], el["partition"])] = el["offset"] + 1 + payload.append(TopicPartition(el["topic"], el["partition"], el["offset"] + 1)) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -284,7 +288,7 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r convert_to_int(el, "partition", content_type) tp = TopicPartition(el["topic"], el["partition"]) try: - offset = await consumer.committed(tp) + [committed_partition] = await consumer.committed([tp]) except GroupAuthorizationFailedError: KarapaceBase.r(body={"message": "Forbidden"}, content_type=content_type, status=HTTPStatus.FORBIDDEN) except KafkaError as ex: @@ -292,14 +296,14 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r message=f"Failed to get offsets: {ex}", content_type=content_type, ) - if offset is None: + if committed_partition is None: continue response["offsets"].append( { "topic": tp.topic, "partition": tp.partition, "metadata": "", - "offset": offset, + "offset": committed_partition.offset, } ) KarapaceBase.r(body=response, content_type=content_type) @@ -324,9 +328,9 @@ async def set_subscription(self, internal_name: Tuple[str, str], content_type: s async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer try: - # aiokafka does not verify access to topics subscribed during this call, thus cannot get topic authorzation - # error immediately - consumer.subscribe(topics=topics, pattern=topics_pattern) + # The client does not verify access to the topics subscribed to in this call, + # thus a topic authorization error cannot be raised immediately. + await consumer.subscribe(topics=topics, patterns=[topics_pattern] if topics_pattern is not None else
None) empty_response() except IllegalStateError as e: self._illegal_state_fail(str(e), content_type=content_type) @@ -338,17 +342,13 @@ async def get_subscription(self, internal_name: Tuple[str, str], content_type: s self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer - if consumer.subscription() is None: - topics = [] - else: - topics = list(consumer.subscription()) - KarapaceBase.r(content_type=content_type, body={"topics": topics}) + KarapaceBase.r(content_type=content_type, body={"topics": list(consumer.subscription())}) async def delete_subscription(self, internal_name: Tuple[str, str], content_type: str): LOG.info("Deleting subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: - self.consumers[internal_name].consumer.unsubscribe() + await self.consumers[internal_name].consumer.unsubscribe() empty_response() # ASSIGNMENTS @@ -364,7 +364,7 @@ async def set_assignments(self, internal_name: Tuple[str, str], content_type: st async with self.consumer_locks[internal_name]: try: consumer = self.consumers[internal_name].consumer - consumer.assign(partitions) + await consumer.assign(partitions) empty_response() except IllegalStateError as e: self._illegal_state_fail(message=str(e), content_type=content_type) @@ -378,7 +378,7 @@ async def get_assignments(self, internal_name: Tuple[str, str], content_type: st consumer = self.consumers[internal_name].consumer KarapaceBase.r( content_type=content_type, - body={"partitions": [{"topic": pd.topic, "partition": pd.partition} for pd in consumer.assignment()]}, + body={"partitions": [{"topic": pd.topic, "partition": pd.partition} for pd in await consumer.assignment()]}, ) # POSITIONS @@ -393,13 +393,13 @@ async def seek_to(self, internal_name: Tuple[str, str], content_type: str, reque self._assert_has_key(el, k, content_type) convert_to_int(el, k, content_type) self._assert_positive_number(el, "offset", content_type) - seeks.append((TopicPartition(topic=el["topic"], partition=el["partition"]), el["offset"])) + seeks.append(TopicPartition(topic=el["topic"], partition=el["partition"], offset=el["offset"])) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer - for part, offset in seeks: + for part in seeks: try: - consumer.seek(part, offset) - except IllegalStateError: + await consumer.seek(part) + except (UnknownTopicOrPartitionError, IllegalStateError): self._illegal_state_fail(f"Partition {part} is unassigned", content_type) empty_response() @@ -415,17 +415,20 @@ async def seek_limit( convert_to_int(el, "partition", content_type) for k in ["topic", "partition"]: self._assert_has_key(el, k, content_type) - resets.append(TopicPartition(topic=el["topic"], partition=el["partition"])) + resets.append( + TopicPartition( + topic=el["topic"], + partition=el["partition"], + offset=OFFSET_BEGINNING if beginning else OFFSET_END, + ) + ) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer try: - if beginning: - await consumer.seek_to_beginning(*resets) - else: - await consumer.seek_to_end(*resets) + await asyncio.gather(*(consumer.seek(topic_partition) for topic_partition in resets)) empty_response() - except AssertionError: + except (UnknownTopicOrPartitionError, IllegalStateError): self._illegal_state_fail(f"Trying to reset unassigned partitions to {direction}", content_type) async def 
fetch(self, internal_name: Tuple[str, str], content_type: str, formats: dict, query_params: dict): @@ -453,7 +456,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats # we get to be more in line with the confluent proxy by doing a bunch of fetches each time and # respecting the max fetch request size # pylint: disable=protected-access - max_bytes = int(query_params["max_bytes"]) if "max_bytes" in query_params else consumer._fetch_max_bytes + max_bytes = int(query_params["max_bytes"]) if "max_bytes" in query_params else FETCH_MAX_BYTES except ValueError: KarapaceBase.internal_error(message=f"Invalid request parameters: {query_params}", content_type=content_type) for val in [timeout, max_bytes]: @@ -470,7 +473,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats ) read_bytes = 0 start_time = time.monotonic() - poll_data = defaultdict(list) + poll_data = [] message_count = 0 # Read buffered records with calling getmany() with possibly zero timeout and max_records=1 multiple times read_buffered = True @@ -486,7 +489,11 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats ) timeout_left = max(0, (start_time - time.monotonic()) * 1000 + timeout) try: - data = await consumer.getmany(timeout_ms=timeout_left, max_records=1) + message = await consumer.poll(timeout=timeout_left / 1000) + if message is None: + continue + if message.error() is not None: + raise translate_from_kafkaerror(message.error()) except (GroupAuthorizationFailedError, TopicAuthorizationFailedError): KarapaceBase.r(body={"message": "Forbidden"}, content_type=content_type, status=HTTPStatus.FORBIDDEN) except KafkaError as ex: @@ -495,47 +502,44 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats content_type=content_type, ) LOG.debug("Successfully polled for messages") - for topic, records in data.items(): - for rec in records: - message_count += 1 - read_bytes += max(0, 0 if rec.key is None else len(rec.key)) + max( - 0, 0 if rec.value is None else len(rec.value) - ) - poll_data[topic].append(rec) - read_buffered = True + message_count += 1 + read_bytes += max(0, 0 if message.key() is None else len(message.key())) + max( + 0, 0 if message.value() is None else len(message.value()) + ) + poll_data.append(message) + read_buffered = True LOG.info( "Gathered %d total messages (%d bytes read) in %r", message_count, read_bytes, time.monotonic() - start_time, ) - for tp in poll_data: - for msg in poll_data[tp]: - try: - key = await self.deserialize(msg.key, request_format) if msg.key else None - except DeserializationError as e: - KarapaceBase.unprocessable_entity( - message=f"key deserialization error for format {request_format}: {e}", - sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - content_type=content_type, - ) - try: - value = await self.deserialize(msg.value, request_format) if msg.value else None - except DeserializationError as e: - KarapaceBase.unprocessable_entity( - message=f"value deserialization error for format {request_format}: {e}", - sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - content_type=content_type, - ) - element = { - "topic": tp.topic, - "partition": tp.partition, - "offset": msg.offset, - "timestamp": msg.timestamp, - "key": key, - "value": value, - } - response.append(element) + for msg in poll_data: + try: + key = await self.deserialize(msg.key(), request_format) if msg.key() else None + except DeserializationError as e: + KarapaceBase.unprocessable_entity( + 
message=f"key deserialization error for format {request_format}: {e}", + sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, + content_type=content_type, + ) + try: + value = await self.deserialize(msg.value(), request_format) if msg.value() else None + except DeserializationError as e: + KarapaceBase.unprocessable_entity( + message=f"value deserialization error for format {request_format}: {e}", + sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, + content_type=content_type, + ) + element = { + "topic": msg.topic(), + "partition": msg.partition(), + "offset": msg.offset(), + "timestamp": msg.timestamp()[1], + "key": key, + "value": value, + } + response.append(element) KarapaceBase.r(content_type=content_type, body=response) diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi index 3d26b0393..175569fb4 100644 --- a/stubs/confluent_kafka/__init__.pyi +++ b/stubs/confluent_kafka/__init__.pyi @@ -2,6 +2,8 @@ from ._model import IsolationLevel from .cimpl import ( Consumer, Message, + OFFSET_BEGINNING, + OFFSET_END, Producer, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME, @@ -14,6 +16,8 @@ __all__ = ( "IsolationLevel", "Message", "Producer", + "OFFSET_BEGINNING", + "OFFSET_END", "TIMESTAMP_CREATE_TIME", "TIMESTAMP_LOG_APPEND_TIME", "TIMESTAMP_NOT_AVAILABLE", diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 48d67c155..0811cb0cd 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -1,11 +1,16 @@ from confluent_kafka.admin._metadata import ClusterMetadata from typing import Any, Callable, Final +OFFSET_BEGINNING: Final = ... +OFFSET_END: Final = ... + class KafkaError: _NOENT: int _AUTHENTICATION: int _UNKNOWN_TOPIC: int _UNKNOWN_PARTITION: int + _TIMED_OUT: int + _STATE: int def code(self) -> int: ... @@ -38,6 +43,7 @@ class TopicPartition: self.offset: int self.metadata: str | None self.leader_epoch: int | None + self.error: KafkaError | None class Message: def offset(self) -> int: ... @@ -65,7 +71,12 @@ class Producer: def poll(self, timeout: float = -1) -> int: ... class Consumer: - def subscribe(self, topics: list[str]) -> None: ... + def subscribe( + self, + topics: list[str], + on_assign: Callable[[Consumer, list[TopicPartition]], None] | None = None, + on_revoke: Callable[[Consumer, list[TopicPartition]], None] | None = None, + ) -> None: ... def get_watermark_offsets( self, partition: TopicPartition, timeout: float | None = None, cached: bool = False ) -> tuple[int, int] | None: ... @@ -74,6 +85,13 @@ class Consumer: def consume(self, num_messages: int = 1, timeout: float = -1) -> list[Message]: ... def poll(self, timeout: float = -1) -> Message | None: ... def assign(self, partitions: list[TopicPartition]) -> None: ... + def commit( + self, message: Message | None = None, offsets: list[TopicPartition] | None = None, asynchronous: bool = True + ) -> list[TopicPartition] | None: ... + def committed(self, partitions: list[TopicPartition], timeout: float = -1) -> list[TopicPartition]: ... + def unsubscribe(self) -> None: ... + def assignment(self) -> list[TopicPartition]: ... + def seek(self, partition: TopicPartition) -> None: ... TIMESTAMP_CREATE_TIME: Final = ... TIMESTAMP_NOT_AVAILABLE: Final = ... 
diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 9f7d4a77d..12a400dde 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -129,7 +129,7 @@ def _assert_canonical_key_format( ) -> None: # Consume all records and assert key order is canonical consumer = KafkaConsumer( - schemas_topic, + topic=schemas_topic, group_id="assert-canonical-key-format-consumer", enable_auto_commit=False, bootstrap_servers=bootstrap_servers, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index bc0c538ff..94f5a4a0a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -14,8 +14,8 @@ from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config from karapace.kafka.admin import KafkaAdminClient -from karapace.kafka.producer import KafkaProducer from karapace.kafka.consumer import AIOKafkaConsumer, KafkaConsumer +from karapace.kafka.producer import AIOKafkaProducer, KafkaProducer from karapace.kafka_rest_apis import KafkaRest from pathlib import Path from tests.conftest import KAFKA_VERSION @@ -225,12 +225,45 @@ def fixture_consumer( bootstrap_servers=kafka_servers.bootstrap_servers, auto_offset_reset="earliest", enable_auto_commit=False, + # Speed things up for consumer tests to discover topics, etc. + topic_metadata_refresh_interval_ms=100, ) yield consumer finally: consumer.close() +@pytest.fixture(scope="function", name="aioproducer") +async def fixture_aioproducer( + kafka_servers: KafkaServers, + loop: asyncio.AbstractEventLoop, +) -> Iterator[AIOKafkaProducer]: + try: + aioproducer = AIOKafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop) + yield aioproducer + finally: + await aioproducer.stop() + + +@pytest.fixture(scope="function", name="aioconsumer") +async def fixture_aioconsumer( + kafka_servers: KafkaServers, + loop: asyncio.AbstractEventLoop, +) -> Iterator[AIOKafkaConsumer]: + try: + aioconsumer = AIOKafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + loop=loop, + auto_offset_reset="earliest", + enable_auto_commit=False, + # Speed things up for consumer tests to discover topics, etc. 
+ topic_metadata_refresh_interval_ms=100, + ) + yield aioconsumer + finally: + await aioconsumer.close() + + @pytest.fixture(scope="function", name="rest_async") async def fixture_rest_async( request: SubRequest, diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py index 4fceb1b72..eecf53d73 100644 --- a/tests/integration/kafka/test_consumer.py +++ b/tests/integration/kafka/test_consumer.py @@ -4,15 +4,21 @@ """ from __future__ import annotations -from confluent_kafka import TopicPartition +from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition from confluent_kafka.admin import NewTopic -from kafka.errors import UnknownTopicOrPartitionError -from karapace.kafka.consumer import KafkaConsumer -from karapace.kafka.producer import KafkaProducer +from kafka.errors import IllegalStateError, KafkaTimeoutError, UnknownTopicOrPartitionError +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import AIOKafkaConsumer, KafkaConsumer +from karapace.kafka.producer import AIOKafkaProducer, KafkaProducer +from karapace.utils import Expiration from tests.integration.utils.kafka_server import KafkaServers +from tests.utils import new_topic as create_new_topic +from typing import Final import pytest +POLL_TIMEOUT_S: Final = 10 + class TestPartitionsForTopic: def test_partitions_for_returns_empty_for_unknown_topic(self, consumer: KafkaConsumer) -> None: @@ -56,3 +62,264 @@ def test_get_watermark_offsets_topic_with_one_message( assert beginning == 0 assert end == 1 + + +class TestCommit: + def test_commit_message_and_offset_mutual_exclusion( + self, + consumer: KafkaConsumer, + producer: KafkaProducer, + new_topic: NewTopic, + ) -> None: + fut = producer.send(new_topic.topic) + producer.flush() + message = fut.result() + + with pytest.raises(ValueError): + consumer.commit(message=message, offsets=[]) + + def test_commit_message( + self, + producer: KafkaProducer, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic) + producer.flush() + first_fut.result() + second_fut.result() + consumer.poll(timeout=POLL_TIMEOUT_S) + message = consumer.poll(timeout=POLL_TIMEOUT_S) + + [topic_partition] = consumer.commit(message) + [committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)]) + + assert topic_partition.topic == new_topic.topic + assert topic_partition.partition == 0 + assert topic_partition.offset == 2 + assert committed_partition.topic == new_topic.topic + assert committed_partition.partition == 0 + assert committed_partition.offset == 2 + + def test_commit_offsets( + self, + producer: KafkaProducer, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic) + producer.flush() + first_fut.result() + second_fut.result() + consumer.poll(timeout=POLL_TIMEOUT_S) + message = consumer.poll(timeout=POLL_TIMEOUT_S) + + [topic_partition] = consumer.commit( + offsets=[ + TopicPartition( + new_topic.topic, + partition=0, + offset=message.offset() + 1, + ), + ] + ) + [committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)]) + + assert topic_partition.topic == new_topic.topic + assert topic_partition.partition == 0 + assert topic_partition.offset == 2 + assert committed_partition.topic == 
new_topic.topic + assert committed_partition.partition == 0 + assert committed_partition.offset == 2 + + def test_commit_raises_for_unknown_partition( + self, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + with pytest.raises(UnknownTopicOrPartitionError): + consumer.commit(offsets=[TopicPartition(new_topic.topic, partition=99, offset=0)]) + + +class TestCommitted: + def test_committed_raises_on_timeout(self, consumer: KafkaConsumer) -> None: + with pytest.raises(KafkaTimeoutError): + consumer.committed([], timeout=0.00001) + + +class TestSubscribe: + def _poll_until_no_message(self, consumer: KafkaConsumer) -> None: + """Polls until no message is returned. + + When verifying subscriptions this can be used to wait until the topics + subscribed to are all ready. While a topic is not yet ready, polling + returns a message carrying an error that indicates the topic cannot be + consumed yet. + """ + msg = consumer.poll(timeout=POLL_TIMEOUT_S) + while msg is not None: + msg = consumer.poll(timeout=POLL_TIMEOUT_S) + + def test_subscription_is_recorded( + self, + admin_client: KafkaAdminClient, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + prefix = "subscribe" + topics = [create_new_topic(admin_client, prefix=prefix) for _ in range(3)] + + consumer.subscribe(topics=[new_topic.topic], patterns=[f"{prefix}.*"]) + self._poll_until_no_message(consumer) + + assert consumer.subscription() == frozenset(topics + [new_topic.topic]) + + def test_unsubscribe_empties_subscription( + self, + admin_client: KafkaAdminClient, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + prefix = "unsubscribe" + _ = [create_new_topic(admin_client, prefix=prefix) for _ in range(3)] + consumer.subscribe(topics=[new_topic.topic], patterns=[f"{prefix}.*"]) + self._poll_until_no_message(consumer) + + consumer.unsubscribe() + + self._poll_until_no_message(consumer) + + assert consumer.subscription() == frozenset() + + def test_resubscribe_modifies_subscription( + self, + admin_client: KafkaAdminClient, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + prefix = "resubscribe" + _ = [create_new_topic(admin_client, prefix=prefix) for _ in range(3)] + consumer.subscribe(topics=[new_topic.topic], patterns=[f"{prefix}.*"]) + self._poll_until_no_message(consumer) + + consumer.subscribe(topics=[new_topic.topic]) + + self._poll_until_no_message(consumer) + + assert consumer.subscription() == frozenset([new_topic.topic]) + + +class TestAssign: + def test_assign(self, consumer: KafkaConsumer, producer: KafkaProducer, new_topic: NewTopic) -> None: + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic, value=b"message-value") + producer.flush() + first_fut.result() + second_fut.result() + consumer.assign([TopicPartition(new_topic.topic, partition=0, offset=1)]) + + first_message = consumer.poll(POLL_TIMEOUT_S) + second_message = consumer.poll(POLL_TIMEOUT_S) + [assigned_partition] = consumer.assignment() + + assert first_message.offset() == 1 + assert first_message.value() == b"message-value" + assert second_message is None + + assert assigned_partition.topic == new_topic.topic + assert assigned_partition.partition == 0 + assert assigned_partition.offset == 1 + + def test_assign_raises_illegal_state_after_subscribe( + self, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + consumer.poll(timeout=POLL_TIMEOUT_S) + + with
pytest.raises(IllegalStateError): + consumer.assign([TopicPartition("some-topic", 0)]) + + +class TestSeek: + def test_seek(self, consumer: KafkaConsumer, producer: KafkaProducer, new_topic: NewTopic) -> None: + consumer.subscribe([new_topic.topic]) + fut = producer.send(new_topic.topic, value=b"message-value") + producer.flush() + fut.result() + + message = consumer.poll(timeout=POLL_TIMEOUT_S) + consumer.seek(TopicPartition(new_topic.topic, partition=0, offset=OFFSET_BEGINNING)) + same_message = consumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.offset() == same_message.offset() + assert message.value() == same_message.value() + + def test_seek_unassigned_partition_raises(self, consumer: KafkaConsumer, new_topic: NewTopic) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + consumer.seek(TopicPartition(new_topic.topic, partition=0, offset=OFFSET_END)) + + +class TestAsyncPoll: + async def test_async_poll( + self, + aioproducer: AIOKafkaProducer, + aioconsumer: AIOKafkaConsumer, + new_topic: NewTopic, + ) -> None: + await aioconsumer.subscribe([new_topic.topic]) + aiofut = await aioproducer.send(new_topic.topic) + await aiofut + + message = await aioconsumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.offset() == 0 + assert message.key() is None + assert message.value() is None + + async def test_async_poll_no_message(self, aioconsumer: AIOKafkaConsumer, new_topic: NewTopic) -> None: + await aioconsumer.subscribe([new_topic.topic]) + + message = await aioconsumer.poll(timeout=1) + + assert message is None + + async def test_async_poll_unknown_topic(self, aioconsumer: AIOKafkaConsumer) -> None: + await aioconsumer.subscribe(["nonexistent"]) + + message = await aioconsumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.error() is not None + + +async def test_pattern_subscription_async( + admin_client: KafkaAdminClient, + aioproducer: AIOKafkaProducer, + aioconsumer: AIOKafkaConsumer, +) -> None: + prefix = "patterntest" + number_of_topics = 3 + topics = [create_new_topic(admin_client, prefix=prefix) for _ in range(number_of_topics)] + await aioconsumer.subscribe(patterns=[f"{prefix}.*"]) + for i, topic in enumerate(topics): + aiofut = await aioproducer.send(topic, value=f"{i}-value") + await aiofut + + messages = [] + expiration = Expiration.from_timeout(30) + while len(messages) != 3: + expiration.raise_timeout_if_expired( + "Timeout elapsed waiting for messages from topic pattern. 
Only received {messages}", + messages=messages, + ) + message = await aioconsumer.poll(timeout=POLL_TIMEOUT_S) + if message is not None and message.error() is None: + messages.append(message) + + assert sorted(message.value().decode() for message in messages) == [f"{i}-value" for i in range(number_of_topics)] diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index 64d4deeb5..d1dab8945 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -9,10 +9,7 @@ from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError from karapace.kafka.producer import AIOKafkaProducer, KafkaProducer from karapace.kafka.types import Timestamp -from tests.integration.utils.kafka_server import KafkaServers -from typing import Iterator -import asyncio import pytest import time @@ -76,18 +73,6 @@ def test_partitions_for(self, producer: KafkaProducer, new_topic: NewTopic) -> N assert partitions[0].isrs == [1] -@pytest.fixture(scope="function", name="aioproducer") -async def fixture_aioproducer( - kafka_servers: KafkaServers, - loop: asyncio.AbstractEventLoop, -) -> Iterator[AIOKafkaProducer]: - try: - aioproducer = AIOKafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop) - yield aioproducer - finally: - await aioproducer.stop() - - class TestAsyncSend: async def test_async_send(self, aioproducer: AIOKafkaProducer, new_topic: NewTopic) -> None: key = b"key" diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index e1620d1b2..8c0fc6e18 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -11,6 +11,7 @@ repeat_until_successful_request, REST_HEADERS, schema_data, + wait_for_topics, ) import base64 @@ -87,7 +88,7 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): topic_name = new_topic(admin_client) instance_id = await new_consumer(rest_async_client, group_name, fmt="binary", trail=trail) sub_path = f"/consumers/{group_name}/instances/{instance_id}/subscription{trail}" - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000" res = await rest_async_client.get(sub_path, headers=header) assert res.ok data = res.json() @@ -95,6 +96,9 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): # simple sub res = await rest_async_client.post(sub_path, json={"topics": [topic_name]}, headers=header) assert res.ok + # Consume so confluent rest reevaluates the subscription + resp = await rest_async_client.get(consume_path, headers=header) + assert resp.ok res = await rest_async_client.get(sub_path, headers=header) assert res.ok data = res.json() @@ -112,6 +116,9 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): # on delete it's empty again res = await rest_async_client.delete(sub_path, headers=header) assert res.ok + # Consume so confluent rest reevaluates the subscription + resp = await rest_async_client.get(consume_path, headers=header) + assert resp.ok res = await rest_async_client.get(sub_path, headers=header) assert res.ok data = res.json() @@ -119,6 +126,7 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): # one pattern sub will get all 3 prefix = f"{hash(random.random())}" pattern_topics = [new_topic(admin_client, prefix=f"{prefix}{i}") for i in 
range(3)] + await wait_for_topics(rest_async_client, topic_names=pattern_topics, timeout=20, sleep=1) res = await rest_async_client.post(sub_path, json={"topic_pattern": f"{prefix}.*"}, headers=REST_HEADERS["json"]) assert res.ok @@ -171,11 +179,13 @@ async def test_seek(rest_async_client, admin_client, trail): seek_path = f"/consumers/{group}/instances/{instance_id}/positions{trail}" # one partition assigned, we can topic_name = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=20, sleep=1) assign_path = f"/consumers/{group}/instances/{instance_id}/assignments{trail}" assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} res = await rest_async_client.post(assign_path, headers=REST_HEADERS["json"], json=assign_payload) assert res.ok - seek_payload = {"offsets": [{"topic": topic_name, "partition": 0, "offset": 10}]} + + seek_payload = {"offsets": [{"topic": topic_name, "partition": 0, "offset": 0}]} res = await rest_async_client.post(seek_path, json=seek_payload, headers=REST_HEADERS["json"]) assert res.ok, f"Unexpected status for {res}" extreme_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} @@ -262,7 +272,7 @@ async def test_consume(rest_async_client, admin_client, producer, trail): instance_id = await new_consumer(rest_async_client, group_name, fmt=fmt, trail=trail) assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments{trail}" seek_path = f"/consumers/{group_name}/instances/{instance_id}/positions/beginning{trail}" - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000" topic_name = new_topic(admin_client) assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} res = await rest_async_client.post(assign_path, json=assign_payload, headers=header) @@ -300,7 +310,7 @@ async def test_consume_timeout(rest_async_client, admin_client, producer): instance_id = await new_consumer(rest_async_client, group_name, fmt=fmt) assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments" seek_path = f"/consumers/{group_name}/instances/{instance_id}/positions/beginning" - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=5000" topic_name = new_topic(admin_client) assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} res = await rest_async_client.post(assign_path, json=assign_payload, headers=header) @@ -393,7 +403,7 @@ async def test_consume_grafecul_deserialization_error_handling(rest_async_client res = await rest_async_client.post(assign_path, json=assign_payload, headers=headers) assert res.ok - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=5000" resp = await rest_async_client.get(consume_path, headers=headers) if fmt == "binary": assert resp.status_code == 200, f"Expected 200 response: {resp}" diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py index 903d94327..d4c7ca7c4 100644 --- a/tests/integration/test_rest_consumer_protobuf.py +++ b/tests/integration/test_rest_consumer_protobuf.py @@ -239,7 +239,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references( subscribe_path = 
f"/consumers/{group}/instances/{instance_id}/subscription" - consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000" + consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=5000" res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"]) assert res.ok diff --git a/tests/utils.py b/tests/utils.py index bf17d9c8b..a48bc55cc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -27,6 +27,7 @@ "consumer.request.timeout.ms": 11000, "fetch.min.bytes": 100000, "auto.commit.enable": "true", + "topic.metadata.refresh.interval.ms": 100, } schema_jsonschema_json = json.dumps( {