From 3338ffd0871f60cb18b057d9473df74d0e8efdcd Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Tue, 12 Nov 2024 12:42:55 +0000 Subject: [PATCH] Test the pausing scheduler --- .../internal/lazyconsumer/lazyconsumer.go | 33 ++-- .../lazyconsumer/lazyconsumer_test.go | 150 +++++++++++++++--- .../otelcol/internal/scheduler/scheduler.go | 5 +- .../internal/scheduler/scheduler_test.go | 2 +- 4 files changed, 145 insertions(+), 45 deletions(-) diff --git a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go index efde19a06..a5813e846 100644 --- a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go +++ b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go @@ -17,7 +17,7 @@ import ( type Consumer struct { ctx context.Context - // pauseMut is used to implement Pause & Resume semantics. See Pause method for more info. + // pauseMut and pausedWg are used to implement Pause & Resume semantics. See Pause method for more info. pauseMut sync.RWMutex pausedWg *sync.WaitGroup @@ -63,11 +63,7 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { return c.ctx.Err() } - c.pauseMut.RLock() // wait until resumed - defer c.pauseMut.RUnlock() - if c.pausedWg != nil { - c.pausedWg.Wait() - } + c.waitUntilResumed() c.mut.RLock() defer c.mut.RUnlock() @@ -90,11 +86,7 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error return c.ctx.Err() } - c.pauseMut.RLock() // wait until resumed - defer c.pauseMut.RUnlock() - if c.pausedWg != nil { - c.pausedWg.Wait() - } + c.waitUntilResumed() c.mut.RLock() defer c.mut.RUnlock() @@ -117,11 +109,7 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return c.ctx.Err() } - c.pauseMut.RLock() // wait until resumed - defer c.pauseMut.RUnlock() - if c.pausedWg != nil { - c.pausedWg.Wait() - } + c.waitUntilResumed() c.mut.RLock() defer c.mut.RUnlock() @@ -138,6 +126,15 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return c.logsConsumer.ConsumeLogs(ctx, ld) } +func (c *Consumer) waitUntilResumed() { + c.pauseMut.RLock() + pausedWg := c.pausedWg + c.pauseMut.RUnlock() + if pausedWg != nil { + pausedWg.Wait() + } +} + // SetConsumers updates the internal consumers that Consumer will forward data // to. It is valid for any combination of m, l, and t to be nil. func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) { @@ -169,10 +166,10 @@ func (c *Consumer) Resume() { defer c.pauseMut.Unlock() if c.pausedWg == nil { - return // not paused + return // already resumed } - c.pausedWg.Done() + c.pausedWg.Done() // release all waiting c.pausedWg = nil } diff --git a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go index 4085b22fc..1980029d5 100644 --- a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go +++ b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go @@ -2,13 +2,21 @@ package lazyconsumer import ( "context" + "sync" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/runtime/componenttest" ) func Test_PauseAndResume(t *testing.T) { - c := New(context.Background()) + c := New(componenttest.TestContext(t)) require.False(t, c.IsPaused()) c.Pause() require.True(t, c.IsPaused()) @@ -17,14 +25,14 @@ func Test_PauseAndResume(t *testing.T) { } func Test_NewPaused(t *testing.T) { - c := NewPaused(context.Background()) + c := NewPaused(componenttest.TestContext(t)) require.True(t, c.IsPaused()) c.Resume() require.False(t, c.IsPaused()) } func Test_PauseResume_MultipleCalls(t *testing.T) { - c := New(context.Background()) + c := New(componenttest.TestContext(t)) require.False(t, c.IsPaused()) c.Pause() c.Pause() @@ -36,24 +44,118 @@ func Test_PauseResume_MultipleCalls(t *testing.T) { require.False(t, c.IsPaused()) } -// func Test_PauseResume_Multithreaded(t *testing.T) { -// TODO(thampiotr): implement this test -// ctx := componenttest.TestContext(t) -// routines := 10 -// -// pauses -// -// for i := 0; i < routines; i++ { -// go func() { -// for { -// select { -// case <-ctx.Done(): -// return -// } -// } -// }() -// } -// -// -// -// } +func Test_ConsumeWaitsForResume(t *testing.T) { + goleak.VerifyNone(t, goleak.IgnoreCurrent()) + c := NewPaused(componenttest.TestContext(t)) + require.True(t, c.IsPaused()) + + method := map[string]func(){ + "ConsumeTraces": func() { + _ = c.ConsumeTraces(nil, ptrace.NewTraces()) + }, + "ConsumeMetrics": func() { + _ = c.ConsumeMetrics(nil, pmetric.NewMetrics()) + }, + "ConsumeLogs": func() { + _ = c.ConsumeLogs(nil, plog.NewLogs()) + }, + } + + for name, fn := range method { + t.Run(name, func(t *testing.T) { + c.Pause() + require.True(t, c.IsPaused()) + + started := make(chan struct{}) + finished := make(chan struct{}) + + // Start goroutine that attempts to run Consume* method + go func() { + started <- struct{}{} + fn() + finished <- struct{}{} + }() + + // Wait to be started + select { + case <-started: + case <-time.After(5 * time.Second): + t.Fatal("consumer goroutine never started") + } + + // Wait for a bit to ensure the consumer is blocking on Consume* function + select { + case <-finished: + t.Fatal("consumer should not have finished yet - it's paused") + case <-time.After(100 * time.Millisecond): + } + + // Resume the consumer and verify the Consume* function unblocked + c.Resume() + select { + case <-finished: + case <-time.After(5 * time.Second): + t.Fatal("consumer should have finished after resuming") + } + + }) + } +} + +func Test_PauseResume_Multithreaded(t *testing.T) { + goleak.VerifyNone(t, goleak.IgnoreCurrent()) + ctx, cancel := context.WithCancel(componenttest.TestContext(t)) + runs := 500 + routines := 5 + allDone := sync.WaitGroup{} + + c := NewPaused(componenttest.TestContext(t)) + require.True(t, c.IsPaused()) + + // Run goroutines that constantly try to call Consume* methods + for i := 0; i < routines; i++ { + allDone.Add(1) + go func() { + for { + select { + case <-ctx.Done(): + allDone.Done() + return + default: + _ = c.ConsumeLogs(ctx, plog.NewLogs()) + _ = c.ConsumeMetrics(ctx, pmetric.NewMetrics()) + _ = c.ConsumeTraces(ctx, ptrace.NewTraces()) + } + } + }() + } + + // Run goroutines that Pause and then Resume in parallel. + // In particular, this verifies we can call .Pause() and .Resume() on an already paused or already resumed consumer. + workChan := make(chan struct{}, routines) + for i := 0; i < routines; i++ { + allDone.Add(1) + go func() { + for { + select { + case <-workChan: + c.Pause() + c.Resume() + case <-ctx.Done(): + allDone.Done() + return + } + } + }() + } + + for i := 0; i < runs; i++ { + workChan <- struct{}{} + } + cancel() + + allDone.Wait() + + // Should not be paused as last call will always be c.Resume() + require.False(t, c.IsPaused()) +} diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 489dada61..2c731616a 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -60,7 +60,8 @@ func New(l log.Logger) *Scheduler { // NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to // start paused and only when its components are scheduled, it will call onResume. From then on, each update to running -// components via Schedule method will trigger a call to onPause and then onResume. +// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it +// will call onResume as a last step. func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { return &Scheduler{ log: l, @@ -104,7 +105,7 @@ func (cs *Scheduler) Run(ctx context.Context) error { cs.onPause() } cs.stopComponents(context.Background(), components...) - // We don't resume, as the scheduler is exiting. + cs.onResume() }() // Wait for a write to cs.newComponentsCh. The initial list of components is diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 3a567b999..469d679b7 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -110,7 +110,7 @@ func TestScheduler(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should not be called on shutdown") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") })