Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(self-monitoring): fix self-monitoring for endpoints with scheme #301

Merged
merged 1 commit into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions images/pkg/common/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"log"
"os"
"regexp"
"sync"
"time"

Expand All @@ -21,12 +22,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

const (
ProtocolGrpc = "grpc"
ProtocolHttpProtobuf = "http/protobuf"
ProtocolHttpJson = "http/json"
)

type OTelSdkConfig struct {
Endpoint string
Protocol string
Expand All @@ -35,10 +30,17 @@ type OTelSdkConfig struct {
Headers map[string]string
}

const (
ProtocolGrpc = "grpc"
ProtocolHttpProtobuf = "http/protobuf"
ProtocolHttpJson = "http/json"
)

var (
meterProvider otelmetric.MeterProvider
shutdownFunctions []func(ctx context.Context) error
oTelSdkMutex sync.Mutex
meterProvider otelmetric.MeterProvider
shutdownFunctions []func(ctx context.Context) error
oTelSdkMutex sync.Mutex
endpointSchemeRegex = regexp.MustCompile(`^\w+://`)
)

func InitOTelSdkFromEnvVars(
Expand Down Expand Up @@ -132,15 +134,29 @@ func InitOTelSdkWithConfig(
var err error
switch protocol {
case ProtocolGrpc:
options := []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(oTelSdkConfig.Endpoint)}
var options []otlpmetricgrpc.Option
if EndpointHasScheme(oTelSdkConfig.Endpoint) {
log.Printf("Using a gRPC export for self-monitoring (via WithEndpointURL): %s \n", oTelSdkConfig.Endpoint)
options = []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpointURL(oTelSdkConfig.Endpoint)}
} else {
log.Printf("Using a gRPC export for self-monitoring (via WithEndpoint): %s\n", oTelSdkConfig.Endpoint)
options = []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(oTelSdkConfig.Endpoint)}
}
if len(oTelSdkConfig.Headers) > 0 {
options = append(options, otlpmetricgrpc.WithHeaders(oTelSdkConfig.Headers))
}
if metricExporter, err = otlpmetricgrpc.New(ctx, options...); err != nil {
log.Printf("Cannot create the OTLP gRPC metrics exporter: %v\n", err)
}
case ProtocolHttpProtobuf:
options := []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(oTelSdkConfig.Endpoint)}
var options []otlpmetrichttp.Option
if EndpointHasScheme(oTelSdkConfig.Endpoint) {
log.Printf("Using an HTTP export for self-monitoring (via WithEndpointURL): %s \n", oTelSdkConfig.Endpoint)
options = []otlpmetrichttp.Option{otlpmetrichttp.WithEndpointURL(oTelSdkConfig.Endpoint)}
} else {
log.Printf("Using an HTTP export for self-monitoring (via WithEndpoint): %s\n", oTelSdkConfig.Endpoint)
options = []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(oTelSdkConfig.Endpoint)}
}
if len(oTelSdkConfig.Headers) > 0 {
options = append(options, otlpmetrichttp.WithHeaders(oTelSdkConfig.Headers))
}
Expand Down Expand Up @@ -255,3 +271,7 @@ func ShutDownOTelSdkThreadSafe(ctx context.Context) {

ShutDownOTelSdk(ctx)
}

func EndpointHasScheme(endpoint string) bool {
return endpointSchemeRegex.MatchString(endpoint)
}
6 changes: 5 additions & 1 deletion internal/controller/operator_configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr
&logger,
)
if err != nil {
logger.Error(err, "operator configuration resource existence check failed")
return ctrl.Result{}, err
} else if checkResourceResult.ResourceDoesNotExist {
resourceDeleted = true
} else if checkResourceResult.StopReconcile {
logger.Info("stopping operator configuration resource reconcile request after resource existence check")
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -155,8 +157,10 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr
&logger,
)
if err != nil {
logger.Error(err, "error in operator configuration resource uniqueness check")
return ctrl.Result{}, err
} else if stopReconcile {
logger.Info("stopping operator configuration resource reconcile after uniqueness check")
return ctrl.Result{}, nil
}

Expand All @@ -167,7 +171,7 @@ func (r *OperatorConfigurationReconciler) Reconcile(ctx context.Context, req ctr
operatorConfigurationResource.Status.Conditions,
&logger,
); err != nil {
// The error has already been logged in initStatusConditions
logger.Error(err, "error when initializing operator configuration resource status conditions")
return ctrl.Result{}, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package selfmonitoringapiaccess
import (
"context"
"fmt"
"regexp"
"slices"
"strings"

Expand Down Expand Up @@ -378,7 +377,7 @@ func prependProtocol(endpoint string, defaultProtocol string) string {
// Most gRPC implementations are fine without a protocol, but the Go SDK with gRPC requires the endpoint with a
// protocol when setting it via OTEL_EXPORTER_OTLP_ENDPOINT, see
// https://github.com/open-telemetry/opentelemetry-go/pull/5632.
if !regexp.MustCompile(`^\w+://`).MatchString(endpoint) {
if !common.EndpointHasScheme(endpoint) {
// See https://grpc.github.io/grpc/core/md_doc_naming.html
return defaultProtocol + endpoint
}
Expand Down
10 changes: 10 additions & 0 deletions test-resources/bin/util
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,16 @@ deploy_otlp_sink_if_requested() {
local otlp_sink_dir="$(realpath "$(pwd)")/test-resources/e2e-test-volumes/otlp-sink"
sed "s|/tmp/telemetry|$otlp_sink_dir|g" test-resources/otlp-sink/otlp-sink.yaml > "$tmpfile"

mkdir -p "$otlp_sink_dir"
echo "deleting old OTLP sink old telemetry dump files"
rm -rf "$otlp_sink_dir/traces.jsonl"
rm -rf "$otlp_sink_dir/metrics.jsonl"
rm -rf "$otlp_sink_dir/logs.jsonl"
echo "creating OTLP sink old telemetry dump files"
touch "$otlp_sink_dir/traces.jsonl"
touch "$otlp_sink_dir/metrics.jsonl"
touch "$otlp_sink_dir/logs.jsonl"

kubectl apply -f "$tmpfile"
kubectl rollout status \
deployment \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ kind: Dash0OperatorConfiguration
metadata:
name: dash0-operator-configuration-resource
spec:
selfMonitoring:
enabled: false
export:
dash0:
endpoint: http://otlp-sink.otlp-sink.svc.cluster.local:4317
Expand Down
18 changes: 13 additions & 5 deletions test/e2e/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ func addOptionalHelmParameters(arguments []string, images Images) []string {
arguments = setIfNotEmpty(arguments, "operator.image.digest", images.operator.digest)
arguments = setIfNotEmpty(arguments, "operator.image.pullPolicy", images.operator.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.initContainerImage.repository", images.instrumentation.repository)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.tag", images.instrumentation.tag)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.digest", images.instrumentation.digest)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.pullPolicy", images.instrumentation.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.repository",
images.secretRefResolver.repository)
arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.tag", images.secretRefResolver.tag)
arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.digest",
images.secretRefResolver.digest)
arguments = setIfNotEmpty(arguments, "operator.secretRefResolverImage.pullPolicy",
images.secretRefResolver.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.collectorImage.repository", images.collector.repository)
arguments = setIfNotEmpty(arguments, "operator.collectorImage.tag", images.collector.tag)
arguments = setIfNotEmpty(arguments, "operator.collectorImage.digest", images.collector.digest)
Expand All @@ -154,11 +167,6 @@ func addOptionalHelmParameters(arguments []string, images Images) []string {
arguments = setIfNotEmpty(arguments, "operator.filelogOffsetSynchImage.pullPolicy",
images.fileLogOffsetSynch.pullPolicy)

arguments = setIfNotEmpty(arguments, "operator.initContainerImage.repository", images.instrumentation.repository)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.tag", images.instrumentation.tag)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.digest", images.instrumentation.digest)
arguments = setIfNotEmpty(arguments, "operator.initContainerImage.pullPolicy", images.instrumentation.pullPolicy)

return arguments
}

Expand Down