From 3e8fc5406571867ac45527be9a5e12c8227e0a61 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Wed, 12 Feb 2025 02:20:17 +0700 Subject: [PATCH] stats/openetelemetry: refactor and make e2e test stats verification deterministic --- stats/opentelemetry/e2e_test.go | 763 +----------------- .../internal/testutils/testutils.go | 589 +++++++++++++- 2 files changed, 602 insertions(+), 750 deletions(-) diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 1250237b37b6..0057724006ea 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -24,8 +24,6 @@ import ( "time" "go.opentelemetry.io/otel" - otelcodes "go.opentelemetry.io/otel/codes" - oteltrace "go.opentelemetry.io/otel/trace" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -72,16 +70,6 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// traceSpanInfo is the information received about the trace span. It contains -// subset of information that is needed to verify if correct trace is being -// attributed to the rpc. -type traceSpanInfo struct { - spanKind string - name string - events []trace.Event - attributes []attribute.KeyValue -} - // defaultMetricsOptions creates default metrics options func defaultMetricsOptions(_ *testing.T, methodAttributeFilter func(string) bool) (*opentelemetry.MetricsOptions, *metric.ManualReader) { reader := metric.NewManualReader() @@ -678,248 +666,8 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) // Verify traces - spans := exporter.GetSpans() - if got, want := len(spans), 6; got != want { - t.Fatalf("got %d spans, want %d", got, want) - } - - wantSI := []traceSpanInfo{ - { - name: "grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindServer.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(0), - }, - { - Key: "FailFast", - Value: attribute.IntValue(0), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{ - { - Name: "Inbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(57), - }, - }, - }, - { - Name: "Outbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(57), - }, - }, - }, - }, - }, - { - name: "Attempt.grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindInternal.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(1), - }, - { - Key: "FailFast", - Value: attribute.IntValue(1), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{ - { - Name: "Outbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(57), - }, - }, - }, - { - Name: "Inbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(57), - }, - }, - }, - }, - }, - { - name: "grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, - }, - { - name: "grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindServer.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(0), - }, - { - Key: "FailFast", - Value: attribute.IntValue(0), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{}, - }, - { - name: "grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, - }, - { - name: "Attempt.grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindInternal.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(1), - }, - { - Key: "FailFast", - Value: attribute.IntValue(1), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{}, - }, - } - - // Check that same traceID is used in client and server for unary RPC call. - if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - // Check that same traceID is used in client and server for streaming RPC call. - if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - for index, span := range spans { - // Check that the attempt span has the correct status - if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { - t.Errorf("Got status code %v, want %v", got, want) - } - // name - if got, want := span.Name, wantSI[index].name; got != want { - t.Errorf("Span name is %q, want %q", got, want) - } - // spanKind - if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { - t.Errorf("Got span kind %q, want %q", got, want) - } - // attributes - if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { - t.Errorf("Got attributes list of size %q, want %q", got, want) - } - for idx, att := range span.Attributes { - if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) - } - } - // events - if got, want := len(span.Events), len(wantSI[index].events); got != want { - t.Errorf("Event length is %q, want %q", got, want) - } - for eventIdx, event := range span.Events { - if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { - t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) - } - for idx, att := range event.Attributes { - if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { - t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - } - } - } + wantSI := testutils.TraceDataWithCompressor() + testutils.VerifyAndCompareTraces(ctx, t, exporter, 6, wantSI) } // TestSpan verifies that the gRPC Trace Binary propagator correctly @@ -937,7 +685,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { func (s) TestSpan(t *testing.T) { mo, _ := defaultMetricsOptions(t, nil) // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter. - to, spanExporter := defaultTraceOptions(t) + to, exporter := defaultTraceOptions(t) // Start the server with trace options. ss := setupStubServer(t, mo, to) defer ss.Stop() @@ -962,249 +710,9 @@ func (s) TestSpan(t *testing.T) { t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } - // Get the spans from the exporter - spans := spanExporter.GetSpans() - if got, want := len(spans), 6; got != want { - t.Fatalf("got %d spans, want %d", got, want) - } - - wantSI := []traceSpanInfo{ - { - name: "grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindServer.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(0), - }, - { - Key: "FailFast", - Value: attribute.IntValue(0), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{ - { - Name: "Inbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - { - Name: "Outbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - }, - }, - { - name: "Attempt.grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindInternal.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(1), - }, - { - Key: "FailFast", - Value: attribute.IntValue(1), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{ - { - Name: "Outbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - { - Name: "Inbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - }, - }, - { - name: "grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, - }, - { - name: "grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindServer.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(0), - }, - { - Key: "FailFast", - Value: attribute.IntValue(0), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{}, - }, - { - name: "grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, - }, - { - name: "Attempt.grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindInternal.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(1), - }, - { - Key: "FailFast", - Value: attribute.IntValue(1), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{}, - }, - } - - // Check that same traceID is used in client and server for unary RPC call. - if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - // Check that same traceID is used in client and server for streaming RPC call. - if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - for index, span := range spans { - // Check that the attempt span has the correct status - if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { - t.Errorf("Got status code %v, want %v", got, want) - } - // name - if got, want := span.Name, wantSI[index].name; got != want { - t.Errorf("Span name is %q, want %q", got, want) - } - // spanKind - if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { - t.Errorf("Got span kind %q, want %q", got, want) - } - // attributes - if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { - t.Errorf("Got attributes list of size %q, want %q", got, want) - } - for idx, att := range span.Attributes { - if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) - } - } - // events - if got, want := len(span.Events), len(wantSI[index].events); got != want { - t.Errorf("Event length is %q, want %q", got, want) - } - for eventIdx, event := range span.Events { - if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { - t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) - } - for idx, att := range event.Attributes { - if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { - t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - } - } - } + // Verify traces + wantSI := testutils.TraceDataWithoutCompressor() + testutils.VerifyAndCompareTraces(ctx, t, exporter, 6, wantSI) } // TestSpan_WithW3CContextPropagator sets up a stub server with OpenTelemetry tracing @@ -1221,7 +729,7 @@ func (s) TestSpan(t *testing.T) { func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { mo, _ := defaultMetricsOptions(t, nil) // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter - to, spanExporter := defaultTraceOptions(t) + to, exporter := defaultTraceOptions(t) // Set the W3CContextPropagator as part of TracingOptions. to.TextMapPropagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}) // Start the server with OpenTelemetry options @@ -1248,248 +756,10 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { if _, err = stream.Recv(); err != io.EOF { t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } - // Get the spans from the exporter - spans := spanExporter.GetSpans() - if got, want := len(spans), 6; got != want { - t.Fatalf("Got %d spans, want %d", got, want) - } - wantSI := []traceSpanInfo{ - { - name: "grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindServer.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(0), - }, - { - Key: "FailFast", - Value: attribute.IntValue(0), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{ - { - Name: "Inbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - { - Name: "Outbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - }, - }, - { - name: "Attempt.grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindInternal.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(1), - }, - { - Key: "FailFast", - Value: attribute.IntValue(1), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{ - { - Name: "Outbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - { - Name: "Inbound compressed message", - Attributes: []attribute.KeyValue{ - { - Key: "sequence-number", - Value: attribute.IntValue(1), - }, - { - Key: "message-size", - Value: attribute.IntValue(10006), - }, - { - Key: "message-size-compressed", - Value: attribute.IntValue(10006), - }, - }, - }, - }, - }, - { - name: "grpc.testing.TestService.UnaryCall", - spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, - }, - { - name: "grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindServer.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(0), - }, - { - Key: "FailFast", - Value: attribute.IntValue(0), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{}, - }, - { - name: "grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, - }, - { - name: "Attempt.grpc.testing.TestService.FullDuplexCall", - spanKind: oteltrace.SpanKindInternal.String(), - attributes: []attribute.KeyValue{ - { - Key: "Client", - Value: attribute.IntValue(1), - }, - { - Key: "FailFast", - Value: attribute.IntValue(1), - }, - { - Key: "previous-rpc-attempts", - Value: attribute.IntValue(0), - }, - { - Key: "transparent-retry", - Value: attribute.IntValue(0), - }, - }, - events: []trace.Event{}, - }, - } - - // Check that same traceID is used in client and server. - if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - // Check that same traceID is used in client and server. - if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - for index, span := range spans { - // Check that the attempt span has the correct status - if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { - t.Errorf("Got status code %v, want %v", got, want) - } - // name - if got, want := span.Name, wantSI[index].name; got != want { - t.Errorf("Span name is %q, want %q", got, want) - } - // spanKind - if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { - t.Errorf("Got span kind %q, want %q", got, want) - } - // attributes - if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { - t.Errorf("Got attributes list of size %q, want %q", got, want) - } - for idx, att := range span.Attributes { - if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) - } - } - // events - if got, want := len(span.Events), len(wantSI[index].events); got != want { - t.Errorf("Event length is %q, want %q", got, want) - } - for eventIdx, event := range span.Events { - if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { - t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) - } - for idx, att := range event.Attributes { - if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { - t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - } - } - } + // Verify traces + wantSI := testutils.TraceDataWithoutCompressor() + testutils.VerifyAndCompareTraces(ctx, t, exporter, 6, wantSI) } // TestMetricsAndTracesDisabled verifies that RPCs call succeed as expected @@ -1569,9 +839,16 @@ func (s) TestRPCSpanErrorStatus(t *testing.T) { }}) // Verify traces - spans := exporter.GetSpans() - if got, want := len(spans), 3; got != want { - t.Fatalf("got %d spans, want %d", got, want) + wantSpans := 3 + var spans []tracetest.SpanStub + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + spans = exporter.GetSpans() + if len(spans) == wantSpans { + break + } + } + if len(spans) != wantSpans { + t.Fatalf("got %d spans, want %d", len(spans), wantSpans) } // Verify spans has error status with rpcErrorMsg as error message. diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go index 721ce00cd347..83fa9d6c9684 100644 --- a/stats/opentelemetry/internal/testutils/testutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -23,10 +23,15 @@ import ( "testing" "time" + otelcodes "go.opentelemetry.io/otel/codes" + oteltrace "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" ) // Redefine default bounds here to avoid a cyclic dependency with top level @@ -39,13 +44,23 @@ var ( DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} ) +// TraceSpanInfo is the information received about the trace span. It contains +// subset of information that is needed to verify if correct trace is being +// attributed to the rpc. +type TraceSpanInfo struct { + SpanKind string + Name string + Events []trace.Event + Attributes []attribute.KeyValue +} + // waitForServerCompletedRPCs waits until the unary and streaming stats.End // calls are finished processing. It does this by waiting for the expected // metric triggered by stats.End to appear through the passed in metrics reader. // // Returns a new gotMetrics map containing the metric data being polled for, or // an error if failed to wait for metric. -func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) { +func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) { for ; ctx.Err() == nil; <-time.After(time.Millisecond) { rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) @@ -59,7 +74,16 @@ func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric if !ok { continue } - metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + if _, ok := val.Data.(metricdata.Histogram[int64]); ok { + if len(wantMetric.Data.(metricdata.Histogram[int64]).DataPoints) != len(val.Data.(metricdata.Histogram[int64]).DataPoints) { + continue + } + } + if _, ok := val.Data.(metricdata.Histogram[float64]); ok { + if len(wantMetric.Data.(metricdata.Histogram[float64]).DataPoints) != len(val.Data.(metricdata.Histogram[float64]).DataPoints) { + continue + } + } return gotMetrics, nil } return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) @@ -766,20 +790,25 @@ func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, // handled async server side. Thus, poll until metrics created from // stats.End show up. var err error - if gotMetrics, err = waitForServerCompletedRPCs(ctx, t, mr, metric); err != nil { // move to shared helper + if gotMetrics, err = waitForServerCompletedRPCs(ctx, mr, metric); err != nil { // move to shared helper t.Fatal(err) } + val := gotMetrics[metric.Name] + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + } continue } // If one of the duration metrics, ignore the bucket counts, and make // sure it count falls within a bucket <= 5 seconds (maximum duration of // test due to context). - val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("Metric %v not present in recorded metrics", metric.Name) - } if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { + var err error + if gotMetrics, err = waitForServerCompletedRPCs(ctx, mr, metric); err != nil { // move to shared helper + t.Fatal(err) + } + val := gotMetrics[metric.Name] if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) } @@ -789,8 +818,554 @@ func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, continue } + val, ok := gotMetrics[metric.Name] + if !ok { + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) + } if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) } } } + +// waitForCompleteTraceSpans waits until the in-memory span exporter has +// received all the expected number of spans. It polls the exporter at a short +// interval until the desired number of spans are available or the context is +// cancelled. +// +// Returns the collected spans or an error if the context deadline is exceeded +// before the expected number of spans are exported. +func waitForCompleteTraceSpans(ctx context.Context, exporter *tracetest.InMemoryExporter, wantSpans int) (tracetest.SpanStubs, error) { + var spans []tracetest.SpanStub + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + spans = exporter.GetSpans() + if len(spans) == wantSpans { + return spans, nil + } + } + return nil, fmt.Errorf("error waiting for complete trace spans %d: %v", wantSpans, ctx.Err()) +} + +// TraceDataWithCompressor returns a TraceSpanInfo for a unary RPC and +// streaming RPC with certain compression and message flow sent, when +// compressor is used +func TraceDataWithCompressor() []TraceSpanInfo { + return []TraceSpanInfo{ + { + Name: "grpc.testing.TestService.UnaryCall", + SpanKind: oteltrace.SpanKindServer.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + }, + }, + { + Name: "Attempt.grpc.testing.TestService.UnaryCall", + SpanKind: oteltrace.SpanKindInternal.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + }, + }, + { + Name: "grpc.testing.TestService.UnaryCall", + SpanKind: oteltrace.SpanKindClient.String(), + Attributes: []attribute.KeyValue{}, + Events: []trace.Event{}, + }, + { + Name: "grpc.testing.TestService.FullDuplexCall", + SpanKind: oteltrace.SpanKindServer.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{}, + }, + { + Name: "grpc.testing.TestService.FullDuplexCall", + SpanKind: oteltrace.SpanKindClient.String(), + Attributes: []attribute.KeyValue{}, + Events: []trace.Event{}, + }, + { + Name: "Attempt.grpc.testing.TestService.FullDuplexCall", + SpanKind: oteltrace.SpanKindInternal.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{}, + }, + } +} + +// TraceDataWithoutCompressor returns a TraceSpanInfo for a unary RPC and +// streaming RPC with certain compression and message flow sent, when +// compressor is not used. +func TraceDataWithoutCompressor() []TraceSpanInfo { + return []TraceSpanInfo{ + { + Name: "grpc.testing.TestService.UnaryCall", + SpanKind: oteltrace.SpanKindServer.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + }, + }, + { + Name: "Attempt.grpc.testing.TestService.UnaryCall", + SpanKind: oteltrace.SpanKindInternal.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + }, + }, + { + Name: "grpc.testing.TestService.UnaryCall", + SpanKind: oteltrace.SpanKindClient.String(), + Attributes: []attribute.KeyValue{}, + Events: []trace.Event{}, + }, + { + Name: "grpc.testing.TestService.FullDuplexCall", + SpanKind: oteltrace.SpanKindServer.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{}, + }, + { + Name: "grpc.testing.TestService.FullDuplexCall", + SpanKind: oteltrace.SpanKindClient.String(), + Attributes: []attribute.KeyValue{}, + Events: []trace.Event{}, + }, + { + Name: "Attempt.grpc.testing.TestService.FullDuplexCall", + SpanKind: oteltrace.SpanKindInternal.String(), + Attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + Events: []trace.Event{}, + }, + } +} + +// VerifyAndCompareTraces first waits for the exporter to receive the expected +// number of spans. It then groups the received spans by their TraceID. For +// each trace group, it identifies the client, server, and attempt spans for +// both unary and streaming RPCs. It checks that the expected spans are +// present and that the server spans have the correct parent (attempt span). +// Finally, it compares the content of each span (name, kind, attributes, +// events) against the provided expected spans information. +func VerifyAndCompareTraces(ctx context.Context, t *testing.T, exporter *tracetest.InMemoryExporter, wantSpans int, wantSpanInfos []TraceSpanInfo) { + spans, err := waitForCompleteTraceSpans(ctx, exporter, wantSpans) + if err != nil { + t.Fatal(err) + } + + // Group spans by TraceID + traceSpans := make(map[oteltrace.TraceID][]tracetest.SpanStub) + for _, span := range spans { + traceID := span.SpanContext.TraceID() + traceSpans[traceID] = append(traceSpans[traceID], span) + } + + // For each trace group, verify relationships and content + for traceID, spans := range traceSpans { + var unaryClient, unaryServer, unaryAttempt *tracetest.SpanStub + var streamClient, streamServer, streamAttempt *tracetest.SpanStub + var isUnary, isStream bool + + for _, span := range spans { + switch { + case span.Name == "grpc.testing.TestService.UnaryCall": + isUnary = true + if span.SpanKind == oteltrace.SpanKindClient { + unaryClient = &span + } else { + unaryServer = &span + } + case span.Name == "Attempt.grpc.testing.TestService.UnaryCall": + isUnary = true + unaryAttempt = &span + case span.Name == "grpc.testing.TestService.FullDuplexCall": + isStream = true + if span.SpanKind == oteltrace.SpanKindClient { + streamClient = &span + } else { + streamServer = &span + } + case span.Name == "Attempt.grpc.testing.TestService.FullDuplexCall": + isStream = true + streamAttempt = &span + } + } + + if isUnary { + // Verify Unary Call Spans + if unaryClient == nil { + t.Error("Unary call client span not found") + } + if unaryServer == nil { + t.Error("Unary call server span not found") + } + if unaryAttempt == nil { + t.Error("Unary call attempt span not found") + } + // Check TraceID consistency + if unaryClient != nil && unaryClient.SpanContext.TraceID() != traceID || unaryServer.SpanContext.TraceID() != traceID { + t.Error("Unary call spans have inconsistent TraceIDs") + } + // Check parent-child relationship via SpanID + if unaryServer != nil && unaryServer.Parent.SpanID() != unaryAttempt.SpanContext.SpanID() { + t.Error("Unary server span parent does not match attempt span ID") + } + } + + if isStream { + // Verify Streaming Call Spans + if streamClient == nil { + t.Error("Streaming call client span not found") + } + if streamServer == nil { + t.Error("Streaming call server span not found") + } + if streamAttempt == nil { + t.Error("Streaming call attempt span not found") + } + // Check TraceID consistency + if streamClient != nil && streamClient.SpanContext.TraceID() != traceID || streamServer.SpanContext.TraceID() != traceID { + t.Error("Streaming call spans have inconsistent TraceIDs") + } + if streamServer != nil && streamServer.Parent.SpanID() != streamAttempt.SpanContext.SpanID() { + t.Error("Streaming server span parent does not match attempt span ID") + } + } + } + + compareTraces(t, spans, wantSpanInfos) +} + +func compareTraces(t *testing.T, spans tracetest.SpanStubs, wantSpanInfos []TraceSpanInfo) { + // Validate attributes/events by span type instead of index + for _, span := range spans { + // Check that the attempt span has the correct status + if got, want := span.Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + + var want TraceSpanInfo + switch { + case span.Name == "grpc.testing.TestService.UnaryCall" && span.SpanKind == oteltrace.SpanKindServer: + want = wantSpanInfos[0] // Reference expected unary server span + case span.Name == "Attempt.grpc.testing.TestService.UnaryCall" && span.SpanKind == oteltrace.SpanKindInternal: + want = wantSpanInfos[1] + case span.Name == "grpc.testing.TestService.UnaryCall" && span.SpanKind == oteltrace.SpanKindClient: + want = wantSpanInfos[2] + case span.Name == "grpc.testing.TestService.FullDuplexCall" && span.SpanKind == oteltrace.SpanKindServer: + want = wantSpanInfos[3] + case span.Name == "grpc.testing.TestService.FullDuplexCall" && span.SpanKind == oteltrace.SpanKindClient: + want = wantSpanInfos[4] + case span.Name == "Attempt.grpc.testing.TestService.FullDuplexCall" && span.SpanKind == oteltrace.SpanKindInternal: + want = wantSpanInfos[5] + default: + t.Errorf("Unexpected span Name: %q", span.Name) + continue + } + + // name + if got, want := span.Name, want.Name; got != want { + t.Errorf("Span name is %q, want %q", got, want) + } + // spanKind + if got, want := span.SpanKind.String(), want.SpanKind; got != want { + t.Errorf("Got span kind %q, want %q", got, want) + } + // attributes + if got, want := len(span.Attributes), len(want.Attributes); got != want { + t.Errorf("Got attributes list of size %q, want %q", got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, want.Attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) + } + } + // events + if got, want := len(span.Events), len(want.Events); got != want { + t.Errorf("Event length is %q, want %q", got, want) + } + for eventIdx, event := range span.Events { + if got, want := event.Name, want.Events[eventIdx].Name; got != want { + t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) + } + for idx, att := range event.Attributes { + if got, want := att.Key, want.Events[eventIdx].Attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + if got, want := att.Value, want.Events[eventIdx].Attributes[idx].Value; got != want { + t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + } + } + } +}