diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 8a560b71..c9fb974e 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -55,7 +55,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- go: [1.16, 1.23]
+ go: [1.18, 1.23]
os: [ubuntu-latest, macos-latest, windows-latest]
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os}}
diff --git a/README.md b/README.md
index 78c77e43..ccdff4ad 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
-
+
@@ -78,7 +78,7 @@ import (
var sum int32
-func myFunc(i interface{}) {
+func myFunc(i any) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
@@ -110,7 +110,7 @@ func main() {
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
- p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
+ p, _ := ants.NewPoolWithFunc(10, func(i any) {
myFunc(i)
wg.Done()
})
@@ -141,7 +141,7 @@ func main() {
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
- mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
+ mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i any) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
@@ -186,7 +186,7 @@ type Options struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
- PanicHandler func(interface{})
+ PanicHandler func(any)
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
@@ -229,7 +229,7 @@ func WithNonblocking(nonblocking bool) Option {
}
// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
+func WithPanicHandler(panicHandler func(any)) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
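
The README examples above now take `any` instead of `interface{}`. Since `any` is only an alias, callers still need a type assertion (or type switch) to recover the concrete payload. Below is a minimal, hypothetical sketch of that pattern against the updated `NewPoolWithFunc` signature; the payload types are illustrative and not part of this change.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

// handle is a hypothetical task function using the new func(any) signature;
// it recovers the concrete payload type with a type switch.
func handle(i any) {
	switch v := i.(type) {
	case int32:
		fmt.Printf("got int32: %d\n", v)
	case string:
		fmt.Printf("got string: %q\n", v)
	default:
		fmt.Printf("unexpected payload: %v\n", v)
	}
}

func main() {
	var wg sync.WaitGroup
	p, _ := ants.NewPoolWithFunc(4, func(i any) {
		handle(i)
		wg.Done()
	})
	defer p.Release()

	for _, arg := range []any{int32(1), "two", 3.0} {
		wg.Add(1)
		_ = p.Invoke(arg)
	}
	wg.Wait()
}
```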
diff --git a/README_ZH.md b/README_ZH.md
index e422f74c..275e539c 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -7,7 +7,7 @@
-
+
@@ -78,7 +78,7 @@ import (
var sum int32
-func myFunc(i interface{}) {
+func myFunc(i any) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
@@ -110,7 +110,7 @@ func main() {
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
- p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
+ p, _ := ants.NewPoolWithFunc(10, func(i any) {
myFunc(i)
wg.Done()
})
@@ -141,7 +141,7 @@ func main() {
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
- mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
+ mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i any) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
@@ -186,7 +186,7 @@ type Options struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
- PanicHandler func(interface{})
+ PanicHandler func(any)
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
@@ -229,7 +229,7 @@ func WithNonblocking(nonblocking bool) Option {
}
// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
+func WithPanicHandler(panicHandler func(any)) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
diff --git a/ants.go b/ants.go
index fa68482c..d67ab5c7 100644
--- a/ants.go
+++ b/ants.go
@@ -30,12 +30,17 @@
package ants
import (
+ "context"
"errors"
"log"
"math"
"os"
"runtime"
+ "sync"
+ "sync/atomic"
"time"
+
+ syncx "github.com/panjf2000/ants/v2/pkg/sync"
)
const (
@@ -101,14 +106,6 @@ var (
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)
-const nowTimeUpdateInterval = 500 * time.Millisecond
-
-// Logger is used for logging formatted messages.
-type Logger interface {
- // Printf must have the same semantics as log.Printf.
- Printf(format string, args ...interface{})
-}
-
// Submit submits a task to pool.
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
@@ -143,3 +140,382 @@ func ReleaseTimeout(timeout time.Duration) error {
func Reboot() {
defaultAntsPool.Reboot()
}
+
+// Logger is used for logging formatted messages.
+type Logger interface {
+ // Printf must have the same semantics as log.Printf.
+ Printf(format string, args ...any)
+}
+
+// poolCommon contains all the fields shared by the concrete pool implementations.
+type poolCommon struct {
+	// capacity of the pool. A negative value means the capacity is limitless; an infinite pool is used to
+	// avoid the potential issue of endless blocking caused by nested usage of a pool: submitting a task to
+	// the pool from a task that is already running in the same pool.
+ capacity int32
+
+ // running is the number of the currently running goroutines.
+ running int32
+
+ // lock for protecting the worker queue.
+ lock sync.Locker
+
+	// workers is a slice that stores the available workers.
+ workers workerQueue
+
+	// state is used to notify the pool to close itself.
+ state int32
+
+ // cond for waiting to get an idle worker.
+ cond *sync.Cond
+
+	// allDone is used to indicate that all workers are done.
+ allDone chan struct{}
+ // once is used to make sure the pool is closed just once.
+ once *sync.Once
+
+	// workerCache speeds up obtaining a usable worker in retrieveWorker.
+ workerCache sync.Pool
+
+	// waiting is the number of goroutines already blocked on pool.Submit(), protected by pool.lock.
+ waiting int32
+
+ purgeDone int32
+ purgeCtx context.Context
+ stopPurge context.CancelFunc
+
+ ticktockDone int32
+ ticktockCtx context.Context
+ stopTicktock context.CancelFunc
+
+ now atomic.Value
+
+ options *Options
+}
+
+func newPool(size int, options ...Option) (*poolCommon, error) {
+ if size <= 0 {
+ size = -1
+ }
+
+ opts := loadOptions(options...)
+
+ if !opts.DisablePurge {
+ if expiry := opts.ExpiryDuration; expiry < 0 {
+ return nil, ErrInvalidPoolExpiry
+ } else if expiry == 0 {
+ opts.ExpiryDuration = DefaultCleanIntervalTime
+ }
+ }
+
+ if opts.Logger == nil {
+ opts.Logger = defaultLogger
+ }
+
+ p := &poolCommon{
+ capacity: int32(size),
+ allDone: make(chan struct{}),
+ lock: syncx.NewSpinLock(),
+ once: &sync.Once{},
+ options: opts,
+ }
+ if p.options.PreAlloc {
+ if size == -1 {
+ return nil, ErrInvalidPreAllocSize
+ }
+ p.workers = newWorkerQueue(queueTypeLoopQueue, size)
+ } else {
+ p.workers = newWorkerQueue(queueTypeStack, 0)
+ }
+
+ p.cond = sync.NewCond(p.lock)
+
+ p.goPurge()
+ p.goTicktock()
+
+ return p, nil
+}
+
+// purgeStaleWorkers clears stale workers periodically; it runs in its own goroutine, as a scavenger.
+func (p *poolCommon) purgeStaleWorkers() {
+ ticker := time.NewTicker(p.options.ExpiryDuration)
+
+ defer func() {
+ ticker.Stop()
+ atomic.StoreInt32(&p.purgeDone, 1)
+ }()
+
+ purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
+ for {
+ select {
+ case <-purgeCtx.Done():
+ return
+ case <-ticker.C:
+ }
+
+ if p.IsClosed() {
+ break
+ }
+
+ var isDormant bool
+ p.lock.Lock()
+ staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
+ n := p.Running()
+ isDormant = n == 0 || n == len(staleWorkers)
+ p.lock.Unlock()
+
+ // Clean up the stale workers.
+ for i := range staleWorkers {
+ staleWorkers[i].finish()
+ staleWorkers[i] = nil
+ }
+
+		// There might be a situation where all workers have been cleaned up (no worker is running),
+		// while some invokers are still stuck in p.cond.Wait(), so we need to wake those invokers up.
+ if isDormant && p.Waiting() > 0 {
+ p.cond.Broadcast()
+ }
+ }
+}
+
+const nowTimeUpdateInterval = 500 * time.Millisecond
+
+// ticktock runs in its own goroutine and regularly updates the pool's cached current time.
+func (p *poolCommon) ticktock() {
+ ticker := time.NewTicker(nowTimeUpdateInterval)
+ defer func() {
+ ticker.Stop()
+ atomic.StoreInt32(&p.ticktockDone, 1)
+ }()
+
+ ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()
+ for {
+ select {
+ case <-ticktockCtx.Done():
+ return
+ case <-ticker.C:
+ }
+
+ if p.IsClosed() {
+ break
+ }
+
+ p.now.Store(time.Now())
+ }
+}
+
+func (p *poolCommon) goPurge() {
+ if p.options.DisablePurge {
+ return
+ }
+
+ // Start a goroutine to clean up expired workers periodically.
+ p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
+ go p.purgeStaleWorkers()
+}
+
+func (p *poolCommon) goTicktock() {
+ p.now.Store(time.Now())
+ p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
+ go p.ticktock()
+}
+
+func (p *poolCommon) nowTime() time.Time {
+ return p.now.Load().(time.Time)
+}
+
+// Running returns the number of workers currently running.
+func (p *poolCommon) Running() int {
+ return int(atomic.LoadInt32(&p.running))
+}
+
+// Free returns the number of available workers; -1 indicates this pool is unlimited.
+func (p *poolCommon) Free() int {
+ c := p.Cap()
+ if c < 0 {
+ return -1
+ }
+ return c - p.Running()
+}
+
+// Waiting returns the number of tasks waiting to be executed.
+func (p *poolCommon) Waiting() int {
+ return int(atomic.LoadInt32(&p.waiting))
+}
+
+// Cap returns the capacity of this pool.
+func (p *poolCommon) Cap() int {
+ return int(atomic.LoadInt32(&p.capacity))
+}
+
+// Tune changes the capacity of this pool; note that it has no effect on an infinite or pre-allocated pool.
+func (p *poolCommon) Tune(size int) {
+ capacity := p.Cap()
+ if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
+ return
+ }
+ atomic.StoreInt32(&p.capacity, int32(size))
+ if size > capacity {
+ if size-capacity == 1 {
+ p.cond.Signal()
+ return
+ }
+ p.cond.Broadcast()
+ }
+}
+
+// IsClosed indicates whether the pool is closed.
+func (p *poolCommon) IsClosed() bool {
+ return atomic.LoadInt32(&p.state) == CLOSED
+}
+
+// Release closes this pool and releases the worker queue.
+func (p *poolCommon) Release() {
+ if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
+ return
+ }
+
+ if p.stopPurge != nil {
+ p.stopPurge()
+ p.stopPurge = nil
+ }
+ if p.stopTicktock != nil {
+ p.stopTicktock()
+ p.stopTicktock = nil
+ }
+
+ p.lock.Lock()
+ p.workers.reset()
+ p.lock.Unlock()
+	// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
+	// those callers from blocking indefinitely.
+ p.cond.Broadcast()
+}
+
+// ReleaseTimeout is like Release but with a timeout: it waits for all workers to exit before timing out.
+func (p *poolCommon) ReleaseTimeout(timeout time.Duration) error {
+ if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
+ return ErrPoolClosed
+ }
+
+ p.Release()
+
+ var purgeCh <-chan struct{}
+ if !p.options.DisablePurge {
+ purgeCh = p.purgeCtx.Done()
+ } else {
+ purgeCh = p.allDone
+ }
+
+ if p.Running() == 0 {
+ p.once.Do(func() {
+ close(p.allDone)
+ })
+ }
+
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ for {
+ select {
+ case <-timer.C:
+ return ErrTimeout
+ case <-p.allDone:
+ <-purgeCh
+ <-p.ticktockCtx.Done()
+ if p.Running() == 0 &&
+ (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
+ atomic.LoadInt32(&p.ticktockDone) == 1 {
+ return nil
+ }
+ }
+ }
+}
+
+// Reboot reboots a closed pool; it does nothing if the pool is not closed.
+// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
+// Release() to ensure that all workers are stopped and resources are released
+// before rebooting, otherwise you may run into a data race.
+func (p *poolCommon) Reboot() {
+ if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
+ atomic.StoreInt32(&p.purgeDone, 0)
+ p.goPurge()
+ atomic.StoreInt32(&p.ticktockDone, 0)
+ p.goTicktock()
+ p.allDone = make(chan struct{})
+ p.once = &sync.Once{}
+ }
+}
+
+func (p *poolCommon) addRunning(delta int) int {
+ return int(atomic.AddInt32(&p.running, int32(delta)))
+}
+
+func (p *poolCommon) addWaiting(delta int) {
+ atomic.AddInt32(&p.waiting, int32(delta))
+}
+
+// retrieveWorker returns an available worker to run the tasks.
+func (p *poolCommon) retrieveWorker() (w worker, err error) {
+ p.lock.Lock()
+
+retry:
+ // First try to fetch the worker from the queue.
+ if w = p.workers.detach(); w != nil {
+ p.lock.Unlock()
+ return
+ }
+
+	// If the worker queue is empty and we haven't run out of pool capacity,
+	// then just spawn a new worker goroutine.
+ if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
+ p.lock.Unlock()
+ w = p.workerCache.Get().(worker)
+ w.run()
+ return
+ }
+
+	// Bail out early if the pool is in nonblocking mode or the number of pending callers has reached the maximum limit.
+ if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
+ p.lock.Unlock()
+ return nil, ErrPoolOverload
+ }
+
+	// Otherwise, we'll have to keep the caller blocked and wait for at least one worker to be put back into the pool.
+ p.addWaiting(1)
+ p.cond.Wait() // block and wait for an available worker
+ p.addWaiting(-1)
+
+ if p.IsClosed() {
+ p.lock.Unlock()
+ return nil, ErrPoolClosed
+ }
+
+ goto retry
+}
+
+// revertWorker puts a worker back into the free pool, recycling the goroutine.
+func (p *poolCommon) revertWorker(worker worker) bool {
+ if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
+ p.cond.Broadcast()
+ return false
+ }
+
+ worker.setLastUsedTime(p.nowTime())
+
+ p.lock.Lock()
+ // To avoid memory leaks, add a double check in the lock scope.
+ // Issue: https://github.com/panjf2000/ants/issues/113
+ if p.IsClosed() {
+ p.lock.Unlock()
+ return false
+ }
+ if err := p.workers.insert(worker); err != nil {
+ p.lock.Unlock()
+ return false
+ }
+	// Notify an invoker stuck in 'retrieveWorker()' that there is an available worker in the worker queue.
+ p.cond.Signal()
+ p.lock.Unlock()
+
+ return true
+}
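
With the lifecycle logic (Release/ReleaseTimeout/Reboot/Tune) now implemented once on poolCommon, every concrete pool shares the same shutdown semantics. Here is a small usage sketch of the release-then-reboot flow recommended by the Reboot doc comment above; the pool size and durations are arbitrary.

```go
package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, err := ants.NewPool(8)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 4; i++ {
		_ = p.Submit(func() {
			time.Sleep(10 * time.Millisecond)
		})
	}

	// ReleaseTimeout waits for the workers plus the purge/ticktock goroutines
	// to stop, which is why the Reboot doc recommends it over Release.
	if err := p.ReleaseTimeout(time.Second); err != nil {
		fmt.Println("release timed out:", err)
	}

	p.Reboot() // reopen the pool; it does nothing if the pool is still open
	_ = p.Submit(func() { fmt.Println("running again after Reboot") })
	_ = p.ReleaseTimeout(time.Second)
}
```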
diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go
index f39b0e67..1dcc8dde 100644
--- a/ants_benchmark_test.go
+++ b/ants_benchmark_test.go
@@ -43,7 +43,7 @@ func demoFunc() {
time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}
-func demoPoolFunc(args interface{}) {
+func demoPoolFunc(args any) {
n := args.(int)
time.Sleep(time.Duration(n) * time.Millisecond)
}
@@ -58,7 +58,7 @@ func longRunningFunc() {
var stopLongRunningPoolFunc int32
-func longRunningPoolFunc(arg interface{}) {
+func longRunningPoolFunc(arg any) {
if ch, ok := arg.(chan struct{}); ok {
<-ch
return
diff --git a/ants_test.go b/ants_test.go
index 52972437..7909ea2a 100644
--- a/ants_test.go
+++ b/ants_test.go
@@ -93,7 +93,7 @@ func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
var wg sync.WaitGroup
- p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
+ p, _ := NewPoolWithFunc(AntsSize, func(i any) {
demoPoolFunc(i)
wg.Done()
})
@@ -113,7 +113,7 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
var wg sync.WaitGroup
- p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
+ p, _ := NewPoolWithFunc(AntsSize, func(i any) {
demoPoolFunc(i)
wg.Done()
}, WithPreAlloc(true))
@@ -227,7 +227,7 @@ func TestAntsPool(t *testing.T) {
func TestPanicHandler(t *testing.T) {
var panicCounter int64
var wg sync.WaitGroup
- p0, err := NewPool(10, WithPanicHandler(func(p interface{}) {
+ p0, err := NewPool(10, WithPanicHandler(func(p any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
@@ -242,7 +242,7 @@ func TestPanicHandler(t *testing.T) {
c := atomic.LoadInt64(&panicCounter)
assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c)
assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic")
- p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) {
+ p1, err := NewPoolWithFunc(10, func(p any) { panic(p) }, WithPanicHandler(func(_ any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}))
@@ -259,7 +259,7 @@ func TestPanicHandler(t *testing.T) {
func TestPanicHandlerPreMalloc(t *testing.T) {
var panicCounter int64
var wg sync.WaitGroup
- p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p interface{}) {
+ p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
@@ -274,7 +274,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
c := atomic.LoadInt64(&panicCounter)
assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c)
assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic")
- p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) {
+ p1, err := NewPoolWithFunc(10, func(p any) { panic(p) }, WithPanicHandler(func(_ any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}))
@@ -296,7 +296,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
panic("Oops!")
})
- p1, err := NewPoolWithFunc(10, func(p interface{}) {
+ p1, err := NewPoolWithFunc(10, func(p any) {
panic(p)
})
assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
@@ -312,7 +312,7 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
panic("Oops!")
})
- p1, err := NewPoolWithFunc(10, func(p interface{}) {
+ p1, err := NewPoolWithFunc(10, func(p any) {
panic(p)
})
@@ -345,7 +345,7 @@ func TestPurgePool(t *testing.T) {
assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running())
ch = make(chan struct{})
- f := func(i interface{}) {
+ f := func(i any) {
<-ch
d := i.(int) % 100
time.Sleep(time.Duration(d) * time.Millisecond)
@@ -445,7 +445,7 @@ func TestMaxBlockingSubmit(t *testing.T) {
func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10
var wg sync.WaitGroup
- p, err := NewPoolWithFunc(poolSize, func(i interface{}) {
+ p, err := NewPoolWithFunc(poolSize, func(i any) {
longRunningPoolFunc(i)
wg.Done()
}, WithNonblocking(true))
@@ -537,7 +537,7 @@ func TestRebootNewPool(t *testing.T) {
assert.NoError(t, p.Submit(func() { wg.Done() }), "pool should be rebooted")
wg.Wait()
- p1, err := NewPoolWithFunc(10, func(i interface{}) {
+ p1, err := NewPoolWithFunc(10, func(i any) {
demoPoolFunc(i)
wg.Done()
})
@@ -667,7 +667,7 @@ func TestWithDisablePurgePoolFunc(t *testing.T) {
var wg1, wg2 sync.WaitGroup
wg1.Add(numWorker)
wg2.Add(numWorker)
- p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) {
+ p, _ := NewPoolWithFunc(numWorker, func(_ any) {
wg1.Done()
<-sig
wg2.Done()
@@ -682,7 +682,7 @@ func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) {
wg1.Add(numWorker)
wg2.Add(numWorker)
expiredDuration := time.Millisecond * 100
- p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) {
+ p, _ := NewPoolWithFunc(numWorker, func(_ any) {
wg1.Done()
<-sig
wg2.Done()
@@ -692,7 +692,7 @@ func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) {
func TestInfinitePoolWithFunc(t *testing.T) {
c := make(chan struct{})
- p, _ := NewPoolWithFunc(-1, func(i interface{}) {
+ p, _ := NewPoolWithFunc(-1, func(i any) {
demoPoolFunc(i)
<-c
})
@@ -759,7 +759,7 @@ func TestReleaseWhenRunningPool(t *testing.T) {
func TestReleaseWhenRunningPoolWithFunc(t *testing.T) {
var wg sync.WaitGroup
- p, _ := NewPoolWithFunc(1, func(i interface{}) {
+ p, _ := NewPoolWithFunc(1, func(i any) {
t.Log("do task", i)
time.Sleep(1 * time.Second)
})
@@ -914,7 +914,7 @@ func TestPoolTuneScaleUp(t *testing.T) {
p.Release()
// test PoolWithFunc
- pf, _ := NewPoolWithFunc(2, func(_ interface{}) {
+ pf, _ := NewPoolWithFunc(2, func(_ any) {
<-c
})
for i := 0; i < 2; i++ {
@@ -962,7 +962,7 @@ func TestReleaseTimeout(t *testing.T) {
assert.NoError(t, err)
var pf *PoolWithFunc
- pf, _ = NewPoolWithFunc(10, func(i interface{}) {
+ pf, _ = NewPoolWithFunc(10, func(i any) {
dur := i.(time.Duration)
time.Sleep(dur)
})
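
These tests exercise the nonblocking path implemented in the shared retrieveWorker: with WithNonblocking(true), a submission that cannot get a worker fails fast with ErrPoolOverload instead of parking in cond.Wait(). A standalone sketch of that behavior, with arbitrary sizes and sleep:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// One worker, nonblocking: a second submission while the worker is busy
	// is rejected immediately rather than blocking the caller.
	p, _ := ants.NewPool(1, ants.WithNonblocking(true))
	defer p.Release()

	block := make(chan struct{})
	_ = p.Submit(func() { <-block })
	time.Sleep(10 * time.Millisecond) // let the first task occupy the worker

	if err := p.Submit(func() {}); errors.Is(err, ants.ErrPoolOverload) {
		fmt.Println("pool overloaded, task rejected immediately")
	}
	close(block)
}
```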
diff --git a/examples/main.go b/examples/main.go
index b9670986..bc00ef83 100644
--- a/examples/main.go
+++ b/examples/main.go
@@ -33,7 +33,7 @@ import (
var sum int32
-func myFunc(i interface{}) {
+func myFunc(i any) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
@@ -65,7 +65,7 @@ func main() {
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
- p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
+ p, _ := ants.NewPoolWithFunc(10, func(i any) {
myFunc(i)
wg.Done()
})
@@ -96,7 +96,7 @@ func main() {
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
- mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
+ mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i any) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
diff --git a/go.mod b/go.mod
index 2e75d8f6..9ce61a62 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,14 @@
module github.com/panjf2000/ants/v2
-go 1.16
+go 1.18
require (
github.com/stretchr/testify v1.8.2
golang.org/x/sync v0.3.0
)
+
+require (
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
+)
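
The module now requires Go 1.18, the first release in which `any` is predeclared as an alias for `interface{}`, so the signature changes in this diff are source-compatible for callers. A trivial illustration:

```go
package main

import "fmt"

func main() {
	// `any` is an alias for `interface{}` (Go 1.18+), so values flow freely
	// between the two spellings with no conversion.
	var a any = 42
	var i interface{} = a
	a = i
	fmt.Println(a) // 42
}
```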
diff --git a/multipool_func.go b/multipool_func.go
index 868c0dea..ed7e1dc2 100644
--- a/multipool_func.go
+++ b/multipool_func.go
@@ -46,7 +46,7 @@ type MultiPoolWithFunc struct {
// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size
// per pool, and the load-balancing strategy.
-func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
+func NewMultiPoolWithFunc(size, sizePerPool int, fn func(any), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
if lbs != RoundRobin && lbs != LeastTasks {
return nil, ErrInvalidLoadBalancingStrategy
}
@@ -82,7 +82,7 @@ func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
}
// Invoke submits a task to a pool selected by the load-balancing strategy.
-func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) {
+func (mp *MultiPoolWithFunc) Invoke(args any) (err error) {
if mp.IsClosed() {
return ErrPoolClosed
}
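
With the new signature, NewMultiPoolWithFunc takes a func(any) plus a load-balancing strategy. A small sketch mirroring the README example (pool sizes are arbitrary; shutdown is omitted for brevity):

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup

	// Ten sub-pools of five goroutines each; LeastTasks routes every Invoke
	// to the sub-pool with the fewest pending tasks.
	mpf, err := ants.NewMultiPoolWithFunc(10, 5, func(i any) {
		defer wg.Done()
		fmt.Println("processing", i)
	}, ants.LeastTasks)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 20; i++ {
		wg.Add(1)
		_ = mpf.Invoke(i)
	}
	wg.Wait()
	// Release is omitted here; MultiPoolWithFunc exposes shutdown methods
	// analogous to the single-pool API.
}
```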
diff --git a/options.go b/options.go
index 90d1ad51..b859bef3 100644
--- a/options.go
+++ b/options.go
@@ -34,7 +34,7 @@ type Options struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
- PanicHandler func(interface{})
+ PanicHandler func(any)
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
@@ -80,7 +80,7 @@ func WithNonblocking(nonblocking bool) Option {
}
// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
+func WithPanicHandler(panicHandler func(any)) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
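
The panic handler option now receives `any`, which matches what recover() yields inside a worker. A minimal sketch of wiring it up; the handler body is illustrative only.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	// The handler is invoked with the recovered value whenever a task panics.
	p, _ := ants.NewPool(10, ants.WithPanicHandler(func(r any) {
		defer wg.Done()
		fmt.Println("recovered from worker panic:", r)
	}))
	defer p.Release()

	_ = p.Submit(func() { panic("boom") })
	wg.Wait()
}
```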
diff --git a/pool.go b/pool.go
index 8361928b..b1dfa991 100644
--- a/pool.go
+++ b/pool.go
@@ -22,203 +22,13 @@
package ants
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- syncx "github.com/panjf2000/ants/v2/pkg/sync"
-)
-
-type poolCommon struct {
- // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
- // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
- // which submits a new task to the same pool.
- capacity int32
-
- // running is the number of the currently running goroutines.
- running int32
-
- // lock for protecting the worker queue.
- lock sync.Locker
-
- // workers is a slice that store the available workers.
- workers workerQueue
-
- // state is used to notice the pool to closed itself.
- state int32
-
- // cond for waiting to get an idle worker.
- cond *sync.Cond
-
- // done is used to indicate that all workers are done.
- allDone chan struct{}
- // once is used to make sure the pool is closed just once.
- once *sync.Once
-
- // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
- workerCache sync.Pool
-
- // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
- waiting int32
-
- purgeDone int32
- purgeCtx context.Context
- stopPurge context.CancelFunc
-
- ticktockDone int32
- ticktockCtx context.Context
- stopTicktock context.CancelFunc
-
- now atomic.Value
-
- options *Options
-}
-
-// Pool accepts the tasks and process them concurrently,
-// it limits the total of goroutines to a given number by recycling goroutines.
+// Pool is a goroutine pool that limits and recycles a large number of goroutines.
+// The pool capacity can be fixed or unlimited.
type Pool struct {
- poolCommon
-}
-
-// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
-func (p *Pool) purgeStaleWorkers() {
- ticker := time.NewTicker(p.options.ExpiryDuration)
-
- defer func() {
- ticker.Stop()
- atomic.StoreInt32(&p.purgeDone, 1)
- }()
-
- purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
- for {
- select {
- case <-purgeCtx.Done():
- return
- case <-ticker.C:
- }
-
- if p.IsClosed() {
- break
- }
-
- var isDormant bool
- p.lock.Lock()
- staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
- n := p.Running()
- isDormant = n == 0 || n == len(staleWorkers)
- p.lock.Unlock()
-
- // Clean up the stale workers.
- for i := range staleWorkers {
- staleWorkers[i].finish()
- staleWorkers[i] = nil
- }
-
- // There might be a situation where all workers have been cleaned up (no worker is running),
- // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
- if isDormant && p.Waiting() > 0 {
- p.cond.Broadcast()
- }
- }
-}
-
-// ticktock is a goroutine that updates the current time in the pool regularly.
-func (p *Pool) ticktock() {
- ticker := time.NewTicker(nowTimeUpdateInterval)
- defer func() {
- ticker.Stop()
- atomic.StoreInt32(&p.ticktockDone, 1)
- }()
-
- ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()
- for {
- select {
- case <-ticktockCtx.Done():
- return
- case <-ticker.C:
- }
-
- if p.IsClosed() {
- break
- }
-
- p.now.Store(time.Now())
- }
-}
-
-func (p *Pool) goPurge() {
- if p.options.DisablePurge {
- return
- }
-
- // Start a goroutine to clean up expired workers periodically.
- p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
- go p.purgeStaleWorkers()
-}
-
-func (p *Pool) goTicktock() {
- p.now.Store(time.Now())
- p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
- go p.ticktock()
-}
-
-func (p *Pool) nowTime() time.Time {
- return p.now.Load().(time.Time)
+ *poolCommon
}
-// NewPool instantiates a Pool with customized options.
-func NewPool(size int, options ...Option) (*Pool, error) {
- if size <= 0 {
- size = -1
- }
-
- opts := loadOptions(options...)
-
- if !opts.DisablePurge {
- if expiry := opts.ExpiryDuration; expiry < 0 {
- return nil, ErrInvalidPoolExpiry
- } else if expiry == 0 {
- opts.ExpiryDuration = DefaultCleanIntervalTime
- }
- }
-
- if opts.Logger == nil {
- opts.Logger = defaultLogger
- }
-
- p := &Pool{poolCommon: poolCommon{
- capacity: int32(size),
- allDone: make(chan struct{}),
- lock: syncx.NewSpinLock(),
- once: &sync.Once{},
- options: opts,
- }}
- p.workerCache.New = func() interface{} {
- return &goWorker{
- pool: p,
- task: make(chan func(), workerChanCap),
- }
- }
- if p.options.PreAlloc {
- if size == -1 {
- return nil, ErrInvalidPreAllocSize
- }
- p.workers = newWorkerQueue(queueTypeLoopQueue, size)
- } else {
- p.workers = newWorkerQueue(queueTypeStack, 0)
- }
-
- p.cond = sync.NewCond(p.lock)
-
- p.goPurge()
- p.goTicktock()
-
- return p, nil
-}
-
-// Submit submits a task to this pool.
+// Submit submits a task to the pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
@@ -236,198 +46,20 @@ func (p *Pool) Submit(task func()) error {
return err
}
-// Running returns the number of workers currently running.
-func (p *Pool) Running() int {
- return int(atomic.LoadInt32(&p.running))
-}
-
-// Free returns the number of available workers, -1 indicates this pool is unlimited.
-func (p *Pool) Free() int {
- c := p.Cap()
- if c < 0 {
- return -1
- }
- return c - p.Running()
-}
-
-// Waiting returns the number of tasks waiting to be executed.
-func (p *Pool) Waiting() int {
- return int(atomic.LoadInt32(&p.waiting))
-}
-
-// Cap returns the capacity of this pool.
-func (p *Pool) Cap() int {
- return int(atomic.LoadInt32(&p.capacity))
-}
-
-// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
-func (p *Pool) Tune(size int) {
- capacity := p.Cap()
- if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
- return
- }
- atomic.StoreInt32(&p.capacity, int32(size))
- if size > capacity {
- if size-capacity == 1 {
- p.cond.Signal()
- return
- }
- p.cond.Broadcast()
- }
-}
-
-// IsClosed indicates whether the pool is closed.
-func (p *Pool) IsClosed() bool {
- return atomic.LoadInt32(&p.state) == CLOSED
-}
-
-// Release closes this pool and releases the worker queue.
-func (p *Pool) Release() {
- if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
- return
- }
-
- if p.stopPurge != nil {
- p.stopPurge()
- p.stopPurge = nil
- }
- if p.stopTicktock != nil {
- p.stopTicktock()
- p.stopTicktock = nil
- }
-
- p.lock.Lock()
- p.workers.reset()
- p.lock.Unlock()
- // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
- // those callers blocking infinitely.
- p.cond.Broadcast()
-}
-
-// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
-func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
- if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
- return ErrPoolClosed
- }
-
- p.Release()
-
- var purgeCh <-chan struct{}
- if !p.options.DisablePurge {
- purgeCh = p.purgeCtx.Done()
- } else {
- purgeCh = p.allDone
- }
-
- if p.Running() == 0 {
- p.once.Do(func() {
- close(p.allDone)
- })
+// NewPool instantiates a Pool with customized options.
+func NewPool(size int, options ...Option) (*Pool, error) {
+ pc, err := newPool(size, options...)
+ if err != nil {
+ return nil, err
}
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- for {
- select {
- case <-timer.C:
- return ErrTimeout
- case <-p.allDone:
- <-purgeCh
- <-p.ticktockCtx.Done()
- if p.Running() == 0 &&
- (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
- atomic.LoadInt32(&p.ticktockDone) == 1 {
- return nil
- }
+ pool := &Pool{poolCommon: pc}
+ pool.workerCache.New = func() any {
+ return &goWorker{
+ pool: pool,
+ task: make(chan func(), workerChanCap),
}
}
-}
-
-// Reboot reboots a closed pool, it does nothing if the pool is not closed.
-// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
-// Release() to ensure that all workers are stopped and resource are released
-// before rebooting, otherwise you may run into data race.
-func (p *Pool) Reboot() {
- if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
- atomic.StoreInt32(&p.purgeDone, 0)
- p.goPurge()
- atomic.StoreInt32(&p.ticktockDone, 0)
- p.goTicktock()
- p.allDone = make(chan struct{})
- p.once = &sync.Once{}
- }
-}
-
-func (p *Pool) addRunning(delta int) int {
- return int(atomic.AddInt32(&p.running, int32(delta)))
-}
-
-func (p *Pool) addWaiting(delta int) {
- atomic.AddInt32(&p.waiting, int32(delta))
-}
-
-// retrieveWorker returns an available worker to run the tasks.
-func (p *Pool) retrieveWorker() (w worker, err error) {
- p.lock.Lock()
-
-retry:
- // First try to fetch the worker from the queue.
- if w = p.workers.detach(); w != nil {
- p.lock.Unlock()
- return
- }
-
- // If the worker queue is empty, and we don't run out of the pool capacity,
- // then just spawn a new worker goroutine.
- if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
- p.lock.Unlock()
- w = p.workerCache.Get().(*goWorker)
- w.run()
- return
- }
-
- // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
- if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
- p.lock.Unlock()
- return nil, ErrPoolOverload
- }
-
- // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
- p.addWaiting(1)
- p.cond.Wait() // block and wait for an available worker
- p.addWaiting(-1)
-
- if p.IsClosed() {
- p.lock.Unlock()
- return nil, ErrPoolClosed
- }
-
- goto retry
-}
-
-// revertWorker puts a worker back into free pool, recycling the goroutines.
-func (p *Pool) revertWorker(worker *goWorker) bool {
- if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
- p.cond.Broadcast()
- return false
- }
-
- worker.lastUsed = p.nowTime()
-
- p.lock.Lock()
- // To avoid memory leaks, add a double check in the lock scope.
- // Issue: https://github.com/panjf2000/ants/issues/113
- if p.IsClosed() {
- p.lock.Unlock()
- return false
- }
- if err := p.workers.insert(worker); err != nil {
- p.lock.Unlock()
- return false
- }
- // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
- p.cond.Signal()
- p.lock.Unlock()
- return true
+ return pool, nil
}
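
Pool now embeds *poolCommon, so Running, Free, Cap, Tune and the other shared methods are promoted from the common implementation rather than duplicated here. A short sketch exercising them; the sizes and the sleep are arbitrary.

```go
package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, _ := ants.NewPool(2)
	defer p.Release()

	done := make(chan struct{})
	for i := 0; i < 2; i++ {
		_ = p.Submit(func() { <-done })
	}
	time.Sleep(10 * time.Millisecond) // give the workers a moment to pick up the tasks

	fmt.Println("cap:", p.Cap(), "running:", p.Running(), "free:", p.Free())

	// Tune grows the capacity on the fly (no effect on infinite or
	// pre-allocated pools, as documented on poolCommon.Tune).
	p.Tune(4)
	fmt.Println("cap after Tune:", p.Cap(), "free:", p.Free())

	close(done)
}
```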
diff --git a/pool_func.go b/pool_func.go
index f3d341d4..70f5fae2 100644
--- a/pool_func.go
+++ b/pool_func.go
@@ -22,173 +22,21 @@
package ants
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- syncx "github.com/panjf2000/ants/v2/pkg/sync"
-)
-
-// PoolWithFunc accepts the tasks and process them concurrently,
-// it limits the total of goroutines to a given number by recycling goroutines.
+// PoolWithFunc is like Pool but accepts a unified function for all goroutines to execute.
type PoolWithFunc struct {
- poolCommon
-
- // poolFunc is the function for processing tasks.
- poolFunc func(interface{})
-}
-
-// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
-func (p *PoolWithFunc) purgeStaleWorkers() {
- ticker := time.NewTicker(p.options.ExpiryDuration)
- defer func() {
- ticker.Stop()
- atomic.StoreInt32(&p.purgeDone, 1)
- }()
-
- purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
- for {
- select {
- case <-purgeCtx.Done():
- return
- case <-ticker.C:
- }
-
- if p.IsClosed() {
- break
- }
-
- var isDormant bool
- p.lock.Lock()
- staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
- n := p.Running()
- isDormant = n == 0 || n == len(staleWorkers)
- p.lock.Unlock()
-
- // Clean up the stale workers.
- for i := range staleWorkers {
- staleWorkers[i].finish()
- staleWorkers[i] = nil
- }
-
- // There might be a situation where all workers have been cleaned up (no worker is running),
- // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
- if isDormant && p.Waiting() > 0 {
- p.cond.Broadcast()
- }
- }
-}
-
-// ticktock is a goroutine that updates the current time in the pool regularly.
-func (p *PoolWithFunc) ticktock() {
- ticker := time.NewTicker(nowTimeUpdateInterval)
- defer func() {
- ticker.Stop()
- atomic.StoreInt32(&p.ticktockDone, 1)
- }()
-
- ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()
- for {
- select {
- case <-ticktockCtx.Done():
- return
- case <-ticker.C:
- }
-
- if p.IsClosed() {
- break
- }
-
- p.now.Store(time.Now())
- }
-}
-
-func (p *PoolWithFunc) goPurge() {
- if p.options.DisablePurge {
- return
- }
-
- // Start a goroutine to clean up expired workers periodically.
- p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
- go p.purgeStaleWorkers()
-}
+ *poolCommon
-func (p *PoolWithFunc) goTicktock() {
- p.now.Store(time.Now())
- p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
- go p.ticktock()
+ // poolFunc is the unified function for processing tasks.
+ poolFunc func(any)
}
-func (p *PoolWithFunc) nowTime() time.Time {
- return p.now.Load().(time.Time)
-}
-
-// NewPoolWithFunc instantiates a PoolWithFunc with customized options.
-func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
- if size <= 0 {
- size = -1
- }
-
- if pf == nil {
- return nil, ErrLackPoolFunc
- }
-
- opts := loadOptions(options...)
-
- if !opts.DisablePurge {
- if expiry := opts.ExpiryDuration; expiry < 0 {
- return nil, ErrInvalidPoolExpiry
- } else if expiry == 0 {
- opts.ExpiryDuration = DefaultCleanIntervalTime
- }
- }
-
- if opts.Logger == nil {
- opts.Logger = defaultLogger
- }
-
- p := &PoolWithFunc{
- poolCommon: poolCommon{
- capacity: int32(size),
- allDone: make(chan struct{}),
- lock: syncx.NewSpinLock(),
- once: &sync.Once{},
- options: opts,
- },
- poolFunc: pf,
- }
- p.workerCache.New = func() interface{} {
- return &goWorkerWithFunc{
- pool: p,
- args: make(chan interface{}, workerChanCap),
- }
- }
- if p.options.PreAlloc {
- if size == -1 {
- return nil, ErrInvalidPreAllocSize
- }
- p.workers = newWorkerQueue(queueTypeLoopQueue, size)
- } else {
- p.workers = newWorkerQueue(queueTypeStack, 0)
- }
-
- p.cond = sync.NewCond(p.lock)
-
- p.goPurge()
- p.goTicktock()
-
- return p, nil
-}
-
-// Invoke submits a task to pool.
+// Invoke passes arguments to the pool.
//
// Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
-func (p *PoolWithFunc) Invoke(args interface{}) error {
+func (p *PoolWithFunc) Invoke(args any) error {
if p.IsClosed() {
return ErrPoolClosed
}
@@ -200,198 +48,28 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
return err
}
-// Running returns the number of workers currently running.
-func (p *PoolWithFunc) Running() int {
- return int(atomic.LoadInt32(&p.running))
-}
-
-// Free returns the number of available workers, -1 indicates this pool is unlimited.
-func (p *PoolWithFunc) Free() int {
- c := p.Cap()
- if c < 0 {
- return -1
- }
- return c - p.Running()
-}
-
-// Waiting returns the number of tasks waiting to be executed.
-func (p *PoolWithFunc) Waiting() int {
- return int(atomic.LoadInt32(&p.waiting))
-}
-
-// Cap returns the capacity of this pool.
-func (p *PoolWithFunc) Cap() int {
- return int(atomic.LoadInt32(&p.capacity))
-}
-
-// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
-func (p *PoolWithFunc) Tune(size int) {
- capacity := p.Cap()
- if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
- return
- }
- atomic.StoreInt32(&p.capacity, int32(size))
- if size > capacity {
- if size-capacity == 1 {
- p.cond.Signal()
- return
- }
- p.cond.Broadcast()
- }
-}
-
-// IsClosed indicates whether the pool is closed.
-func (p *PoolWithFunc) IsClosed() bool {
- return atomic.LoadInt32(&p.state) == CLOSED
-}
-
-// Release closes this pool and releases the worker queue.
-func (p *PoolWithFunc) Release() {
- if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
- return
- }
-
- if p.stopPurge != nil {
- p.stopPurge()
- p.stopPurge = nil
- }
- if p.stopTicktock != nil {
- p.stopTicktock()
- p.stopTicktock = nil
- }
-
- p.lock.Lock()
- p.workers.reset()
- p.lock.Unlock()
- // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
- // those callers blocking infinitely.
- p.cond.Broadcast()
-}
-
-// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
-func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
- if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
- return ErrPoolClosed
+// NewPoolWithFunc instantiates a PoolWithFunc with customized options.
+func NewPoolWithFunc(size int, pf func(any), options ...Option) (*PoolWithFunc, error) {
+ if pf == nil {
+ return nil, ErrLackPoolFunc
}
- p.Release()
-
- var purgeCh <-chan struct{}
- if !p.options.DisablePurge {
- purgeCh = p.purgeCtx.Done()
- } else {
- purgeCh = p.allDone
+ pc, err := newPool(size, options...)
+ if err != nil {
+ return nil, err
}
- if p.Running() == 0 {
- p.once.Do(func() {
- close(p.allDone)
- })
+ pool := &PoolWithFunc{
+ poolCommon: pc,
+ poolFunc: pf,
}
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- for {
- select {
- case <-timer.C:
- return ErrTimeout
- case <-p.allDone:
- <-purgeCh
- <-p.ticktockCtx.Done()
- if p.Running() == 0 &&
- (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
- atomic.LoadInt32(&p.ticktockDone) == 1 {
- return nil
- }
+ pool.workerCache.New = func() any {
+ return &goWorkerWithFunc{
+ pool: pool,
+ args: make(chan any, workerChanCap),
}
}
-}
-
-// Reboot reboots a closed pool, it does nothing if the pool is not closed.
-// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
-// Release() to ensure that all workers are stopped and resource are released
-// before rebooting, otherwise you may run into data race.
-func (p *PoolWithFunc) Reboot() {
- if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
- atomic.StoreInt32(&p.purgeDone, 0)
- p.goPurge()
- atomic.StoreInt32(&p.ticktockDone, 0)
- p.goTicktock()
- p.allDone = make(chan struct{})
- p.once = &sync.Once{}
- }
-}
-
-func (p *PoolWithFunc) addRunning(delta int) int {
- return int(atomic.AddInt32(&p.running, int32(delta)))
-}
-
-func (p *PoolWithFunc) addWaiting(delta int) {
- atomic.AddInt32(&p.waiting, int32(delta))
-}
-
-// retrieveWorker returns an available worker to run the tasks.
-func (p *PoolWithFunc) retrieveWorker() (w worker, err error) {
- p.lock.Lock()
-
-retry:
- // First try to fetch the worker from the queue.
- if w = p.workers.detach(); w != nil {
- p.lock.Unlock()
- return
- }
-
- // If the worker queue is empty, and we don't run out of the pool capacity,
- // then just spawn a new worker goroutine.
- if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
- p.lock.Unlock()
- w = p.workerCache.Get().(*goWorkerWithFunc)
- w.run()
- return
- }
-
- // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
- if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
- p.lock.Unlock()
- return nil, ErrPoolOverload
- }
-
- // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
- p.addWaiting(1)
- p.cond.Wait() // block and wait for an available worker
- p.addWaiting(-1)
-
- if p.IsClosed() {
- p.lock.Unlock()
- return nil, ErrPoolClosed
- }
-
- goto retry
-}
-
-// revertWorker puts a worker back into free pool, recycling the goroutines.
-func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
- if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
- p.cond.Broadcast()
- return false
- }
-
- worker.lastUsed = p.nowTime()
-
- p.lock.Lock()
- // To avoid memory leaks, add a double check in the lock scope.
- // Issue: https://github.com/panjf2000/ants/issues/113
- if p.IsClosed() {
- p.lock.Unlock()
- return false
- }
- if err := p.workers.insert(worker); err != nil {
- p.lock.Unlock()
- return false
- }
- // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
- p.cond.Signal()
- p.lock.Unlock()
- return true
+ return pool, nil
}
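
NewPoolWithFunc keeps its nil-function guard (ErrLackPoolFunc) in front of the shared newPool constructor. A quick sketch of both the error path and normal use:

```go
package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// A nil task function is rejected before any pool state is built.
	if _, err := ants.NewPoolWithFunc(10, nil); errors.Is(err, ants.ErrLackPoolFunc) {
		fmt.Println("nil pool function rejected:", err)
	}

	var wg sync.WaitGroup
	p, _ := ants.NewPoolWithFunc(10, func(i any) {
		defer wg.Done()
		fmt.Println("task arg:", i)
	})
	defer p.Release()

	wg.Add(1)
	_ = p.Invoke("hello")
	wg.Wait()
}
```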
diff --git a/worker.go b/worker.go
index 73166f80..f8dd6506 100644
--- a/worker.go
+++ b/worker.go
@@ -84,10 +84,14 @@ func (w *goWorker) lastUsedTime() time.Time {
return w.lastUsed
}
+func (w *goWorker) setLastUsedTime(t time.Time) {
+ w.lastUsed = t
+}
+
func (w *goWorker) inputFunc(fn func()) {
w.task <- fn
}
-func (w *goWorker) inputParam(interface{}) {
+func (w *goWorker) inputParam(any) {
panic("unreachable")
}
diff --git a/worker_func.go b/worker_func.go
index a25f4f9e..76c697ac 100644
--- a/worker_func.go
+++ b/worker_func.go
@@ -35,7 +35,7 @@ type goWorkerWithFunc struct {
pool *PoolWithFunc
// args is a job should be done.
- args chan interface{}
+ args chan any
// lastUsed will be updated when putting a worker back into queue.
lastUsed time.Time
@@ -84,10 +84,14 @@ func (w *goWorkerWithFunc) lastUsedTime() time.Time {
return w.lastUsed
}
+func (w *goWorkerWithFunc) setLastUsedTime(t time.Time) {
+ w.lastUsed = t
+}
+
func (w *goWorkerWithFunc) inputFunc(func()) {
panic("unreachable")
}
-func (w *goWorkerWithFunc) inputParam(arg interface{}) {
+func (w *goWorkerWithFunc) inputParam(arg any) {
w.args <- arg
}
diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go
index 3bd495e5..755cf156 100644
--- a/worker_loop_queue_test.go
+++ b/worker_loop_queue_test.go
@@ -1,5 +1,4 @@
//go:build !windows
-// +build !windows
package ants
diff --git a/worker_queue.go b/worker_queue.go
index bcb74807..1c44ee64 100644
--- a/worker_queue.go
+++ b/worker_queue.go
@@ -17,8 +17,9 @@ type worker interface {
run()
finish()
lastUsedTime() time.Time
+ setLastUsedTime(t time.Time)
inputFunc(func())
- inputParam(interface{})
+ inputParam(any)
}
type workerQueue interface {
diff --git a/worker_stack_test.go b/worker_stack_test.go
index 6fd3d762..453d6e3a 100644
--- a/worker_stack_test.go
+++ b/worker_stack_test.go
@@ -1,5 +1,4 @@
//go:build !windows
-// +build !windows
package ants