Skip to content

Commit

Permalink
fix(self-monitoring): fix self-monitoring for endpoints with scheme
Browse files Browse the repository at this point in the history
This fixes the issue that self-monitoring telemetry could not be
exported from the operator manager when the configured export endpoint
included a scheme (http://, https://).
  • Loading branch information
basti1302 committed Mar 10, 2025
1 parent 7e06662 commit 457dde8
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 21 deletions.
42 changes: 31 additions & 11 deletions images/pkg/common/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"log"
"os"
"regexp"
"sync"
"time"

Expand All @@ -21,12 +22,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

const (
ProtocolGrpc = "grpc"
ProtocolHttpProtobuf = "http/protobuf"
ProtocolHttpJson = "http/json"
)

type OTelSdkConfig struct {
Endpoint string
Protocol string
Expand All @@ -35,10 +30,17 @@ type OTelSdkConfig struct {
Headers map[string]string
}

// OTLP export protocol identifiers. The configured protocol is compared against
// these values when the self-monitoring metric exporter is created.
const (
// ProtocolGrpc selects the OTLP gRPC metric exporter (otlpmetricgrpc).
ProtocolGrpc = "grpc"
// ProtocolHttpProtobuf selects the OTLP HTTP metric exporter (otlpmetrichttp).
ProtocolHttpProtobuf = "http/protobuf"
// ProtocolHttpJson names the "http/json" protocol.
// NOTE(review): how this value is handled is not visible in this excerpt — confirm.
ProtocolHttpJson = "http/json"
)

var (
meterProvider otelmetric.MeterProvider
shutdownFunctions []func(ctx context.Context) error
oTelSdkMutex sync.Mutex
meterProvider otelmetric.MeterProvider
shutdownFunctions []func(ctx context.Context) error
oTelSdkMutex sync.Mutex
endpointSchemeRegex = regexp.MustCompile(`^\w+://`)
)

func InitOTelSdkFromEnvVars(
Expand Down Expand Up @@ -132,15 +134,29 @@ func InitOTelSdkWithConfig(
var err error
switch protocol {
case ProtocolGrpc:
options := []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(oTelSdkConfig.Endpoint)}
var options []otlpmetricgrpc.Option
if EndpointHasScheme(oTelSdkConfig.Endpoint) {
log.Printf("Using a gRPC export for self-monitoring (via WithEndpointURL): %s \n", oTelSdkConfig.Endpoint)
options = []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpointURL(oTelSdkConfig.Endpoint)}
} else {
log.Printf("Using a gRPC export for self-monitoring (via WithEndpoint): %s\n", oTelSdkConfig.Endpoint)
options = []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(oTelSdkConfig.Endpoint)}
}
if len(oTelSdkConfig.Headers) > 0 {
options = append(options, otlpmetricgrpc.WithHeaders(oTelSdkConfig.Headers))
}
if metricExporter, err = otlpmetricgrpc.New(ctx, options...); err != nil {
log.Printf("Cannot create the OTLP gRPC metrics exporter: %v\n", err)
}
case ProtocolHttpProtobuf:
options := []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(oTelSdkConfig.Endpoint)}
var options []otlpmetrichttp.Option
if EndpointHasScheme(oTelSdkConfig.Endpoint) {
log.Printf("Using an HTTP export for self-monitoring (via WithEndpointURL): %s \n", oTelSdkConfig.Endpoint)
options = []otlpmetrichttp.Option{otlpmetrichttp.WithEndpointURL(oTelSdkConfig.Endpoint)}
} else {
log.Printf("Using an HTTP export for self-monitoring (via WithEndpoint): %s\n", oTelSdkConfig.Endpoint)
options = []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(oTelSdkConfig.Endpoint)}
}
if len(oTelSdkConfig.Headers) > 0 {
options = append(options, otlpmetrichttp.WithHeaders(oTelSdkConfig.Headers))
}
Expand Down Expand Up @@ -255,3 +271,7 @@ func ShutDownOTelSdkThreadSafe(ctx context.Context) {

ShutDownOTelSdk(ctx)
}

// EndpointHasScheme reports whether endpoint starts with a scheme prefix such
// as "http://" or "https://", i.e. whether the package-level
// endpointSchemeRegex (`^\w+://`) matches it.
func EndpointHasScheme(endpoint string) bool {
	schemePresent := endpointSchemeRegex.MatchString(endpoint)
	return schemePresent
}
6 changes: 5 additions & 1 deletion internal/controller/operator_configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr
&logger,
)
if err != nil {
logger.Error(err, "operator configuration resource existence check failed")
return ctrl.Result{}, err
} else if checkResourceResult.ResourceDoesNotExist {
resourceDeleted = true
} else if checkResourceResult.StopReconcile {
logger.Info("stopping operator configuration resource reconcile request after resource existence check")
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -155,8 +157,10 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr
&logger,
)
if err != nil {
logger.Error(err, "error in operator configuration resource uniqueness check")
return ctrl.Result{}, err
} else if stopReconcile {
logger.Info("stopping operator configuration resource reconcile after uniqueness check")
return ctrl.Result{}, nil
}

Expand All @@ -167,7 +171,7 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr
operatorConfigurationResource.Status.Conditions,
&logger,
); err != nil {
// The error has already been logged in initStatusConditions
logger.Error(err, "error when initializing operator configuration resource status conditions")
return ctrl.Result{}, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package selfmonitoringapiaccess
import (
"context"
"fmt"
"regexp"
"slices"
"strings"

Expand Down Expand Up @@ -378,7 +377,7 @@ func prependProtocol(endpoint string, defaultProtocol string) string {
// Most gRPC implementations are fine without a protocol, but the Go SDK with gRPC requires the endpoint with a
// protocol when setting it via OTEL_EXPORTER_OTLP_ENDPOINT, see
// https://github.com/open-telemetry/opentelemetry-go/pull/5632.
if !regexp.MustCompile(`^\w+://`).MatchString(endpoint) {
if !common.EndpointHasScheme(endpoint) {
// See https://grpc.github.io/grpc/core/md_doc_naming.html
return defaultProtocol + endpoint
}
Expand Down
10 changes: 10 additions & 0 deletions test-resources/bin/util
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,16 @@ deploy_otlp_sink_if_requested() {
local otlp_sink_dir="$(realpath "$(pwd)")/test-resources/e2e-test-volumes/otlp-sink"
sed "s|/tmp/telemetry|$otlp_sink_dir|g" test-resources/otlp-sink/otlp-sink.yaml > "$tmpfile"

mkdir -p "$otlp_sink_dir"
echo "deleting old OTLP sink old telemetry dump files"
rm -rf "$otlp_sink_dir/traces.jsonl"
rm -rf "$otlp_sink_dir/metrics.jsonl"
rm -rf "$otlp_sink_dir/logs.jsonl"
echo "creating OTLP sink old telemetry dump files"
touch "$otlp_sink_dir/traces.jsonl"
touch "$otlp_sink_dir/metrics.jsonl"
touch "$otlp_sink_dir/logs.jsonl"

kubectl apply -f "$tmpfile"
kubectl rollout status \
deployment \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ kind: Dash0OperatorConfiguration
metadata:
name: dash0-operator-configuration-resource
spec:
selfMonitoring:
enabled: false
export:
dash0:
endpoint: http://otlp-sink.otlp-sink.svc.cluster.local:4317
Expand Down
18 changes: 13 additions & 5 deletions test/e2e/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ func addOptionalHelmParameters(arguments []string, images Images) []string {
arguments = setIfNotEmpty(arguments, "operator.image.digest", images.operator.digest)
arguments = setIfNotEmpty(arguments, "operator.image.pullPolicy", images.operator.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.initContainerImage.repository", images.instrumentation.repository)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.tag", images.instrumentation.tag)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.digest", images.instrumentation.digest)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.pullPolicy", images.instrumentation.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.repository",
images.secretRefResolver.repository)
arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.tag", images.secretRefResolver.tag)
arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.digest",
images.secretRefResolver.digest)
arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.pullPolicy",
images.secretRefResolver.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.collectorImage.repository", images.collector.repository)
arguments = setIfNotEmpty(arguments, "operator.collectorImage.tag", images.collector.tag)
arguments = setIfNotEmpty(arguments, "operator.collectorImage.digest", images.collector.digest)
Expand All @@ -154,11 +167,6 @@ func addOptionalHelmParameters(arguments []string, images Images) []string {
arguments = setIfNotEmpty(arguments, "operator.filelogOffsetSynchImage.pullPolicy",
images.fileLogOffsetSynch.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.initContainerImage.repository", images.instrumentation.repository)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.tag", images.instrumentation.tag)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.digest", images.instrumentation.digest)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.pullPolicy", images.instrumentation.pullPolicy)

return arguments
}

Expand Down

0 comments on commit 457dde8

Please sign in to comment.