From 3fd11a0ef520f824e2c885c35eb81316b742681b Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 31 Jan 2025 19:09:35 -0300 Subject: [PATCH 1/6] Add tests showcasing deadlock Signed-off-by: Arthur Silva Sens --- exporter/prometheusremotewriteexporter/wal.go | 39 ++++++++---- .../prometheusremotewriteexporter/wal_test.go | 62 +++++++++++++++++++ 2 files changed, 90 insertions(+), 11 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go index f40e21d063b9..1067e270612e 100644 --- a/exporter/prometheusremotewriteexporter/wal.go +++ b/exporter/prometheusremotewriteexporter/wal.go @@ -18,10 +18,12 @@ import ( "github.com/tidwall/wal" "go.uber.org/multierr" "go.uber.org/zap" + + "github.com/ietxaniz/delock" ) type prweWAL struct { - mu sync.Mutex // mu protects the fields below. + mu delock.Mutex // mu protects the fields below. wal *wal.Log walConfig *WALConfig walPath string @@ -94,8 +96,11 @@ var ( // retrieveWALIndices queries the WriteAheadLog for its current first and last indices. 
func (prwe *prweWAL) retrieveWALIndices() (err error) { - prwe.mu.Lock() - defer prwe.mu.Unlock() + lockID, err := prwe.mu.Lock() + if err != nil { + panic(err) + } + defer prwe.mu.Unlock(lockID) err = prwe.closeWAL() if err != nil { @@ -127,8 +132,11 @@ func (prwe *prweWAL) retrieveWALIndices() (err error) { func (prwe *prweWAL) stop() error { err := errAlreadyClosed prwe.stopOnce.Do(func() { - prwe.mu.Lock() - defer prwe.mu.Unlock() + lockID, err := prwe.mu.Lock() + if err != nil { + panic(err) + } + defer prwe.mu.Unlock(lockID) close(prwe.stopChan) err = prwe.closeWAL() @@ -260,8 +268,11 @@ func (prwe *prweWAL) closeWAL() error { } func (prwe *prweWAL) syncAndTruncateFront() error { - prwe.mu.Lock() - defer prwe.mu.Unlock() + lockID, err := prwe.mu.Lock() + if err != nil { + panic(err) + } + defer prwe.mu.Unlock(lockID) if prwe.wal == nil { return errNilWAL @@ -301,8 +312,11 @@ func (prwe *prweWAL) exportThenFrontTruncateWAL(ctx context.Context, reqL []*pro // write them to the Write-Ahead-Log so that shutdowns won't lose data, and that the routine that // reads from the WAL can then process the previously serialized requests. func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { - prwe.mu.Lock() - defer prwe.mu.Unlock() + lockID, err := prwe.mu.Lock() + if err != nil { + panic(err) + } + defer prwe.mu.Unlock(lockID) // Write all the requests to the WAL in a batch. 
batch := new(wal.Batch) @@ -319,8 +333,11 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { } func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { - prwe.mu.Lock() - defer prwe.mu.Unlock() + lockID, err := prwe.mu.Lock() + if err != nil { + panic(err) + } + defer prwe.mu.Unlock(lockID) var protoBlob []byte for i := 0; i < 12; i++ { diff --git a/exporter/prometheusremotewriteexporter/wal_test.go b/exporter/prometheusremotewriteexporter/wal_test.go index 9059b65ed2da..c26a6c08c9b2 100644 --- a/exporter/prometheusremotewriteexporter/wal_test.go +++ b/exporter/prometheusremotewriteexporter/wal_test.go @@ -5,13 +5,22 @@ package prometheusremotewriteexporter import ( "context" + "io" + "net/http" + "net/http/httptest" "sort" "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/exporter/exportertest" ) func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error { @@ -149,3 +158,56 @@ func TestWAL_persist(t *testing.T) { require.Equal(t, reqLFromWAL[0], reqL[0]) require.Equal(t, reqLFromWAL[1], reqL[1]) } + +func TestExportWithWALEnabled(t *testing.T) { + cfg := &Config{ + WAL: &WALConfig{ + Directory: t.TempDir(), + }, + TargetInfo: &TargetInfo{}, // Declared just to avoid nil pointer dereference. + CreatedMetric: &CreatedMetric{}, // Declared just to avoid nil pointer dereference. 
+ } + buildInfo := component.BuildInfo{ + Description: "OpenTelemetry Collector", + Version: "1.0", + } + set := exportertest.NewNopSettings() + set.BuildInfo = buildInfo + + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.NotNil(t, body) + // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries + writeReq := &prompb.WriteRequest{} + var unzipped []byte + + dest, err := snappy.Decode(unzipped, body) + require.NoError(t, err) + + ok := proto.Unmarshal(dest, writeReq) + require.NoError(t, ok) + + assert.Len(t, writeReq.Timeseries, 1) + })) + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = server.URL + cfg.ClientConfig = clientConfig + + prwe, err := newPRWExporter(cfg, set) + assert.NoError(t, err) + assert.NotNil(t, prwe) + err = prwe.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + assert.NotNil(t, prwe.client) + + metrics := map[string]*prompb.TimeSeries{ + "test_metric": { + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 100}}, + }, + } + err = prwe.handleExport(context.Background(), metrics, nil) + assert.NoError(t, err) +} From bdd5c211abfa308c7bd65d1dfe6ef81ebcfb1ac1 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Sun, 2 Feb 2025 12:06:55 -0300 Subject: [PATCH 2/6] Adopt 'notifier/subscriber go routines' strategy for WAL Signed-off-by: Arthur Silva Sens --- exporter/prometheusremotewriteexporter/go.mod | 3 +- exporter/prometheusremotewriteexporter/wal.go | 120 ++++-------------- 2 files changed, 30 insertions(+), 93 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index aa8a53fef73d..91a610e6955b 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -4,7 
+4,6 @@ go 1.22.0 require ( github.com/cenkalti/backoff/v4 v4.3.0 - github.com/fsnotify/fsnotify v1.8.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.118.0 @@ -37,6 +36,8 @@ require ( go.uber.org/zap v1.27.0 ) +require github.com/fsnotify/fsnotify v1.8.0 // indirect + require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go index 1067e270612e..fc755e9da73e 100644 --- a/exporter/prometheusremotewriteexporter/wal.go +++ b/exporter/prometheusremotewriteexporter/wal.go @@ -12,18 +12,15 @@ import ( "sync/atomic" "time" - "github.com/fsnotify/fsnotify" "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/prompb" "github.com/tidwall/wal" "go.uber.org/multierr" "go.uber.org/zap" - - "github.com/ietxaniz/delock" ) type prweWAL struct { - mu delock.Mutex // mu protects the fields below. + mu sync.Mutex // mu protects the fields below. wal *wal.Log walConfig *WALConfig walPath string @@ -32,6 +29,7 @@ type prweWAL struct { stopOnce sync.Once stopChan chan struct{} + rNotify chan struct{} rWALIndex *atomic.Uint64 wWALIndex *atomic.Uint64 } @@ -72,6 +70,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri exportSink: exportSink, walConfig: walConfig, stopChan: make(chan struct{}), + rNotify: make(chan struct{}), rWALIndex: &atomic.Uint64{}, wWALIndex: &atomic.Uint64{}, } @@ -96,11 +95,8 @@ var ( // retrieveWALIndices queries the WriteAheadLog for its current first and last indices. 
func (prwe *prweWAL) retrieveWALIndices() (err error) { - lockID, err := prwe.mu.Lock() - if err != nil { - panic(err) - } - defer prwe.mu.Unlock(lockID) + prwe.mu.Lock() + defer prwe.mu.Unlock() err = prwe.closeWAL() if err != nil { @@ -132,11 +128,8 @@ func (prwe *prweWAL) retrieveWALIndices() (err error) { func (prwe *prweWAL) stop() error { err := errAlreadyClosed prwe.stopOnce.Do(func() { - lockID, err := prwe.mu.Lock() - if err != nil { - panic(err) - } - defer prwe.mu.Unlock(lockID) + prwe.mu.Lock() + defer prwe.mu.Unlock() close(prwe.stopChan) err = prwe.closeWAL() @@ -268,11 +261,8 @@ func (prwe *prweWAL) closeWAL() error { } func (prwe *prweWAL) syncAndTruncateFront() error { - lockID, err := prwe.mu.Lock() - if err != nil { - panic(err) - } - defer prwe.mu.Unlock(lockID) + prwe.mu.Lock() + defer prwe.mu.Unlock() if prwe.wal == nil { return errNilWAL @@ -312,11 +302,8 @@ func (prwe *prweWAL) exportThenFrontTruncateWAL(ctx context.Context, reqL []*pro // write them to the Write-Ahead-Log so that shutdowns won't lose data, and that the routine that // reads from the WAL can then process the previously serialized requests. func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { - lockID, err := prwe.mu.Lock() - if err != nil { - panic(err) - } - defer prwe.mu.Unlock(lockID) + prwe.mu.Lock() + defer prwe.mu.Unlock() // Write all the requests to the WAL in a batch. batch := new(wal.Batch) @@ -329,16 +316,15 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { batch.Write(wIndex, protoBlob) } + // Notify reader go routine that is possibly waiting for writes. 
+ select { + case prwe.rNotify <- struct{}{}: + default: + } return prwe.wal.WriteBatch(batch) } func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { - lockID, err := prwe.mu.Lock() - if err != nil { - panic(err) - } - defer prwe.mu.Unlock(lockID) - var protoBlob []byte for i := 0; i < 12; i++ { // Firstly check if we've been terminated, then exit if so. @@ -354,10 +340,10 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq index = 1 } + prwe.mu.Lock() if prwe.wal == nil { return nil, fmt.Errorf("attempt to read from closed WAL") } - protoBlob, err = prwe.wal.Read(index) if err == nil { // The read succeeded. req := new(prompb.WriteRequest) @@ -368,74 +354,24 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq // Now increment the WAL's read index. prwe.rWALIndex.Add(1) + prwe.mu.Unlock() return req, nil } + prwe.mu.Unlock() - if !errors.Is(err, wal.ErrNotFound) { - return nil, err - } - - if index <= 1 { - // This could be the very first attempted read, so try again, after a small sleep. - time.Sleep(time.Duration(1< Date: Sun, 2 Feb 2025 12:10:45 -0300 Subject: [PATCH 3/6] Add changelog Signed-off-by: Arthur Silva Sens --- .chloggen/prw-WAL-deadlock.yaml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 .chloggen/prw-WAL-deadlock.yaml diff --git a/.chloggen/prw-WAL-deadlock.yaml b/.chloggen/prw-WAL-deadlock.yaml new file mode 100644 index 000000000000..a7cacdf78750 --- /dev/null +++ b/.chloggen/prw-WAL-deadlock.yaml @@ -0,0 +1,19 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: | + Resolves a deadlock in the WAL by temporarily releasing a lock while waiting for new writes to the WAL. +# One or more tracking issues related to the change +issues: [19363, 24399, 15277] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + + +change_logs: [user] \ No newline at end of file From aa8b34e24f814d4f3275392877e1c4041774a24d Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Sun, 2 Feb 2025 12:25:21 -0300 Subject: [PATCH 4/6] make lint Signed-off-by: Arthur Silva Sens --- exporter/prometheusremotewriteexporter/wal_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/wal_test.go b/exporter/prometheusremotewriteexporter/wal_test.go index c26a6c08c9b2..fa9993bbbd7a 100644 --- a/exporter/prometheusremotewriteexporter/wal_test.go +++ b/exporter/prometheusremotewriteexporter/wal_test.go @@ -164,7 +164,7 @@ func TestExportWithWALEnabled(t *testing.T) { WAL: &WALConfig{ Directory: t.TempDir(), }, - TargetInfo: &TargetInfo{}, // Declared just to avoid nil pointer dereference. + TargetInfo: &TargetInfo{}, // Declared just to avoid nil pointer dereference. CreatedMetric: &CreatedMetric{}, // Declared just to avoid nil pointer dereference. 
} buildInfo := component.BuildInfo{ @@ -174,20 +174,19 @@ func TestExportWithWALEnabled(t *testing.T) { set := exportertest.NewNopSettings() set.BuildInfo = buildInfo - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + server := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) - require.NoError(t, err) - require.NotNil(t, body) + assert.NoError(t, err) + assert.NotNil(t, body) // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries writeReq := &prompb.WriteRequest{} var unzipped []byte dest, err := snappy.Decode(unzipped, body) - require.NoError(t, err) + assert.NoError(t, err) ok := proto.Unmarshal(dest, writeReq) - require.NoError(t, ok) + assert.NoError(t, ok) assert.Len(t, writeReq.Timeseries, 1) })) From df491252f64963ef3e437640df4dd7801cc8cd20 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 3 Feb 2025 09:57:46 -0300 Subject: [PATCH 5/6] Shutdown WAL to avoid flaky tests Signed-off-by: Arthur Silva Sens --- exporter/prometheusremotewriteexporter/wal_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/exporter/prometheusremotewriteexporter/wal_test.go b/exporter/prometheusremotewriteexporter/wal_test.go index fa9993bbbd7a..db4c41c91c1c 100644 --- a/exporter/prometheusremotewriteexporter/wal_test.go +++ b/exporter/prometheusremotewriteexporter/wal_test.go @@ -209,4 +209,9 @@ func TestExportWithWALEnabled(t *testing.T) { } err = prwe.handleExport(context.Background(), metrics, nil) assert.NoError(t, err) + + // While on Unix systems, t.TempDir() would easily close the WAL files, + // on Windows, it doesn't. So we need to close it manually to avoid flaky tests. 
+ err = prwe.Shutdown(context.Background()) + assert.NoError(t, err) } From aea10475ab6ff52d6fd934564f3c8417e9c83f96 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 3 Feb 2025 13:36:45 -0300 Subject: [PATCH 6/6] Respect stop channel while waiting for WAL writes Signed-off-by: Arthur Silva Sens --- exporter/prometheusremotewriteexporter/wal.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go index fc755e9da73e..381b5b2ce36d 100644 --- a/exporter/prometheusremotewriteexporter/wal.go +++ b/exporter/prometheusremotewriteexporter/wal.go @@ -366,6 +366,8 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq case <-prwe.rNotify: case <-ctx.Done(): return nil, ctx.Err() + case <-prwe.stopChan: + return nil, fmt.Errorf("attempt to read from WAL after stopped") } }