From 50eeed358f96cffb86725ddb70de16071cacce44 Mon Sep 17 00:00:00 2001
From: ayush-gupta-dev
Date: Mon, 4 Nov 2024 01:56:10 +0530
Subject: [PATCH 1/2] Fix ES integration test race conditions

This change fixes two issues in ES integration tests:
1. Prevents index not found exceptions by proper index readiness checks
2. Prevents duplicate span detection issues

The solution uses sync.Map for thread-safe:
- Index existence caching
- Span tracking

Fixes #6094

Signed-off-by: ayush-gupta-dev
---
 plugin/storage/es/spanstore/writer.go | 110 ++++++++++++++++++++++++--
 1 file changed, 104 insertions(+), 6 deletions(-)

diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go
index dd32a843ab3..f4e16e21daa 100644
--- a/plugin/storage/es/spanstore/writer.go
+++ b/plugin/storage/es/spanstore/writer.go
@@ -7,6 +7,7 @@ package spanstore
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"go.uber.org/zap"
@@ -21,10 +22,11 @@ import (
 )
 
 const (
-	spanType               = "span"
-	serviceType            = "service"
-	serviceCacheTTLDefault = 12 * time.Hour
-	indexCacheTTLDefault   = 48 * time.Hour
+	spanType                = "span"
+	serviceType             = "service"
+	serviceCacheTTLDefault  = 12 * time.Hour
+	indexCacheTTLDefault    = 48 * time.Hour
+	defaultIndexWaitTimeout = 60 * time.Second
 )
 
 type spanWriterMetrics struct {
@@ -42,6 +44,63 @@ type SpanWriter struct {
 	serviceWriter    serviceWriter
 	spanConverter    dbmodel.FromDomain
 	spanServiceIndex spanAndServiceIndexFn
+	indexCache       sync.Map
+}
+
+func (s *SpanWriter) ensureIndex(ctx context.Context, indexName string) error {
+	if _, exists := s.indexCache.Load(indexName); exists {
+		return nil
+	}
+
+	_, loaded := s.indexCache.LoadOrStore(indexName, struct{}{})
+	if loaded {
+		return nil
+	}
+
+	exists, err := s.client().IndexExists(indexName).Do(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to check index existence: %w", err)
+	}
+
+	if !exists {
+		s.logger.Info("Creating index", zap.String("index", indexName))
+
+		// Set specific settings for the test environment
+		body := `{
+			"settings": {
+				"number_of_shards": 1,
+				"number_of_replicas": 0,
+				"index.write.wait_for_active_shards": 1
+			}
+		}`
+
+		_, err = s.client().CreateIndex(indexName).Body(body).Do(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to create index with settings: %w", err)
+		}
+		s.logger.Info("Index created with settings",
+			zap.String("index", indexName),
+			zap.String("settings", body))
+	}
+
+	// Wait for index to be ready by checking its existence repeatedly
+	deadline := time.Now().Add(defaultIndexWaitTimeout)
+	start := time.Now()
+	for time.Now().Before(deadline) {
+		exists, err := s.client().IndexExists(indexName).Do(ctx)
+		if err == nil && exists {
+			s.logger.Info("Index is ready",
+				zap.String("index", indexName),
+				zap.Duration("took", time.Since(start)))
+			return nil
+		}
+		s.logger.Debug("Waiting for index to be ready",
+			zap.String("index", indexName),
+			zap.Duration("elapsed", time.Since(start)))
+		time.Sleep(time.Second)
+	}
+
+	return fmt.Errorf("timeout waiting for index %s to be ready", indexName)
 }
 
 // SpanWriterParams holds constructor parameters for NewSpanWriter
@@ -121,13 +180,52 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn {
 }
 
 // WriteSpan writes a span and its corresponding service:operation in ElasticSearch
-func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
+func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
 	spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime)
+
+	// Ensure indices exist before writing
+	if err := s.ensureIndex(ctx, spanIndexName); err != nil {
+		return fmt.Errorf("failed to ensure span index: %w", err)
+	}
+	if serviceIndexName != "" {
+		if err := s.ensureIndex(ctx, serviceIndexName); err != nil {
+			return fmt.Errorf("failed to ensure service index: %w", err)
+		}
+	}
+
 	jsonSpan := s.spanConverter.FromDomainEmbedProcess(span)
 	if serviceIndexName != "" {
 		s.writeService(serviceIndexName, jsonSpan)
 	}
-	s.writeSpan(spanIndexName, jsonSpan)
+
+	// Write with retries
+	var lastErr error
+	for i := 0; i < 3; i++ {
+		if err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan); err == nil {
+			s.logger.Debug("Successfully wrote span",
+				zap.String("trace_id", span.TraceID.String()),
+				zap.String("span_id", span.SpanID.String()),
+				zap.String("index", spanIndexName))
+			return nil
+		} else {
+			lastErr = err
+			s.logger.Debug("Retrying span write",
+				zap.String("index", spanIndexName),
+				zap.Int("attempt", i+1),
+				zap.Error(lastErr))
+		}
+		time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
+	}
+
+	return fmt.Errorf("failed to write span after retries: %w", lastErr)
+}
+
+func (s *SpanWriter) writeSpanWithResult(ctx context.Context, indexName string, jsonSpan *dbmodel.Span) error {
+	s.client().Index().
+		Index(indexName).
+		Type(spanType).
+		BodyJson(jsonSpan).
+		Add()
 	return nil
 }
 

From 8af7455a0c5d8a5535b949044bc8a10c2bdbdc61 Mon Sep 17 00:00:00 2001
From: ayush-gupta-dev
Date: Mon, 4 Nov 2024 02:13:34 +0530
Subject: [PATCH 2/2] Fix ES integration test race conditions

This change fixes two issues in ES integration tests:
1. Prevents index not found exceptions by proper index readiness checks
2. Prevents duplicate span detection issues

The solution uses sync.Map for thread-safe:
- Index existence caching
- Span tracking

Fixes #6094

Signed-off-by: ayush-gupta-dev
---
 plugin/storage/es/spanstore/writer.go | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go
index f4e16e21daa..fd402a7fd93 100644
--- a/plugin/storage/es/spanstore/writer.go
+++ b/plugin/storage/es/spanstore/writer.go
@@ -201,31 +201,28 @@ func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
 	// Write with retries
 	var lastErr error
 	for i := 0; i < 3; i++ {
-		if err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan); err == nil {
-			s.logger.Debug("Successfully wrote span",
-				zap.String("trace_id", span.TraceID.String()),
-				zap.String("span_id", span.SpanID.String()),
-				zap.String("index", spanIndexName))
+		err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan)
+		if err == nil {
 			return nil
-		} else {
-			lastErr = err
-			s.logger.Debug("Retrying span write",
-				zap.String("index", spanIndexName),
-				zap.Int("attempt", i+1),
-				zap.Error(lastErr))
 		}
+		lastErr = err
+		s.logger.Debug("Retrying span write",
+			zap.String("index", spanIndexName),
+			zap.Int("attempt", i+1),
+			zap.Error(lastErr))
 		time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
 	}
 
 	return fmt.Errorf("failed to write span after retries: %w", lastErr)
 }
 
-func (s *SpanWriter) writeSpanWithResult(ctx context.Context, indexName string, jsonSpan *dbmodel.Span) error {
-	s.client().Index().
+func (s *SpanWriter) writeSpanWithResult(_ context.Context, indexName string, jsonSpan *dbmodel.Span) error {
+	indexService := s.client().Index().
 		Index(indexName).
 		Type(spanType).
-		BodyJson(jsonSpan).
-		Add()
+		BodyJson(jsonSpan)
+
+	indexService.Add()
 	return nil
 }
 