diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index b656ac001..30c321b22 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -18,6 +18,7 @@ import schema_registry.routers.mode import schema_registry.routers.schemas import schema_registry.routers.subjects +import schema_registry.telemetry.meter import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer @@ -31,6 +32,7 @@ __name__, schema_registry.controller, schema_registry.telemetry.tracer, + schema_registry.telemetry.meter, ] ) diff --git a/src/schema_registry/factory.py b/src/schema_registry/factory.py index 0cd970dc0..355cbe2aa 100644 --- a/src/schema_registry/factory.py +++ b/src/schema_registry/factory.py @@ -18,7 +18,7 @@ from schema_registry.middlewares import setup_middlewares from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.setup import setup_routers -from schema_registry.telemetry.setup import setup_tracing +from schema_registry.telemetry.setup import setup_metering, setup_tracing from typing import AsyncContextManager import logging @@ -60,6 +60,7 @@ def create_karapace_application( app = FastAPI(lifespan=lifespan) # type: ignore[arg-type] setup_tracing() + setup_metering() setup_routers(app=app) setup_exception_handlers(app=app) setup_middlewares(app=app) diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py index d9d53ea2f..d60bdc102 100644 --- a/src/schema_registry/telemetry/container.py +++ b/src/schema_registry/telemetry/container.py @@ -9,10 +9,11 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.semconv.attributes import telemetry_attributes as T +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer -def create_tracing_resource(config: Config) -> Resource: +def create_telemetry_resource(config: Config) -> Resource: return Resource.create( { "service.name": config.telemetry.resource_service_name, @@ -26,6 +27,9 @@ def create_tracing_resource(config: Config) -> Resource: class TelemetryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) - tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config) - tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource) + + telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config) + + meter = providers.Singleton(Meter) tracer = providers.Singleton(Tracer) + tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource) diff --git a/src/schema_registry/telemetry/meter.py b/src/schema_registry/telemetry/meter.py index 83eaf7046..d3912c4cd 100644 --- a/src/schema_registry/telemetry/meter.py +++ b/src/schema_registry/telemetry/meter.py @@ -8,7 +8,12 @@ from karapace.container import KarapaceContainer from opentelemetry import metrics from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, MetricReader, PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + MetricExporter, + MetricReader, + PeriodicExportingMetricReader, +) from typing import Final @@ -23,7 +28,7 @@ def get_meter(config: Config = Provide[KarapaceContainer.config]) -> metrics.Met @staticmethod @inject def get_metric_reader(config: Config = Provide[KarapaceContainer.config]) -> MetricReader: - exporter = ConsoleMetricExporter() + exporter: MetricExporter = ConsoleMetricExporter() if config.telemetry.otel_endpoint_url: exporter = OTLPMetricExporter(endpoint=config.telemetry.otel_endpoint_url) return PeriodicExportingMetricReader( diff --git a/src/schema_registry/telemetry/middleware.py b/src/schema_registry/telemetry/middleware.py index c6d14bbe9..56f0809ba 100644 --- a/src/schema_registry/telemetry/middleware.py +++ b/src/schema_registry/telemetry/middleware.py @@ -6,11 +6,14 @@ from collections.abc import Awaitable, Callable from dependency_injector.wiring import inject, Provide from fastapi import FastAPI, Request, Response +from opentelemetry.metrics import Counter, Histogram, UpDownCounter from opentelemetry.trace import SpanKind from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer import logging +import time LOG = logging.getLogger(__name__) @@ -20,12 +23,59 @@ async def telemetry_middleware( request: Request, call_next: Callable[[Request], Awaitable[Response]], tracer: Tracer = Provide[TelemetryContainer.tracer], + meter: Meter = Provide[TelemetryContainer.meter], ) -> Response: resource = request.url.path.split("/")[1] with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span: + span.add_event("Creating metering resources") + karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter( + name="karapace_http_requests_in_progress", + description="In-progress requests for HTTP/TCP Protocol", + ) + karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram( + unit="seconds", + name="karapace_http_requests_duration_seconds", + description="Request Duration for HTTP/TCP Protocol", + ) + karapace_http_requests_total: Counter = meter.get_meter().create_counter( + name="karapace_http_requests_total", + description="Total Request Count for HTTP/TCP Protocol", + ) + + # Set start time for request + setattr(request.state, meter.START_TIME_KEY, time.monotonic()) + + # Extract request labels + path = request.url.path + method = request.method + + # Increment requests in progress before response handler + span.add_event("Metering requests in progress (increase)") + karapace_http_requests_in_progress.add(amount=1, attributes={"method": method, "path": path}) + + # Call request handler tracer.update_span_with_request(request=request, span=span) + span.add_event("Calling request handler") response: Response = await call_next(request) tracer.update_span_with_response(response=response, span=span) + + # Instrument request duration + span.add_event("Metering request duration") + karapace_http_requests_duration_seconds.record( + amount=(time.monotonic() - getattr(request.state, meter.START_TIME_KEY)), + attributes={"method": method, "path": path}, + ) + + # Instrument total requests + span.add_event("Metering total requests") + karapace_http_requests_total.add( + amount=1, attributes={"method": method, "path": path, "status": response.status_code} + ) + + # Decrement requests in progress after response handler + span.add_event("Metering requests in progress (decrease)") + karapace_http_requests_in_progress.add(amount=-1, attributes={"method": method, "path": path}) + return response diff --git a/src/schema_registry/telemetry/setup.py b/src/schema_registry/telemetry/setup.py index 30b423902..294f85c50 100644 --- a/src/schema_registry/telemetry/setup.py +++ b/src/schema_registry/telemetry/setup.py @@ -4,9 +4,12 @@ """ from dependency_injector.wiring import inject, Provide -from opentelemetry import trace +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer import logging @@ -22,3 +25,12 @@ def setup_tracing( LOG.info("Setting OTel tracing provider") tracer_provider.add_span_processor(tracer.get_span_processor()) trace.set_tracer_provider(tracer_provider) + + +@inject +def setup_metering( + meter: Meter = Provide[TelemetryContainer.meter], + telemetry_resource: Resource = Provide[TelemetryContainer.telemetry_resource], +) -> None: + LOG.info("Setting OTel meter provider") + metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[meter.get_metric_reader()])) diff --git a/tests/conftest.py b/tests/conftest.py index 3f7b6ad9e..3e273f430 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,7 @@ import pytest import re import schema_registry.controller +import schema_registry.telemetry.meter import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer diff --git a/tests/unit/schema_registry/telemetry/test_meter.py b/tests/unit/schema_registry/telemetry/test_meter.py index 3a60d62fe..24ed093d2 100644 --- a/tests/unit/schema_registry/telemetry/test_meter.py +++ b/tests/unit/schema_registry/telemetry/test_meter.py @@ -5,6 +5,7 @@ See LICENSE for details """ +from karapace.config import KarapaceTelemetry from karapace.container import KarapaceContainer from schema_registry.telemetry.meter import Meter from unittest.mock import patch @@ -16,30 +17,35 @@ def test_meter(karapace_container: KarapaceContainer): mock_metrics.get_meter_provider.return_value.get_meter.assert_called_once_with("Karapace.meter") -def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: +def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} + ) with ( - patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter, + patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter, patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, ): - karapace_container.config().telemetry.otel_endpoint_url = "http://otel:4317" - reader = Meter.get_metric_reader(config=karapace_container.config()) - mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") + reader = Meter.get_metric_reader(config=config) + mock_console_exporter.assert_called_once() mock_periodic_exporting_metric_reader.assert_called_once_with( - exporter=mock_otlp_exporter.return_value, + exporter=mock_console_exporter.return_value, export_interval_millis=10000, ) assert reader is mock_periodic_exporting_metric_reader.return_value -def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: +def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")} + ) with ( - patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter, + patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter, patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, ): - reader = Meter.get_metric_reader(config=karapace_container.config()) - mock_console_exporter.assert_called_once() + reader = Meter.get_metric_reader(config=config) + mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") mock_periodic_exporting_metric_reader.assert_called_once_with( - exporter=mock_console_exporter.return_value, + exporter=mock_otlp_exporter.return_value, export_interval_millis=10000, ) assert reader is mock_periodic_exporting_metric_reader.return_value diff --git a/tests/unit/schema_registry/telemetry/test_middleware.py b/tests/unit/schema_registry/telemetry/test_middleware.py index ecbe79307..bbabe449d 100644 --- a/tests/unit/schema_registry/telemetry/test_middleware.py +++ b/tests/unit/schema_registry/telemetry/test_middleware.py @@ -8,9 +8,10 @@ from _pytest.logging import LogCaptureFixture from fastapi import FastAPI, Request, Response from opentelemetry.trace import SpanKind +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware from schema_registry.telemetry.tracer import Tracer -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, call, MagicMock, patch import logging @@ -31,10 +32,11 @@ def test_setup_telemetry_middleware(caplog: LogCaptureFixture) -> None: async def test_telemetry_middleware() -> None: tracer = MagicMock(spec=Tracer) + meter = MagicMock(spec=Meter, START_TIME_KEY="start_time") request_mock = AsyncMock(spec=Request) request_mock.method = "GET" - request_mock.url.path = "/test" + request_mock.url.path = "/test/inner-path" response_mock = AsyncMock(spec=Response) response_mock.status_code = 200 @@ -42,15 +44,52 @@ async def test_telemetry_middleware() -> None: call_next = AsyncMock() call_next.return_value = response_mock - response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer) - span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value - - tracer.get_tracer.assert_called_once() - tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) - tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) - tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) - - # Check that the request handler is called - call_next.assert_awaited_once_with(request_mock) - - assert response == response_mock + with patch("schema_registry.telemetry.middleware.time.monotonic", return_value=1): + response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer, meter=meter) + span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value + + tracer.get_tracer.assert_called_once() + tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) + tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) + tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) + + # Check that the request handler is called + call_next.assert_awaited_once_with(request_mock) + + span.add_event.assert_has_calls( + [ + call("Creating metering resources"), + call("Metering requests in progress (increase)"), + call("Calling request handler"), + call("Metering request duration"), + call("Metering total requests"), + call("Metering requests in progress (decrease)"), + ] + ) + + meter.get_meter.assert_has_calls( + [ + call(), + call().create_up_down_counter( + name="karapace_http_requests_in_progress", description="In-progress requests for HTTP/TCP Protocol" + ), + call(), + call().create_histogram( + unit="seconds", + name="karapace_http_requests_duration_seconds", + description="Request Duration for HTTP/TCP Protocol", + ), + call(), + call().create_counter( + name="karapace_http_requests_total", description="Total Request Count for HTTP/TCP Protocol" + ), + call().create_up_down_counter().add(amount=1, attributes={"method": "GET", "path": "/test/inner-path"}), + call().create_histogram().record(amount=0, attributes={"method": "GET", "path": "/test/inner-path"}), + call() + .create_counter() + .add(amount=1, attributes={"method": "GET", "path": "/test/inner-path", "status": 200}), + call().create_up_down_counter().add(amount=-1, attributes={"method": "GET", "path": "/test/inner-path"}), + ] + ) + + assert response == response_mock