Skip to content

Commit

Permalink
adds otels tracer span to span reader
Browse files Browse the repository at this point in the history
Signed-off-by: Afzal <[email protected]>
  • Loading branch information
afzalbin64 committed Jul 29, 2023
1 parent b36c8a9 commit a56c018
Showing 1 changed file with 44 additions and 36 deletions.
80 changes: 44 additions & 36 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

Expand Down Expand Up @@ -158,10 +158,13 @@ func (s *SpanReader) GetOperations(
return s.operationNamesReader(query)
}

func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.Finish()
span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID))
func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID, tracer trace.Tracer) (*model.Trace, error) {
ctx, span := startSpanForQuery(ctx, "readTrace", querySpanByTraceID, tracer)
defer span.End()
span.SetAttributes(
attribute.Key("event").String("searching"),
attribute.Key("trace_id").String(traceID.String()),
)

trace, err := s.readTraceInSpan(ctx, traceID)
logErrorToSpan(span, err)
Expand Down Expand Up @@ -217,7 +220,7 @@ func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceI

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID))
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID), s.tracer)
}

func validateQuery(p *spanstore.TraceQueryParameters) error {
Expand Down Expand Up @@ -295,7 +298,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
return nil, err
}
if len(traceQuery.Tags) > 0 {
tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery)
tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery, s.tracer)
if err != nil {
return nil, err
}
Expand All @@ -307,19 +310,22 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
return traceIds, nil
}
if len(traceQuery.Tags) > 0 {
return s.queryByTagsAndLogs(ctx, traceQuery)
return s.queryByTagsAndLogs(ctx, traceQuery, s.tracer)
}
return s.queryByService(ctx, traceQuery)
}

func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.Finish()
func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters, tracer trace.Tracer) (dbmodel.UniqueTraceIDs, error) {
ctx, span := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag, tracer)
defer span.End()

results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags))
for k, v := range tq.Tags {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag")
childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v))
_, childSpan := s.tracer.Start(ctx, "queryByTag")
childSpan.SetAttributes(
attribute.Key("tag.key").String(k),
attribute.Key("tag.value").String(v),
)
query := s.session.Query(
queryByTag,
tq.ServiceName,
Expand All @@ -330,7 +336,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
tq.NumTraces*limitMultiple,
).PageSize(0)
t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex)
childSpan.Finish()
defer childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -340,8 +346,8 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
}

func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, "queryByDuration", queryByDuration, s.tracer)
defer span.End()

results := dbmodel.UniqueTraceIDs{}

Expand All @@ -357,8 +363,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize)

for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket")
childSpan.LogFields(otlog.String("timeBucket", timeBucket.String()))
_, childSpan := s.tracer.Start(ctx, "queryForTimeBucket")
childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String()))
query := s.session.Query(
queryByDuration,
timeBucket,
Expand All @@ -368,7 +374,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
maxDurationMicros,
traceQuery.NumTraces*limitMultiple)
t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -384,8 +390,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
}

func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.Finish()
_, span := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName, s.tracer)
defer span.End()
query := s.session.Query(
queryByServiceAndOperationName,
tq.ServiceName,
Expand All @@ -398,8 +404,8 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa
}

func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName)
defer span.Finish()
_, span:= startSpanForQuery(ctx, "queryByService", queryByServiceName, s.tracer)
defer span.End()
query := s.session.Query(
queryByServiceName,
tq.ServiceName,
Expand All @@ -410,7 +416,7 @@ func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQuer
return s.executeQuery(span, query, s.metrics.queryServiceNameIndex)
}

func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
start := time.Now()
i := query.Iter()
retMe := dbmodel.UniqueTraceIDs{}
Expand All @@ -422,25 +428,27 @@ func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query,
tableMetrics.Emit(err, time.Since(start))
if err != nil {
logErrorToSpan(span, err)
span.LogFields(otlog.String("query", query.String()))
span.SetAttributes(attribute.Key("query").String(query.String()))
s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String()))
return nil, err
}
return retMe, nil
}

func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, name)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "cassandra")
ottag.Component.Set(span, "gocql")
return span, ctx
func startSpanForQuery(ctx context.Context, name, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, name)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("cassandra"),
attribute.Key("component").String("gocql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
func logErrorToSpan(span trace.Span, err error) {
if err == nil {
return
}
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

0 comments on commit a56c018

Please sign in to comment.