diff --git a/.chloggen/prw-WAL-deadlock.yaml b/.chloggen/prw-WAL-deadlock.yaml new file mode 100644 index 000000000000..a7cacdf78750 --- /dev/null +++ b/.chloggen/prw-WAL-deadlock.yaml @@ -0,0 +1,19 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexproter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: | + Resolves a deadlock in the WAL by temporarily releasing a lock while waiting for new writes to the WAL. +# One or more tracking issues related to the change +issues: [19363, 24399, 15277] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + + +change_logs: [user] \ No newline at end of file diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index aa8a53fef73d..91a610e6955b 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -4,7 +4,6 @@ go 1.22.0 require ( github.com/cenkalti/backoff/v4 v4.3.0 - github.com/fsnotify/fsnotify v1.8.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.118.0 @@ -37,6 +36,8 @@ require ( go.uber.org/zap v1.27.0 ) +require github.com/fsnotify/fsnotify v1.8.0 // indirect + require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go index f40e21d063b9..381b5b2ce36d 100644 --- a/exporter/prometheusremotewriteexporter/wal.go +++ b/exporter/prometheusremotewriteexporter/wal.go @@ -12,7 +12,6 @@ import ( "sync/atomic" "time" - "github.com/fsnotify/fsnotify" "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/prompb" "github.com/tidwall/wal" @@ -30,6 +29,7 @@ type prweWAL struct { stopOnce sync.Once stopChan chan struct{} + rNotify chan struct{} rWALIndex *atomic.Uint64 wWALIndex *atomic.Uint64 } @@ -70,6 +70,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri exportSink: exportSink, walConfig: walConfig, stopChan: make(chan struct{}), + rNotify: make(chan struct{}), rWALIndex: &atomic.Uint64{}, wWALIndex: &atomic.Uint64{}, } @@ -315,13 +316,15 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { batch.Write(wIndex, protoBlob) } + // Notify reader go routine that is possibly waiting for writes. + select { + case prwe.rNotify <- struct{}{}: + default: + } return prwe.wal.WriteBatch(batch) } func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { - prwe.mu.Lock() - defer prwe.mu.Unlock() - var protoBlob []byte for i := 0; i < 12; i++ { // Firstly check if we've been terminated, then exit if so. @@ -337,10 +340,10 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq index = 1 } + prwe.mu.Lock() if prwe.wal == nil { return nil, fmt.Errorf("attempt to read from closed WAL") } - protoBlob, err = prwe.wal.Read(index) if err == nil { // The read succeeded. req := new(prompb.WriteRequest) @@ -351,74 +354,26 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq // Now increment the WAL's read index. prwe.rWALIndex.Add(1) + prwe.mu.Unlock() return req, nil } + prwe.mu.Unlock() - if !errors.Is(err, wal.ErrNotFound) { - return nil, err - } - - if index <= 1 { - // This could be the very first attempted read, so try again, after a small sleep. - time.Sleep(time.Duration(1<