Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusremotewrite] Fix WAL deadlock #37630

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .chloggen/prw-WAL-deadlock.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexproter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: |
Resolves a deadlock in the WAL by temporarily releasing a lock while waiting for new writes to the WAL.
# One or more tracking issues related to the change
issues: [19363, 24399, 15277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:


change_logs: [user]
3 changes: 2 additions & 1 deletion exporter/prometheusremotewriteexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.22.0

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/fsnotify/fsnotify v1.8.0
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.118.0
Expand Down Expand Up @@ -37,6 +36,8 @@ require (
go.uber.org/zap v1.27.0
)

require github.com/fsnotify/fsnotify v1.8.0 // indirect

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down
87 changes: 21 additions & 66 deletions exporter/prometheusremotewriteexporter/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/prometheus/prompb"
"github.com/tidwall/wal"
Expand All @@ -30,6 +29,7 @@ type prweWAL struct {

stopOnce sync.Once
stopChan chan struct{}
rNotify chan struct{}
rWALIndex *atomic.Uint64
wWALIndex *atomic.Uint64
}
Expand Down Expand Up @@ -70,6 +70,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri
exportSink: exportSink,
walConfig: walConfig,
stopChan: make(chan struct{}),
rNotify: make(chan struct{}),
rWALIndex: &atomic.Uint64{},
wWALIndex: &atomic.Uint64{},
}
Expand Down Expand Up @@ -315,13 +316,15 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error {
batch.Write(wIndex, protoBlob)
}

// Notify reader go routine that is possibly waiting for writes.
select {
case prwe.rNotify <- struct{}{}:
default:
}
return prwe.wal.WriteBatch(batch)
}

func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) {
prwe.mu.Lock()
defer prwe.mu.Unlock()

var protoBlob []byte
for i := 0; i < 12; i++ {
// Firstly check if we've been terminated, then exit if so.
Expand All @@ -337,10 +340,10 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq
index = 1
}

prwe.mu.Lock()
if prwe.wal == nil {
return nil, fmt.Errorf("attempt to read from closed WAL")
}

protoBlob, err = prwe.wal.Read(index)
if err == nil { // The read succeeded.
req := new(prompb.WriteRequest)
Expand All @@ -351,74 +354,26 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq
// Now increment the WAL's read index.
prwe.rWALIndex.Add(1)

prwe.mu.Unlock()
return req, nil
}
prwe.mu.Unlock()

if !errors.Is(err, wal.ErrNotFound) {
return nil, err
}

if index <= 1 {
// This could be the very first attempted read, so try again, after a small sleep.
time.Sleep(time.Duration(1<<i) * time.Millisecond)
continue
}

// Otherwise, we couldn't find the record, let's try watching
// the WAL file until perhaps there is a write to it.
walWatcher, werr := fsnotify.NewWatcher()
if werr != nil {
return nil, werr
}
if werr = walWatcher.Add(prwe.walPath); werr != nil {
return nil, werr
}

// Watch until perhaps there is a write to the WAL file.
watchCh := make(chan error)
wErr := err
go func() {
defer func() {
watchCh <- wErr
close(watchCh)
// Close the file watcher.
walWatcher.Close()
}()

// If WAL was empty, let's wait for a notification from
// the writer go routine.
if errors.Is(err, wal.ErrNotFound) {
select {
case <-ctx.Done(): // If the context was cancelled, bail out ASAP.
wErr = ctx.Err()
return

case event, ok := <-walWatcher.Events:
if !ok {
return
}
switch event.Op {
case fsnotify.Remove:
// The file got deleted.
// TODO: Add capabilities to search for the updated file.
case fsnotify.Rename:
// Renamed, we don't have information about the renamed file's new name.
case fsnotify.Write:
// Finally a write, let's try reading again, but after some watch.
wErr = nil
}

case eerr, ok := <-walWatcher.Errors:
if ok {
wErr = eerr
}
case <-prwe.rNotify:
case <-ctx.Done():
ArthurSens marked this conversation as resolved.
Show resolved Hide resolved
return nil, ctx.Err()
case <-prwe.stopChan:
return nil, fmt.Errorf("attempt to read from WAL after stopped")
}
}()

if gerr := <-watchCh; gerr != nil {
return nil, gerr
}

// Otherwise a write occurred might have occurred,
// and we can sleep for a little bit then try again.
time.Sleep(time.Duration(1<<i) * time.Millisecond)
if !errors.Is(err, wal.ErrNotFound) {
return nil, err
}
}
return nil, err
}
66 changes: 66 additions & 0 deletions exporter/prometheusremotewriteexporter/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@ package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"sort"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {
Expand Down Expand Up @@ -149,3 +158,60 @@ func TestWAL_persist(t *testing.T) {
require.Equal(t, reqLFromWAL[0], reqL[0])
require.Equal(t, reqLFromWAL[1], reqL[1])
}

func TestExportWithWALEnabled(t *testing.T) {
cfg := &Config{
WAL: &WALConfig{
Directory: t.TempDir(),
},
TargetInfo: &TargetInfo{}, // Declared just to avoid nil pointer dereference.
CreatedMetric: &CreatedMetric{}, // Declared just to avoid nil pointer dereference.
}
buildInfo := component.BuildInfo{
Description: "OpenTelemetry Collector",
Version: "1.0",
}
set := exportertest.NewNopSettings()
set.BuildInfo = buildInfo

server := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
assert.NoError(t, err)
assert.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
writeReq := &prompb.WriteRequest{}
var unzipped []byte

dest, err := snappy.Decode(unzipped, body)
assert.NoError(t, err)

ok := proto.Unmarshal(dest, writeReq)
assert.NoError(t, ok)

assert.Len(t, writeReq.Timeseries, 1)
}))
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
cfg.ClientConfig = clientConfig

prwe, err := newPRWExporter(cfg, set)
assert.NoError(t, err)
assert.NotNil(t, prwe)
err = prwe.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
assert.NotNil(t, prwe.client)

metrics := map[string]*prompb.TimeSeries{
"test_metric": {
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
},
}
err = prwe.handleExport(context.Background(), metrics, nil)
assert.NoError(t, err)

// While on Unix systems, t.TempDir() would easily close the WAL files,
// on Windows, it doesn't. So we need to close it manually to avoid flaky tests.
err = prwe.Shutdown(context.Background())
assert.NoError(t, err)
}