Skip to content

Commit

Permalink
Merge pull request #3 from bi-zone/fix_schedule_race
Browse files Browse the repository at this point in the history
Fix .Schedule race
  • Loading branch information
yalegko authored Sep 2, 2021
2 parents 6a9aad7 + 3438a1b commit bd11807
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
11 changes: 9 additions & 2 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ func New(ctx context.Context, opts ...Option) *Group {
ctx: ctx,
cancel: cancel,

addC: make(chan struct{}),
// N.B. We are making `addC` buffered, so the `g.monitor` function won't miss
// a notification while it is busy. We could, however, get an unneeded push into `addC`
// when we were already notified by the `timer`, but one excess loop run
// seems irrelevant compared to tasks hanging in the heap due to a race.
addC: make(chan struct{}, 1),
lenC: make(chan int),
}

Expand Down Expand Up @@ -187,7 +191,10 @@ func (g *Group) monitor(ctx context.Context) {
// Start any tasks that are ready as of now.
next := g.trigger(now)
if !next.IsZero() {
// Wait until the next scheduled task is ready.
// Set an alarm when the next scheduled task is ready.
//
// Note that we are resetting the timer incorrectly (according to the docs), but as long as
// we don't mind an excess loop spin ahead of time, it's easier to read than the canonical variant.
t.Reset(next.Sub(now))
tickC = t.C
} else {
Expand Down
47 changes: 47 additions & 0 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"log"
"runtime"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -376,6 +377,52 @@ func TestDeadLock(t *testing.T) {
_ = sg.Wait()
}

func TestScheduleWaitRace(t *testing.T) {
	t.Parallel()

	// Before this fix there was a race where the `g.monitor` goroutine,
	// while moving from `g.trigger(now)` back to its `select`, could lose a
	// notification sent on `g.addC`.
	//
	// Verify that every value pushed into the schedule is eventually run:
	// create a fresh group and schedule a single task right after creation.
	// The delay value itself is irrelevant, because the race loses the
	// scheduling event, not the timing.
	runOnce := func() {
		sg := schedgroup.New(context.Background())

		executed := make(chan struct{}, 1)
		sg.Delay(0, func() {
			close(executed)
		})

		const deadline = time.Second
		select {
		case <-executed:
			// Task fired as expected.
		case <-time.After(deadline):
			panicf("Task with 0 delay failed to execute in %s!", deadline)
		}

		if err := sg.Wait(); err != nil {
			panicf("Failed to wait: %v", err)
		}
	}

	// Reproducing a race is inherently flaky, so hammer the scenario from
	// one goroutine per available CPU, many repetitions each.
	workers := runtime.GOMAXPROCS(-1)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for rep := 0; rep < 100_000; rep++ {
				runOnce()
			}
		}()
	}
	wg.Wait()
}

// This example demonstrates typical use of a Group.
func ExampleGroup_wait() {
// Create a Group which will not use a context for cancelation.
Expand Down

0 comments on commit bd11807

Please sign in to comment.