diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index 520d353a6707..2b2967c892b2 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -246,7 +246,8 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) { opts := test.opts opts.Target = ss.Target wantMetrics := itestutils.MetricDataUnary(opts) - itestutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + gotMetrics = itestutils.WaitForServerMetrics(ctx, t, reader, gotMetrics, wantMetrics) + itestutils.CompareMetrics(t, gotMetrics, wantMetrics) }) } } @@ -419,7 +420,8 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { opts := test.opts opts.Target = ss.Target wantMetrics := itestutils.MetricDataStreaming(opts) - itestutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + gotMetrics = itestutils.WaitForServerMetrics(ctx, t, reader, gotMetrics, wantMetrics) + itestutils.CompareMetrics(t, gotMetrics, wantMetrics) }) } } @@ -603,7 +605,8 @@ func (s) TestXDSLabels(t *testing.T) { }, } - itestutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + gotMetrics = itestutils.WaitForServerMetrics(ctx, t, reader, gotMetrics, wantMetrics) + itestutils.CompareMetrics(t, gotMetrics, wantMetrics) } // TestObservability tests that Observability global function compiles and runs diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 1250237b37b6..1e3d16b6e59f 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "slices" "testing" "time" @@ -34,6 +35,8 @@ import ( v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3" v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,6 +85,13 @@ type traceSpanInfo struct { attributes []attribute.KeyValue } +// traceSpanInfoMapKey is the key struct for constructing a map of trace spans +// retrievable by span name and span kind +type traceSpanInfoMapKey struct { + spanName string + spanKind string +} + // defaultMetricsOptions creates default metrics options func defaultMetricsOptions(_ *testing.T, methodAttributeFilter func(string) bool) (*opentelemetry.MetricsOptions, *metric.ManualReader) { reader := metric.NewManualReader() @@ -143,6 +153,163 @@ func setupStubServer(t *testing.T, metricsOptions *opentelemetry.MetricsOptions, return ss } +// waitForTraceSpans waits until the in-memory span exporter has received the +// expected trace spans based on span name and kind. It polls the exporter at a +// short interval until the desired spans are available or the context is +// cancelled. +// +// Returns the collected spans or an error if the context deadline is exceeded +// before the expected spans are exported. +func waitForTraceSpans(ctx context.Context, exporter *tracetest.InMemoryExporter, wantSpans []traceSpanInfo) (tracetest.SpanStubs, error) { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + spans := exporter.GetSpans() + missingAnySpan := false + for _, wantSpan := range wantSpans { + if !slices.ContainsFunc(spans, func(span tracetest.SpanStub) bool { + return span.Name == wantSpan.name && span.SpanKind.String() == wantSpan.spanKind + }) { + missingAnySpan = true + } + } + if !missingAnySpan { + return spans, nil + } + } + return nil, fmt.Errorf("error waiting for complete trace spans %v: %v", wantSpans, ctx.Err()) +} + +// validateTraces first first groups the received spans by their TraceID. For +// each trace group, it identifies the client, server, and attempt spans for +// both unary and streaming RPCs. It checks that the expected spans are +// present and that the server spans have the correct parent (attempt span). +// Finally, it compares the content of each span (name, kind, attributes, +// events) against the provided expected spans information. +func validateTraces(t *testing.T, spans tracetest.SpanStubs, wantSpanInfos []traceSpanInfo) { + // Group spans by TraceID. + traceSpans := make(map[oteltrace.TraceID][]tracetest.SpanStub) + for _, span := range spans { + traceID := span.SpanContext.TraceID() + traceSpans[traceID] = append(traceSpans[traceID], span) + } + + // For each trace group, verify relationships and content. + for traceID, spans := range traceSpans { + var unaryClient, unaryServer, unaryAttempt *tracetest.SpanStub + var streamClient, streamServer, streamAttempt *tracetest.SpanStub + var isUnary, isStream bool + + for _, span := range spans { + switch { + case span.Name == "grpc.testing.TestService.UnaryCall": + isUnary = true + if span.SpanKind == oteltrace.SpanKindClient { + unaryClient = &span + } else { + unaryServer = &span + } + case span.Name == "Attempt.grpc.testing.TestService.UnaryCall": + isUnary = true + unaryAttempt = &span + case span.Name == "grpc.testing.TestService.FullDuplexCall": + isStream = true + if span.SpanKind == oteltrace.SpanKindClient { + streamClient = &span + } else { + streamServer = &span + } + case span.Name == "Attempt.grpc.testing.TestService.FullDuplexCall": + isStream = true + streamAttempt = &span + } + } + + if isUnary { + // Verify Unary Call Spans. + if unaryClient == nil { + t.Error("Unary call client span not found") + } + if unaryServer == nil { + t.Error("Unary call server span not found") + } + if unaryAttempt == nil { + t.Error("Unary call attempt span not found") + } + // Check TraceID consistency. + if unaryClient != nil && unaryClient.SpanContext.TraceID() != traceID || unaryServer.SpanContext.TraceID() != traceID { + t.Error("Unary call spans have inconsistent TraceIDs") + } + // Check parent-child relationship via SpanID. + if unaryServer != nil && unaryServer.Parent.SpanID() != unaryAttempt.SpanContext.SpanID() { + t.Error("Unary server span parent does not match attempt span ID") + } + } + + if isStream { + // Verify Streaming Call Spans. + if streamClient == nil { + t.Error("Streaming call client span not found") + } + if streamServer == nil { + t.Error("Streaming call server span not found") + } + if streamAttempt == nil { + t.Error("Streaming call attempt span not found") + } + // Check TraceID consistency. + if streamClient != nil && streamClient.SpanContext.TraceID() != traceID || streamServer.SpanContext.TraceID() != traceID { + t.Error("Streaming call spans have inconsistent TraceIDs") + } + if streamServer != nil && streamServer.Parent.SpanID() != streamAttempt.SpanContext.SpanID() { + t.Error("Streaming server span parent does not match attempt span ID") + } + } + } + + // Constructs a map from a slice of traceSpanInfo to retrieve the + // corresponding expected span info based on span name and span kind + // for comparison. + wantSpanInfosMap := make(map[traceSpanInfoMapKey]traceSpanInfo) + for _, info := range wantSpanInfos { + key := traceSpanInfoMapKey{spanName: info.name, spanKind: info.spanKind} + wantSpanInfosMap[key] = info + } + + // Compare retrieved spans with expected spans. + for _, span := range spans { + // Check that the attempt span has the correct status. + if got, want := span.Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + + // Retrieve the corresponding expected span info based on span name and + // span kind to compare. + want, ok := wantSpanInfosMap[traceSpanInfoMapKey{spanName: span.Name, spanKind: span.SpanKind.String()}] + if !ok { + t.Errorf("Unexpected span: %v", span) + continue + } + + // comparers + attributesSort := cmpopts.SortSlices(func(a, b attribute.KeyValue) bool { + return a.Key < b.Key + }) + attributesValueComparable := cmpopts.EquateComparable(attribute.KeyValue{}.Value) + eventsSort := cmpopts.SortSlices(func(a, b trace.Event) bool { + return a.Name < b.Name + }) + eventsTimeIgnore := cmpopts.IgnoreFields(trace.Event{}, "Time") + + // attributes + if diff := cmp.Diff(want.attributes, span.Attributes, attributesSort, attributesValueComparable); diff != "" { + t.Errorf("Attributes mismatch for span %s (-want +got):\n%s", span.Name, diff) + } + // events + if diff := cmp.Diff(want.events, span.Events, eventsSort, attributesSort, attributesValueComparable, eventsTimeIgnore); diff != "" { + t.Errorf("Events mismatch for span %s (-want +got):\n%s", span.Name, diff) + } + } +} + // TestMethodAttributeFilter tests the method attribute filter. The method // filter set should bucket the grpc.method attribute into "other" if the method // attribute filter specifies. @@ -224,7 +391,8 @@ func (s) TestMethodAttributeFilter(t *testing.T) { }, } - testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + gotMetrics = testutils.WaitForServerMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareMetrics(t, gotMetrics, wantMetrics) } // TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry @@ -274,7 +442,8 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { Target: ss.Target, UnaryCompressedMessageSize: float64(57), }) - testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + gotMetrics = testutils.WaitForServerMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareMetrics(t, gotMetrics, wantMetrics) stream, err = ss.Client.FullDuplexCall(ctx) if err != nil { @@ -675,26 +844,21 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Target: ss.Target, UnaryCompressedMessageSize: float64(57), }) - testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + gotMetrics = testutils.WaitForServerMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareMetrics(t, gotMetrics, wantMetrics) - // Verify traces - spans := exporter.GetSpans() - if got, want := len(spans), 6; got != want { - t.Fatalf("got %d spans, want %d", got, want) - } - - wantSI := []traceSpanInfo{ + wantSpanInfos := []traceSpanInfo{ { name: "grpc.testing.TestService.UnaryCall", spanKind: oteltrace.SpanKindServer.String(), attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "FailFast", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "previous-rpc-attempts", @@ -702,7 +866,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, events: []trace.Event{ @@ -748,11 +912,11 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "FailFast", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "previous-rpc-attempts", @@ -760,7 +924,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, events: []trace.Event{ @@ -803,8 +967,8 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { { name: "grpc.testing.TestService.UnaryCall", spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, + attributes: nil, + events: nil, }, { name: "grpc.testing.TestService.FullDuplexCall", @@ -812,11 +976,11 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "FailFast", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "previous-rpc-attempts", @@ -824,16 +988,16 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, - events: []trace.Event{}, + events: nil, }, { name: "grpc.testing.TestService.FullDuplexCall", spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, + attributes: nil, + events: nil, }, { name: "Attempt.grpc.testing.TestService.FullDuplexCall", @@ -841,11 +1005,11 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "FailFast", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "previous-rpc-attempts", @@ -853,73 +1017,18 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, - events: []trace.Event{}, + events: nil, }, } - // Check that same traceID is used in client and server for unary RPC call. - if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - // Check that same traceID is used in client and server for streaming RPC call. - if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - for index, span := range spans { - // Check that the attempt span has the correct status - if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { - t.Errorf("Got status code %v, want %v", got, want) - } - // name - if got, want := span.Name, wantSI[index].name; got != want { - t.Errorf("Span name is %q, want %q", got, want) - } - // spanKind - if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { - t.Errorf("Got span kind %q, want %q", got, want) - } - // attributes - if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { - t.Errorf("Got attributes list of size %q, want %q", got, want) - } - for idx, att := range span.Attributes { - if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) - } - } - // events - if got, want := len(span.Events), len(wantSI[index].events); got != want { - t.Errorf("Event length is %q, want %q", got, want) - } - for eventIdx, event := range span.Events { - if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { - t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) - } - for idx, att := range event.Attributes { - if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { - t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - } - } + spans, err := waitForTraceSpans(ctx, exporter, wantSpanInfos) + if err != nil { + t.Fatal(err) } + validateTraces(t, spans, wantSpanInfos) } // TestSpan verifies that the gRPC Trace Binary propagator correctly @@ -937,7 +1046,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { func (s) TestSpan(t *testing.T) { mo, _ := defaultMetricsOptions(t, nil) // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter. - to, spanExporter := defaultTraceOptions(t) + to, exporter := defaultTraceOptions(t) // Start the server with trace options. ss := setupStubServer(t, mo, to) defer ss.Stop() @@ -962,24 +1071,18 @@ func (s) TestSpan(t *testing.T) { t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } - // Get the spans from the exporter - spans := spanExporter.GetSpans() - if got, want := len(spans), 6; got != want { - t.Fatalf("got %d spans, want %d", got, want) - } - - wantSI := []traceSpanInfo{ + wantSpanInfos := []traceSpanInfo{ { name: "grpc.testing.TestService.UnaryCall", spanKind: oteltrace.SpanKindServer.String(), attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "FailFast", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "previous-rpc-attempts", @@ -987,7 +1090,7 @@ func (s) TestSpan(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, events: []trace.Event{ @@ -1033,11 +1136,11 @@ func (s) TestSpan(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "FailFast", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "previous-rpc-attempts", @@ -1045,7 +1148,7 @@ func (s) TestSpan(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, events: []trace.Event{ @@ -1088,8 +1191,8 @@ func (s) TestSpan(t *testing.T) { { name: "grpc.testing.TestService.UnaryCall", spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, + attributes: nil, + events: nil, }, { name: "grpc.testing.TestService.FullDuplexCall", @@ -1097,11 +1200,11 @@ func (s) TestSpan(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "FailFast", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "previous-rpc-attempts", @@ -1109,16 +1212,16 @@ func (s) TestSpan(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, - events: []trace.Event{}, + events: nil, }, { name: "grpc.testing.TestService.FullDuplexCall", spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, + attributes: nil, + events: nil, }, { name: "Attempt.grpc.testing.TestService.FullDuplexCall", @@ -1126,11 +1229,11 @@ func (s) TestSpan(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "FailFast", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "previous-rpc-attempts", @@ -1138,73 +1241,18 @@ func (s) TestSpan(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, - events: []trace.Event{}, + events: nil, }, } - // Check that same traceID is used in client and server for unary RPC call. - if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - // Check that same traceID is used in client and server for streaming RPC call. - if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - for index, span := range spans { - // Check that the attempt span has the correct status - if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { - t.Errorf("Got status code %v, want %v", got, want) - } - // name - if got, want := span.Name, wantSI[index].name; got != want { - t.Errorf("Span name is %q, want %q", got, want) - } - // spanKind - if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { - t.Errorf("Got span kind %q, want %q", got, want) - } - // attributes - if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { - t.Errorf("Got attributes list of size %q, want %q", got, want) - } - for idx, att := range span.Attributes { - if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) - } - } - // events - if got, want := len(span.Events), len(wantSI[index].events); got != want { - t.Errorf("Event length is %q, want %q", got, want) - } - for eventIdx, event := range span.Events { - if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { - t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) - } - for idx, att := range event.Attributes { - if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { - t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - } - } + spans, err := waitForTraceSpans(ctx, exporter, wantSpanInfos) + if err != nil { + t.Fatal(err) } + validateTraces(t, spans, wantSpanInfos) } // TestSpan_WithW3CContextPropagator sets up a stub server with OpenTelemetry tracing @@ -1221,7 +1269,7 @@ func (s) TestSpan(t *testing.T) { func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { mo, _ := defaultMetricsOptions(t, nil) // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter - to, spanExporter := defaultTraceOptions(t) + to, exporter := defaultTraceOptions(t) // Set the W3CContextPropagator as part of TracingOptions. to.TextMapPropagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}) // Start the server with OpenTelemetry options @@ -1248,24 +1296,19 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { if _, err = stream.Recv(); err != io.EOF { t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } - // Get the spans from the exporter - spans := spanExporter.GetSpans() - if got, want := len(spans), 6; got != want { - t.Fatalf("Got %d spans, want %d", got, want) - } - wantSI := []traceSpanInfo{ + wantSpanInfos := []traceSpanInfo{ { name: "grpc.testing.TestService.UnaryCall", spanKind: oteltrace.SpanKindServer.String(), attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "FailFast", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "previous-rpc-attempts", @@ -1273,7 +1316,7 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, events: []trace.Event{ @@ -1319,11 +1362,11 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "FailFast", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "previous-rpc-attempts", @@ -1331,7 +1374,7 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, events: []trace.Event{ @@ -1374,8 +1417,8 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { { name: "grpc.testing.TestService.UnaryCall", spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, + attributes: nil, + events: nil, }, { name: "grpc.testing.TestService.FullDuplexCall", @@ -1383,11 +1426,11 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "FailFast", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, { Key: "previous-rpc-attempts", @@ -1395,16 +1438,16 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, - events: []trace.Event{}, + events: nil, }, { name: "grpc.testing.TestService.FullDuplexCall", spanKind: oteltrace.SpanKindClient.String(), - attributes: []attribute.KeyValue{}, - events: []trace.Event{}, + attributes: nil, + events: nil, }, { name: "Attempt.grpc.testing.TestService.FullDuplexCall", @@ -1412,11 +1455,11 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { attributes: []attribute.KeyValue{ { Key: "Client", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "FailFast", - Value: attribute.IntValue(1), + Value: attribute.BoolValue(true), }, { Key: "previous-rpc-attempts", @@ -1424,72 +1467,18 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "transparent-retry", - Value: attribute.IntValue(0), + Value: attribute.BoolValue(false), }, }, - events: []trace.Event{}, + events: nil, }, } - // Check that same traceID is used in client and server. - if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - - // Check that same traceID is used in client and server. - if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { - t.Fatal("TraceID mismatch in client span and server span.") - } - // Check that the attempt span id of client matches the span id of server - // SpanContext. - if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { - t.Fatal("SpanID mismatch in client span and server span.") - } - for index, span := range spans { - // Check that the attempt span has the correct status - if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { - t.Errorf("Got status code %v, want %v", got, want) - } - // name - if got, want := span.Name, wantSI[index].name; got != want { - t.Errorf("Span name is %q, want %q", got, want) - } - // spanKind - if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { - t.Errorf("Got span kind %q, want %q", got, want) - } - // attributes - if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { - t.Errorf("Got attributes list of size %q, want %q", got, want) - } - for idx, att := range span.Attributes { - if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) - } - } - // events - if got, want := len(span.Events), len(wantSI[index].events); got != want { - t.Errorf("Event length is %q, want %q", got, want) - } - for eventIdx, event := range span.Events { - if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { - t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) - } - for idx, att := range event.Attributes { - if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { - t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { - t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) - } - } - } + spans, err := waitForTraceSpans(ctx, exporter, wantSpanInfos) + if err != nil { + t.Fatal(err) } + validateTraces(t, spans, wantSpanInfos) } // TestMetricsAndTracesDisabled verifies that RPCs call succeed as expected @@ -1568,13 +1557,11 @@ func (s) TestRPCSpanErrorStatus(t *testing.T) { Body: make([]byte, 10000), }}) - // Verify traces - spans := exporter.GetSpans() - if got, want := len(spans), 3; got != want { - t.Fatalf("got %d spans, want %d", got, want) - } - // Verify spans has error status with rpcErrorMsg as error message. + for ; len(exporter.GetSpans()) == 0 && ctx.Err() == nil; <-time.After(time.Millisecond) { + // wait until trace spans are collected + } + spans := exporter.GetSpans() if got, want := spans[0].Status.Description, rpcErrorMsg; got != want { t.Fatalf("got rpc error %s, want %s", spans[0].Status.Description, rpcErrorMsg) } diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go index 721ce00cd347..6128b421829e 100644 --- a/stats/opentelemetry/internal/testutils/testutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -20,6 +20,7 @@ package testutils import ( "context" "fmt" + "slices" "testing" "time" @@ -45,7 +46,7 @@ var ( // // Returns a new gotMetrics map containing the metric data being polled for, or // an error if failed to wait for metric. -func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) { +func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) { for ; ctx.Err() == nil; <-time.After(time.Millisecond) { rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) @@ -59,7 +60,16 @@ func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric if !ok { continue } - metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + switch data := val.Data.(type) { + case metricdata.Histogram[int64]: + if len(wantMetric.Data.(metricdata.Histogram[int64]).DataPoints) > len(data.DataPoints) { + continue + } + case metricdata.Histogram[float64]: + if len(wantMetric.Data.(metricdata.Histogram[float64]).DataPoints) > len(data.DataPoints) { + continue + } + } return gotMetrics, nil } return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) @@ -755,19 +765,21 @@ func MetricData(options MetricDataOptions) []metricdata.Metrics { } } -// CompareMetrics asserts wantMetrics are what we expect. It polls for eventual -// server metrics (not emitted synchronously with client side rpc returning), -// and for duration metrics makes sure the data point is within possible testing -// time (five seconds from context timeout). -func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { +// CompareMetrics asserts wantMetrics are what we expect. For duration metrics +// makes sure the data point is within possible testing time (five seconds from +// context timeout). +func CompareMetrics(t *testing.T, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { for _, metric := range wantMetrics { + val, ok := gotMetrics[metric.Name] + if !ok { + t.Errorf("Metric %v not present in recorded metrics", metric.Name) + continue + } + if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" { - // Sync the metric reader to see the event because stats.End is - // handled async server side. Thus, poll until metrics created from - // stats.End show up. - var err error - if gotMetrics, err = waitForServerCompletedRPCs(ctx, t, mr, metric); err != nil { // move to shared helper - t.Fatal(err) + val := gotMetrics[metric.Name] + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Errorf("Metrics data type not equal for metric: %v", metric.Name) } continue } @@ -775,22 +787,45 @@ func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, // If one of the duration metrics, ignore the bucket counts, and make // sure it count falls within a bucket <= 5 seconds (maximum duration of // test due to context). - val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("Metric %v not present in recorded metrics", metric.Name) - } if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { + val := gotMetrics[metric.Name] if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { - t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + t.Errorf("Metrics data type not equal for metric: %v", metric.Name) } if err := checkDataPointWithinFiveSeconds(val); err != nil { - t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) + t.Errorf("Data point not within five seconds for metric %v: %v", metric.Name, err) } continue } if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + t.Errorf("Metrics data type not equal for metric: %v", metric.Name) + } + } +} + +// WaitForServerMetrics waits for eventual server metrics (not emitted +// synchronously with client side rpc returning). +func WaitForServerMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) map[string]metricdata.Metrics { + terminalMetrics := []string{ + "grpc.server.call.sent_total_compressed_message_size", + "grpc.server.call.rcvd_total_compressed_message_size", + "grpc.client.attempt.duration", + "grpc.client.call.duration", + "grpc.server.call.duration", + } + for _, metric := range wantMetrics { + if !slices.Contains(terminalMetrics, metric.Name) { + continue + } + // Sync the metric reader to see the event because stats.End is + // handled async server side. Thus, poll until metrics created from + // stats.End show up. + var err error + if gotMetrics, err = waitForServerCompletedRPCs(ctx, mr, metric); err != nil { // move to shared helper + t.Fatal(err) } } + + return gotMetrics } diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index cd5c23cd3b23..71babd1ac0b8 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -46,7 +46,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { // correctness. span.SetAttributes( attribute.Bool("Client", rs.Client), - attribute.Bool("FailFast", rs.Client), + attribute.Bool("FailFast", rs.FailFast), attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)), attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), )