From 3f88bd666c623ba85b920de98dda5640f3441ab5 Mon Sep 17 00:00:00 2001 From: Bastian Krol Date: Mon, 10 Mar 2025 13:35:20 +0100 Subject: [PATCH] fix(self-monitoring): fix self-monitoring for endpoints with scheme This fixes the issue that self-monitoring telemetry could not be exported from the operator manager when the configured export endpoint included a scheme (http://, https://). --- images/pkg/common/otel.go | 42 ++++++++++++++----- .../operator_configuration_controller.go | 6 ++- .../self_monitoring_and_api_access.go | 3 +- test-resources/bin/util | 10 +++++ .../dash0operatorconfiguration.otlpsink.yaml | 2 - test/e2e/operator.go | 18 +++++--- 6 files changed, 60 insertions(+), 21 deletions(-) diff --git a/images/pkg/common/otel.go b/images/pkg/common/otel.go index 41b5fef7..ceea02ad 100644 --- a/images/pkg/common/otel.go +++ b/images/pkg/common/otel.go @@ -7,6 +7,7 @@ import ( "context" "log" "os" + "regexp" "sync" "time" @@ -21,12 +22,6 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.26.0" ) -const ( - ProtocolGrpc = "grpc" - ProtocolHttpProtobuf = "http/protobuf" - ProtocolHttpJson = "http/json" -) - type OTelSdkConfig struct { Endpoint string Protocol string @@ -35,10 +30,17 @@ type OTelSdkConfig struct { Headers map[string]string } +const ( + ProtocolGrpc = "grpc" + ProtocolHttpProtobuf = "http/protobuf" + ProtocolHttpJson = "http/json" +) + var ( - meterProvider otelmetric.MeterProvider - shutdownFunctions []func(ctx context.Context) error - oTelSdkMutex sync.Mutex + meterProvider otelmetric.MeterProvider + shutdownFunctions []func(ctx context.Context) error + oTelSdkMutex sync.Mutex + endpointSchemeRegex = regexp.MustCompile(`^\w+://`) ) func InitOTelSdkFromEnvVars( @@ -132,7 +134,14 @@ func InitOTelSdkWithConfig( var err error switch protocol { case ProtocolGrpc: - options := []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(oTelSdkConfig.Endpoint)} + var options []otlpmetricgrpc.Option + if EndpointHasScheme(oTelSdkConfig.Endpoint) { + log.Printf("Using a gRPC export for self-monitoring (via WithEndpointURL): %s \n", oTelSdkConfig.Endpoint) + options = []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpointURL(oTelSdkConfig.Endpoint)} + } else { + log.Printf("Using a gRPC export for self-monitoring (via WithEndpoint): %s\n", oTelSdkConfig.Endpoint) + options = []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(oTelSdkConfig.Endpoint)} + } if len(oTelSdkConfig.Headers) > 0 { options = append(options, otlpmetricgrpc.WithHeaders(oTelSdkConfig.Headers)) } @@ -140,7 +149,14 @@ func InitOTelSdkWithConfig( log.Printf("Cannot create the OTLP gRPC metrics exporter: %v\n", err) } case ProtocolHttpProtobuf: - options := []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(oTelSdkConfig.Endpoint)} + var options []otlpmetrichttp.Option + if EndpointHasScheme(oTelSdkConfig.Endpoint) { + log.Printf("Using an HTTP export for self-monitoring (via WithEndpointURL): %s \n", oTelSdkConfig.Endpoint) + options = []otlpmetrichttp.Option{otlpmetrichttp.WithEndpointURL(oTelSdkConfig.Endpoint)} + } else { + log.Printf("Using an HTTP export for self-monitoring (via WithEndpoint): %s\n", oTelSdkConfig.Endpoint) + options = []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(oTelSdkConfig.Endpoint)} + } if len(oTelSdkConfig.Headers) > 0 { options = append(options, otlpmetrichttp.WithHeaders(oTelSdkConfig.Headers)) } @@ -255,3 +271,7 @@ func ShutDownOTelSdkThreadSafe(ctx context.Context) { ShutDownOTelSdk(ctx) } + +func EndpointHasScheme(endpoint string) bool { + return endpointSchemeRegex.MatchString(endpoint) +} diff --git a/internal/controller/operator_configuration_controller.go b/internal/controller/operator_configuration_controller.go index d14a2526..e733214e 100644 --- a/internal/controller/operator_configuration_controller.go +++ b/internal/controller/operator_configuration_controller.go @@ -112,10 +112,12 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr &logger, ) if err != nil { + logger.Error(err, "operator configuration resource existence check failed") return ctrl.Result{}, err } else if checkResourceResult.ResourceDoesNotExist { resourceDeleted = true } else if checkResourceResult.StopReconcile { + logger.Info("stopping operator configuration resource reconcile request after resource existence check") return ctrl.Result{}, nil } @@ -155,8 +157,10 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr &logger, ) if err != nil { + logger.Error(err, "error in operator configuration resource uniqueness check") return ctrl.Result{}, err } else if stopReconcile { + logger.Info("stopping operator configuration resource reconcile after uniqueness check") return ctrl.Result{}, nil } @@ -167,7 +171,7 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr operatorConfigurationResource.Status.Conditions, &logger, ); err != nil { - // The error has already been logged in initStatusConditions + logger.Error(err, "error when initializing operator configuration resource status conditions") return ctrl.Result{}, err } diff --git a/internal/selfmonitoringapiaccess/self_monitoring_and_api_access.go b/internal/selfmonitoringapiaccess/self_monitoring_and_api_access.go index c78d6b42..003af433 100644 --- a/internal/selfmonitoringapiaccess/self_monitoring_and_api_access.go +++ b/internal/selfmonitoringapiaccess/self_monitoring_and_api_access.go @@ -6,7 +6,6 @@ package selfmonitoringapiaccess import ( "context" "fmt" - "regexp" "slices" "strings" @@ -378,7 +377,7 @@ func prependProtocol(endpoint string, defaultProtocol string) string { // Most gRPC implementations are fine without a protocol, but the Go SDK with gRPC requires the endpoint with a // protocol when setting it via OTEL_EXPORTER_OTLP_ENDPOINT, see // https://github.com/open-telemetry/opentelemetry-go/pull/5632. - if !regexp.MustCompile(`^\w+://`).MatchString(endpoint) { + if !common.EndpointHasScheme(endpoint) { // See https://grpc.github.io/grpc/core/md_doc_naming.html return defaultProtocol + endpoint } diff --git a/test-resources/bin/util b/test-resources/bin/util index ad0e62ff..585ee95f 100644 --- a/test-resources/bin/util +++ b/test-resources/bin/util @@ -459,6 +459,16 @@ deploy_otlp_sink_if_requested() { local otlp_sink_dir="$(realpath "$(pwd)")/test-resources/e2e-test-volumes/otlp-sink" sed "s|/tmp/telemetry|$otlp_sink_dir|g" test-resources/otlp-sink/otlp-sink.yaml > "$tmpfile" + mkdir -p "$otlp_sink_dir" + echo "deleting old OTLP sink old telemetry dump files" + rm -rf "$otlp_sink_dir/traces.jsonl" + rm -rf "$otlp_sink_dir/metrics.jsonl" + rm -rf "$otlp_sink_dir/logs.jsonl" + echo "creating OTLP sink old telemetry dump files" + touch "$otlp_sink_dir/traces.jsonl" + touch "$otlp_sink_dir/metrics.jsonl" + touch "$otlp_sink_dir/logs.jsonl" + kubectl apply -f "$tmpfile" kubectl rollout status \ deployment \ diff --git a/test-resources/customresources/dash0operatorconfiguration/dash0operatorconfiguration.otlpsink.yaml b/test-resources/customresources/dash0operatorconfiguration/dash0operatorconfiguration.otlpsink.yaml index 0c1f16ed..8ecf6c53 100644 --- a/test-resources/customresources/dash0operatorconfiguration/dash0operatorconfiguration.otlpsink.yaml +++ b/test-resources/customresources/dash0operatorconfiguration/dash0operatorconfiguration.otlpsink.yaml @@ -3,8 +3,6 @@ kind: Dash0OperatorConfiguration metadata: name: dash0-operator-configuration-resource spec: - selfMonitoring: - enabled: false export: dash0: endpoint: http://otlp-sink.otlp-sink.svc.cluster.local:4317 diff --git a/test/e2e/operator.go b/test/e2e/operator.go index e1c85b25..1a78ae59 100644 --- a/test/e2e/operator.go +++ b/test/e2e/operator.go @@ -133,6 +133,19 @@ func addOptionalHelmParameters(arguments []string, images Images) []string { arguments = setIfNotEmpty(arguments, "operator.image.digest", images.operator.digest) arguments = setIfNotEmpty(arguments, "operator.image.pullPolicy", images.operator.pullPolicy) + arguments = setIfNotEmpty(arguments, "operator.initContainerImage.repository", images.instrumentation.repository) + arguments = setIfNotEmpty(arguments, "operator.initContainerImage.tag", images.instrumentation.tag) + arguments = setIfNotEmpty(arguments, "operator.initContainerImage.digest", images.instrumentation.digest) + arguments = setIfNotEmpty(arguments, "operator.initContainerImage.pullPolicy", images.instrumentation.pullPolicy) + + arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.repository", + images.secretRefResolver.repository) + arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.tag", images.secretRefResolver.tag) + arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.digest", + images.secretRefResolver.digest) + arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.pullPolicy", + images.secretRefResolver.pullPolicy) + arguments = setIfNotEmpty(arguments, "operator.collectorImage.repository", images.collector.repository) arguments = setIfNotEmpty(arguments, "operator.collectorImage.tag", images.collector.tag) arguments = setIfNotEmpty(arguments, "operator.collectorImage.digest", images.collector.digest) @@ -154,11 +167,6 @@ func addOptionalHelmParameters(arguments []string, images Images) []string { arguments = setIfNotEmpty(arguments, "operator.filelogOffsetSynchImage.pullPolicy", images.fileLogOffsetSynch.pullPolicy) - arguments = setIfNotEmpty(arguments, "operator.initContainerImage.repository", images.instrumentation.repository) - arguments = setIfNotEmpty(arguments, "operator.initContainerImage.tag", images.instrumentation.tag) - arguments = setIfNotEmpty(arguments, "operator.initContainerImage.digest", images.instrumentation.digest) - arguments = setIfNotEmpty(arguments, "operator.initContainerImage.pullPolicy", images.instrumentation.pullPolicy) - return arguments }