Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix es integration test 6094 #6157

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package spanstore
import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -21,10 +22,11 @@ import (
)

const (
spanType = "span"
serviceType = "service"
serviceCacheTTLDefault = 12 * time.Hour
indexCacheTTLDefault = 48 * time.Hour
spanType = "span"
serviceType = "service"
serviceCacheTTLDefault = 12 * time.Hour
indexCacheTTLDefault = 48 * time.Hour
defaultIndexWaitTimeout = 60 * time.Second
)

type spanWriterMetrics struct {
Expand All @@ -42,6 +44,63 @@ type SpanWriter struct {
serviceWriter serviceWriter
spanConverter dbmodel.FromDomain
spanServiceIndex spanAndServiceIndexFn
indexCache sync.Map
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please delete from 43

}

func (s *SpanWriter) ensureIndex(ctx context.Context, indexName string) error {
if _, exists := s.indexCache.Load(indexName); exists {
return nil
}

Comment on lines +51 to +54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following statement looks like a CAS operation, so why do you need this one above?

_, loaded := s.indexCache.LoadOrStore(indexName, struct{}{})
if loaded {
return nil
}

exists, err := s.client().IndexExists(indexName).Do(ctx)
if err != nil {
return fmt.Errorf("failed to check index existence: %w", err)
}

if !exists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shortcircuit

if exists { return }

s.logger.Info("Creating index", zap.String("index", indexName))

// Set specific settings for the test environment
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugin/storage/es/spanstore/writer.go is production code, there's no place for test settings here

body := `{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"index.write.wait_for_active_shards": 1
}
}`

_, err = s.client().CreateIndex(indexName).Body(body).Do(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the writer should not be responsible for creating indices. The indices are created by ES automatically from templates on writes.

The templates are created by the factory:

if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.Indices.IndexPrefix); err != nil {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro

I see, now i understand the core issue better:

  1. it's about template creation timing vs span writing.
  2. Indices are automatically created by ES based on templates.
  3. The race condition occurs in factory.go where templates are created and then spans are written before templates are fully ready?

Would this be a better approach?

  1. Remove all index management code from SpanWriter
  2. Add template creation synchronization in factory.go's createSpanWriter,

something like, passing context in createSpanWriter and in Create Templates function we can handle the context.. I am taling about these 2 functions, if thinking in correct direction shall i proceed and test?

Copy link
Author

@madmecodes madmecodes Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Visualizing like this (What do you think, shall i proceed with this?) I really think by passing context we can mititgate the issue.

Before:
Start -> Create Writer -> Start Template Creation -> Return Writer -> Write Spans (fails since template still creating)
[Templates Still Creating...]

After:
Start-> Create Writer -> Create Templates -> Retry if needed -> Templates Ready -> Return Writer -> Write Spans

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we want the sequence in the After part, we just need to enforce that the templates finished creating before returning the writer.

if err != nil {
return fmt.Errorf("failed to create index with settings: %w", err)
}
s.logger.Info("Index created with settings",
zap.String("index", indexName),
zap.String("settings", body))
}

// Wait for index to be ready by checking its existence repeatedly
deadline := time.Now().Add(defaultIndexWaitTimeout)
start := time.Now()
for time.Now().Before(deadline) {
exists, err := s.client().IndexExists(indexName).Do(ctx)
if err == nil && exists {
s.logger.Info("Index is ready",
zap.String("index", indexName),
zap.Duration("took", time.Since(start)))
return nil
}
s.logger.Debug("Waiting for index to be ready",
zap.String("index", indexName),
zap.Duration("elapsed", time.Since(start)))
time.Sleep(time.Second)
}

return fmt.Errorf("timeout waiting for index %s to be ready", indexName)
}

// SpanWriterParams holds constructor parameters for NewSpanWriter
Expand Down Expand Up @@ -121,14 +180,49 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn {
}

// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime)

// Ensure indices exist before writing
if err := s.ensureIndex(ctx, spanIndexName); err != nil {
return fmt.Errorf("failed to ensure span index: %w", err)
}
if serviceIndexName != "" {
if err := s.ensureIndex(ctx, serviceIndexName); err != nil {
return fmt.Errorf("failed to ensure service index: %w", err)
}
}

jsonSpan := s.spanConverter.FromDomainEmbedProcess(span)
if serviceIndexName != "" {
s.writeService(serviceIndexName, jsonSpan)
}
s.writeSpan(spanIndexName, jsonSpan)
s.logger.Debug("Wrote span to ES index", zap.String("index", spanIndexName))

// Write with retries
var lastErr error
for i := 0; i < 3; i++ {
err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan)
if err == nil {
return nil
}
lastErr = err
s.logger.Debug("Retrying span write",
zap.String("index", spanIndexName),
zap.Int("attempt", i+1),
zap.Error(lastErr))
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
}

return fmt.Errorf("failed to write span after retries: %w", lastErr)
}

func (s *SpanWriter) writeSpanWithResult(_ context.Context, indexName string, jsonSpan *dbmodel.Span) error {
indexService := s.client().Index().
Index(indexName).
Type(spanType).
BodyJson(jsonSpan)

indexService.Add()
return nil
}

Expand Down