Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Concurrency entity for worker #1405

Closed
wants to merge 13 commits into from
7 changes: 5 additions & 2 deletions internal/worker/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"context"
"fmt"
"sync"
"time"

"github.com/marusama/semaphore/v2"
)
Expand Down Expand Up @@ -56,14 +57,16 @@
}

// Acquire is blocking until a permit is acquired or returns error after context is done
// Remember to call Release(count) to release the permit after usage
func (p *permit) Acquire(ctx context.Context, count int) error {
if err := p.sem.Acquire(ctx, count); err != nil {
return fmt.Errorf("failed to acquire permit before context is done: %w", err)
}
return nil
}

// AcquireChan returns a permit ready channel. It's closed then permit is acquired
// AcquireChan returns a permit ready channel. Similar to Acquire, but non-blocking.
// Remember to call Release(1) to release the permit after usage
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
ch := make(chan struct{})
wg.Add(1)
Expand All @@ -74,8 +77,8 @@
}
select { // try to send to channel, but don't block if listener is gone
case ch <- struct{}{}:
default:
case <-time.After(10 * time.Millisecond): // wait time is needed to avoid race condition of channel sending
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found a race condition of channel sending. Adding a wait time would be more reliable

Copy link
Member

@Groxx Groxx Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm :\ unfortunately this means that if processing is delayed by 10ms it will block the chan-reader forever. that's not too unlikely with big CPU spikes, and definitely not impossible.

tbh I think that might rule this impl out entirely. though I think it's possible to build a AcquireChan(...) (<-chan struct{}, cancel func()) that doesn't have this issue, and that might be worth doing.

or we might have to embrace the atomic-like behavior around this and add retries to (*baseWorker).runPoller / anything using AcquireChan. that wouldn't be a fatal constraint afaict, though it's not ideal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we expect chan-reader to always consume from the returned ch unless ctx is canceled then we can replace this goroutine implementation with

defer wg.Done()
if err := p.sem.Acquire(ctx, 1); err != nil { 
   return // assuming Acquire only returns err if ctx.Done
}
select {
case ch <- struct{}{}:
case <-ctx.Done():
  p.sem.Release(1)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed through Taylan's suggestion. It should be safe now.

p.sem.Release(1)

Check warning on line 81 in internal/worker/concurrency.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/concurrency.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}
}()
return ch
Expand Down
75 changes: 42 additions & 33 deletions internal/worker/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,50 +36,58 @@ func TestPermit_Simulation(t *testing.T) {
tests := []struct {
name string
capacity []int // update every 50ms
goroutines int // each would block on acquiring 2-6 tokens for 100ms
goroutinesAcquireChan int // each would block using AcquireChan for 100ms
goroutines int // each would block on acquiring 1-2 tokens for 100-150ms
goroutinesAcquireChan int // each would block using AcquireChan for 100-150ms
maxTestDuration time.Duration
expectFailures int
expectFailuresAtLeast int
expectFailuresRange []int // range of failures, inclusive [min, max]
}{
{
name: "enough permit, no blocking",
maxTestDuration: 100 * time.Millisecond,
capacity: []int{1000},
maxTestDuration: 200 * time.Millisecond, // at most need 150 ms, add 50 ms buffer
capacity: []int{10000},
goroutines: 100,
goroutinesAcquireChan: 100,
expectFailures: 0,
expectFailuresRange: []int{0, 0},
},
{
name: "not enough permit, blocking but all acquire",
maxTestDuration: 1 * time.Second,
maxTestDuration: 1200 * time.Millisecond, // at most need 150ms * (1000 + 500) / 200 = 1125ms to acquire all permit
capacity: []int{200},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailures: 0,
goroutines: 500, // at most 1000 tokens
goroutinesAcquireChan: 500, // 500 tokens
expectFailuresRange: []int{0, 0},
},
{
name: "not enough permit for some to acquire, fail some",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{100},
maxTestDuration: 250 * time.Millisecond, // at least need 100ms * (500 + 500) / 200 = 250ms to acquire all permit
capacity: []int{200},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailuresAtLeast: 1,
expectFailuresRange: []int{1, 999}, // should at least pass some acquires
},
{
name: "not enough permit at beginning but due to capacity change, blocking but all acquire",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{100, 300, 500},
maxTestDuration: 250 * time.Millisecond,
capacity: []int{200, 400, 600},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailuresRange: []int{0, 0},
},
{
name: "enough permit at beginning but due to capacity change, some would fail",
maxTestDuration: 250 * time.Millisecond,
capacity: []int{600, 400, 200},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailures: 0,
expectFailuresRange: []int{1, 999},
},
{
name: "not enough permit for any acquire, fail all",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{0},
goroutines: 1000,
expectFailures: 1000,
name: "not enough permit for any acquire, fail all",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{0},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailuresRange: []int{1000, 1000},
},
}

Expand All @@ -102,13 +110,12 @@ func TestPermit_Simulation(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
num := rand.Intn(2) + 2
// num := 1
num := rand.Intn(2) + 1
if err := permit.Acquire(ctx, num); err != nil {
failures.Add(1)
failures.Inc()
return
}
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
time.Sleep(time.Duration(100+rand.Intn(50)) * time.Millisecond)
permit.Release(num)
}()
}
Expand All @@ -118,22 +125,24 @@ func TestPermit_Simulation(t *testing.T) {
defer wg.Done()
select {
case <-permit.AcquireChan(ctx, wg):
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
time.Sleep(time.Duration(100+rand.Intn(50)) * time.Millisecond)
permit.Release(1)
case <-ctx.Done():
failures.Add(1)
failures.Inc()
}
}()
}

wg.Wait()
// sanity check
assert.Equal(t, 0, permit.Count())
if tt.expectFailuresAtLeast > 0 {
assert.LessOrEqual(t, tt.expectFailuresAtLeast, int(failures.Load()))
} else {
assert.Equal(t, tt.expectFailures, int(failures.Load()))
}
assert.Equal(t, tt.capacity[len(tt.capacity)-1], permit.Quota())

// expect failures in range
expectFailureMin := tt.expectFailuresRange[0]
expectFailureMax := tt.expectFailuresRange[1]
assert.GreaterOrEqual(t, int(failures.Load()), expectFailureMin)
assert.LessOrEqual(t, int(failures.Load()), expectFailureMax)
})
}
}