Skip to content

Commit

Permalink
Test the pausing scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Nov 12, 2024
1 parent 9957e33 commit 3338ffd
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 45 deletions.
33 changes: 15 additions & 18 deletions internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
type Consumer struct {
ctx context.Context

// pauseMut is used to implement Pause & Resume semantics. See Pause method for more info.
// pauseMut and pausedWg are used to implement Pause & Resume semantics. See Pause method for more info.
pauseMut sync.RWMutex
pausedWg *sync.WaitGroup

Expand Down Expand Up @@ -63,11 +63,7 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return c.ctx.Err()
}

c.pauseMut.RLock() // wait until resumed
defer c.pauseMut.RUnlock()
if c.pausedWg != nil {
c.pausedWg.Wait()
}
c.waitUntilResumed()

c.mut.RLock()
defer c.mut.RUnlock()
Expand All @@ -90,11 +86,7 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
return c.ctx.Err()
}

c.pauseMut.RLock() // wait until resumed
defer c.pauseMut.RUnlock()
if c.pausedWg != nil {
c.pausedWg.Wait()
}
c.waitUntilResumed()

c.mut.RLock()
defer c.mut.RUnlock()
Expand All @@ -117,11 +109,7 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return c.ctx.Err()
}

c.pauseMut.RLock() // wait until resumed
defer c.pauseMut.RUnlock()
if c.pausedWg != nil {
c.pausedWg.Wait()
}
c.waitUntilResumed()

c.mut.RLock()
defer c.mut.RUnlock()
Expand All @@ -138,6 +126,15 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return c.logsConsumer.ConsumeLogs(ctx, ld)
}

// waitUntilResumed blocks the caller for as long as the consumer is paused.
//
// The current pausedWg pointer is copied while holding pauseMut for reading,
// and the lock is released before waiting, so a blocked caller does not hold
// pauseMut while it waits. A nil pausedWg means there is nothing to wait for
// and the call returns immediately.
func (c *Consumer) waitUntilResumed() {
	c.pauseMut.RLock()
	wg := c.pausedWg
	c.pauseMut.RUnlock()

	if wg == nil {
		return
	}
	wg.Wait()
}

// SetConsumers updates the internal consumers that Consumer will forward data
// to. It is valid for any combination of m, l, and t to be nil.
func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) {
Expand Down Expand Up @@ -169,10 +166,10 @@ func (c *Consumer) Resume() {
defer c.pauseMut.Unlock()

if c.pausedWg == nil {
return // not paused
return // already resumed
}

c.pausedWg.Done()
c.pausedWg.Done() // release all waiting
c.pausedWg = nil
}

Expand Down
150 changes: 126 additions & 24 deletions internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package lazyconsumer

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/goleak"

"github.com/grafana/alloy/internal/runtime/componenttest"
)

func Test_PauseAndResume(t *testing.T) {
c := New(context.Background())
c := New(componenttest.TestContext(t))
require.False(t, c.IsPaused())
c.Pause()
require.True(t, c.IsPaused())
Expand All @@ -17,14 +25,14 @@ func Test_PauseAndResume(t *testing.T) {
}

func Test_NewPaused(t *testing.T) {
c := NewPaused(context.Background())
c := NewPaused(componenttest.TestContext(t))
require.True(t, c.IsPaused())
c.Resume()
require.False(t, c.IsPaused())
}

func Test_PauseResume_MultipleCalls(t *testing.T) {
c := New(context.Background())
c := New(componenttest.TestContext(t))
require.False(t, c.IsPaused())
c.Pause()
c.Pause()
Expand All @@ -36,24 +44,118 @@ func Test_PauseResume_MultipleCalls(t *testing.T) {
require.False(t, c.IsPaused())
}

// func Test_PauseResume_Multithreaded(t *testing.T) {
// TODO(thampiotr): implement this test
// ctx := componenttest.TestContext(t)
// routines := 10
//
// pauses
//
// for i := 0; i < routines; i++ {
// go func() {
// for {
// select {
// case <-ctx.Done():
// return
// }
// }
// }()
// }
//
//
//
// }
// Test_ConsumeWaitsForResume verifies that each Consume* method blocks while
// the consumer is paused and returns once Resume is called.
func Test_ConsumeWaitsForResume(t *testing.T) {
	// Deferred so the leak check runs when the test finishes. IgnoreCurrent is
	// evaluated here, at defer time, so goroutines that already exist when the
	// test starts are excluded from the check.
	defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

	c := NewPaused(componenttest.TestContext(t))
	require.True(t, c.IsPaused())

	// Errors returned by Consume* are deliberately ignored: this test only
	// checks when the calls return, not what they return.
	methods := map[string]func(){
		"ConsumeTraces": func() {
			_ = c.ConsumeTraces(context.Background(), ptrace.NewTraces())
		},
		"ConsumeMetrics": func() {
			_ = c.ConsumeMetrics(context.Background(), pmetric.NewMetrics())
		},
		"ConsumeLogs": func() {
			_ = c.ConsumeLogs(context.Background(), plog.NewLogs())
		},
	}

	for name, fn := range methods {
		t.Run(name, func(t *testing.T) {
			c.Pause()
			require.True(t, c.IsPaused())

			// These channels are closed rather than sent on, so the goroutine
			// never blocks on signalling, even if this subtest has already
			// failed and stopped receiving.
			started := make(chan struct{})
			finished := make(chan struct{})

			// Start goroutine that attempts to run Consume* method
			go func() {
				close(started)
				fn()
				close(finished)
			}()

			// Wait to be started
			select {
			case <-started:
			case <-time.After(5 * time.Second):
				t.Fatal("consumer goroutine never started")
			}

			// Wait for a bit to ensure the consumer is blocking on Consume* function
			select {
			case <-finished:
				t.Fatal("consumer should not have finished yet - it's paused")
			case <-time.After(100 * time.Millisecond):
			}

			// Resume the consumer and verify the Consume* function unblocked
			c.Resume()
			select {
			case <-finished:
			case <-time.After(5 * time.Second):
				t.Fatal("consumer should have finished after resuming")
			}
		})
	}
}

// Test_PauseResume_Multithreaded hammers the consumer from several goroutines
// at once: one group continuously calls the Consume* methods while another
// group calls Pause followed by Resume. It checks that everything terminates
// and that the consumer ends up resumed.
func Test_PauseResume_Multithreaded(t *testing.T) {
	// Deferred so the leak check runs when the test finishes. IgnoreCurrent is
	// evaluated here, at defer time, so goroutines that already exist when the
	// test starts are excluded from the check.
	defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

	ctx, cancel := context.WithCancel(componenttest.TestContext(t))
	// Also cancel on early exit (e.g. a failed require) so the goroutines
	// below are told to stop. Calling cancel more than once is harmless.
	defer cancel()

	runs := 500
	routines := 5
	var allDone sync.WaitGroup

	c := NewPaused(componenttest.TestContext(t))
	require.True(t, c.IsPaused())

	// Run goroutines that constantly try to call Consume* methods
	for i := 0; i < routines; i++ {
		allDone.Add(1)
		go func() {
			defer allDone.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					// Errors are ignored on purpose: only liveness matters here.
					_ = c.ConsumeLogs(ctx, plog.NewLogs())
					_ = c.ConsumeMetrics(ctx, pmetric.NewMetrics())
					_ = c.ConsumeTraces(ctx, ptrace.NewTraces())
				}
			}
		}()
	}

	// Run goroutines that Pause and then Resume in parallel.
	// In particular, this verifies we can call .Pause() and .Resume() on an already paused or already resumed consumer.
	workChan := make(chan struct{}, routines)
	for i := 0; i < routines; i++ {
		allDone.Add(1)
		go func() {
			defer allDone.Done()
			for {
				select {
				case <-workChan:
					c.Pause()
					c.Resume()
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	for i := 0; i < runs; i++ {
		workChan <- struct{}{}
	}
	cancel()

	allDone.Wait()

	// Should not be paused as last call will always be c.Resume()
	require.False(t, c.IsPaused())
}
5 changes: 3 additions & 2 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func New(l log.Logger) *Scheduler {

// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to
// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running
// components via Schedule method will trigger a call to onPause and then onResume.
// components via Schedule method will trigger a call to onPause and then onResume. When the scheduler is shutting
// down, it will call onResume as a last step.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
return &Scheduler{
log: l,
Expand Down Expand Up @@ -104,7 +105,7 @@ func (cs *Scheduler) Run(ctx context.Context) error {
cs.onPause()
}
cs.stopComponents(context.Background(), components...)
// We don't resume, as the scheduler is exiting.
cs.onResume()
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestScheduler(t *testing.T) {

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown")
assert.Equal(t, 2, toInt(resumeCalls), "resume callback should not be called on shutdown")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")
})

Expand Down

0 comments on commit 3338ffd

Please sign in to comment.