diff --git a/src/karapace/kafka/admin.py b/src/karapace/kafka/admin.py index fef52ebf5..0455a712c 100644 --- a/src/karapace/kafka/admin.py +++ b/src/karapace/kafka/admin.py @@ -7,7 +7,7 @@ from collections.abc import Container, Iterable from concurrent.futures import Future -from confluent_kafka import TopicPartition +from confluent_kafka import TopicCollection, TopicPartition from confluent_kafka.admin import ( AdminClient, BrokerMetadata, @@ -20,6 +20,7 @@ TopicMetadata, ) from confluent_kafka.error import KafkaException +from dependency_injector.wiring import inject, Provide from karapace.constants import TOPIC_CREATION_TIMEOUT_S from karapace.kafka.common import ( _KafkaConfigMixin, @@ -27,6 +28,8 @@ single_futmap_result, UnknownTopicOrPartitionError, ) +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer class KafkaAdminClient(_KafkaConfigMixin, AdminClient): @@ -175,3 +178,10 @@ def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: except KafkaException as exc: raise_from_kafkaexception(exc) return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} + + @inject + def describe_topics( + self, topics: TopicCollection, tracer: Tracer = Provide[TelemetryContainer.tracer] + ) -> dict[str, Future]: + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.describe_topics)): + return super().describe_topics(topics) diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 20dfc109e..3da295b4d 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -8,6 +8,7 @@ from schema_registry.telemetry.container import TelemetryContainer import karapace.coordinator.master_coordinator +import karapace.kafka.admin import karapace.offset_watcher import schema_registry.controller import schema_registry.factory @@ -44,6 +45,7 @@ schema_registry.reader, karapace.offset_watcher, karapace.coordinator.master_coordinator, + karapace.kafka.admin, ] )