From c0fa09481923075544e7e1dd9bd848ab4dcb55bd Mon Sep 17 00:00:00 2001 From: plastikfan Date: Sat, 1 Jun 2024 14:51:46 +0100 Subject: [PATCH] ref(ants): de-duplicate pool/func-pool code (#277) --- internal/ants/pool-func.go | 145 ++------------------------------- internal/ants/pool.go | 151 ++--------------------------------- internal/ants/worker-pool.go | 150 ++++++++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+), 282 deletions(-) create mode 100644 internal/ants/worker-pool.go diff --git a/internal/ants/pool-func.go b/internal/ants/pool-func.go index c270462..20f6704 100644 --- a/internal/ants/pool-func.go +++ b/internal/ants/pool-func.go @@ -34,44 +34,9 @@ import ( // PoolWithFunc accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { - // client defined context - ctx context.Context - // capacity of the pool. - capacity int32 - - // running is the number of the currently running goroutines. - running int32 - - // lock for protecting the worker queue. - lock sync.Locker - - // workers is a slice that store the available workers. - workers workerQueue - - // state is used to notice the pool to closed itself. - state int32 - - // cond for waiting to get an idle worker. - cond *sync.Cond - + workerPool // poolFunc is the function for processing tasks. poolFunc PoolFunc - - // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. - workerCache sync.Pool - - // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock - waiting int32 - - purgeDone int32 - stopPurge context.CancelFunc - - ticktockDone int32 - stopTicktock context.CancelFunc - - now atomic.Value - - o *Options } // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. @@ -191,11 +156,13 @@ func NewPoolWithFunc(ctx context.Context, } p := &PoolWithFunc{ - ctx: ctx, - capacity: int32(size), + workerPool: workerPool{ + ctx: ctx, + capacity: int32(size), + lock: async.NewSpinLock(), + o: opts, + }, poolFunc: pf, - lock: async.NewSpinLock(), - o: opts, } p.workerCache.New = func() interface{} { // interface{} => sync.Pool api return &goWorkerWithFunc{ @@ -239,96 +206,6 @@ func (p *PoolWithFunc) Invoke(job InputParam) error { return err } -// Running returns the number of workers currently running. -func (p *PoolWithFunc) Running() int { - return int(atomic.LoadInt32(&p.running)) -} - -// Free returns the number of available workers, -1 indicates this pool is unlimited. -func (p *PoolWithFunc) Free() int { - c := p.Cap() - if c < 0 { - return -1 - } - - return c - p.Running() -} - -// Waiting returns the number of tasks waiting to be executed. -func (p *PoolWithFunc) Waiting() int { - return int(atomic.LoadInt32(&p.waiting)) -} - -// Cap returns the capacity of this pool. -func (p *PoolWithFunc) Cap() int { - return int(atomic.LoadInt32(&p.capacity)) -} - -// Tune changes the capacity of this pool, note that it is noneffective to the -// infinite or pre-allocation pool. -func (p *PoolWithFunc) Tune(size int) { - capacity := p.Cap() - if capacity == -1 || size <= 0 || size == capacity || p.o.PreAlloc { - return - } - atomic.StoreInt32(&p.capacity, int32(size)) - if size > capacity { - if size-capacity == 1 { - p.cond.Signal() - return - } - p.cond.Broadcast() - } -} - -// IsClosed indicates whether the pool is closed. -func (p *PoolWithFunc) IsClosed() bool { - return atomic.LoadInt32(&p.state) == CLOSED -} - -// Release closes this pool and releases the worker queue. -func (p *PoolWithFunc) Release() { - if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { - return - } - - if p.stopPurge != nil { - p.stopPurge() - p.stopPurge = nil - } - p.stopTicktock() - p.stopTicktock = nil - - p.lock.Lock() - p.workers.reset(p.ctx) - p.lock.Unlock() - // There might be some callers waiting in retrieveWorker(), so we need to - // wake them up to prevent those callers blocking infinitely. - p.cond.Broadcast() -} - -// ReleaseTimeout is like Release but with a timeout, it waits all workers -// to exit before timing out. -func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { - purge := (!p.o.DisablePurge && p.stopPurge == nil) - if p.IsClosed() || purge || p.stopTicktock == nil { - return ErrPoolClosed - } - p.Release() - - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - if p.Running() == 0 && - (p.o.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } - time.Sleep(releaseTimeoutInterval * time.Millisecond) - } - - return ErrTimeout -} - // Reboot reboots a closed pool. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { @@ -339,14 +216,6 @@ func (p *PoolWithFunc) Reboot() { } } -func (p *PoolWithFunc) addRunning(delta int) { - atomic.AddInt32(&p.running, int32(delta)) -} - -func (p *PoolWithFunc) addWaiting(delta int) { - atomic.AddInt32(&p.waiting, int32(delta)) -} - // retrieveWorker returns an available worker to run the tasks. func (p *PoolWithFunc) retrieveWorker() (w worker, err error) { p.lock.Lock() diff --git a/internal/ants/pool.go b/internal/ants/pool.go index 67855b8..8cc20e0 100644 --- a/internal/ants/pool.go +++ b/internal/ants/pool.go @@ -34,47 +34,7 @@ import ( // Pool accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type Pool struct { - // client defined context - ctx context.Context - - // capacity of the pool, a negative value means that the capacity of pool is - // limitless, an infinite pool is used to avoid potential issue of endless - // blocking caused by nested usage of a pool: submitting a task to pool which - // submits a new task to the same pool. - capacity int32 - - // running is the number of the currently running goroutines. - running int32 - - // lock for protecting the worker queue. - lock sync.Locker - - // workers is a slice that store the available workers. - workers workerQueue - - // state is used to notice the pool to closed itself. - state int32 - - // cond for waiting to get an idle worker. - cond *sync.Cond - - // workerCache speeds up the obtainment of a usable worker in - // function:retrieveWorker. - workerCache sync.Pool - - // waiting is the number of goroutines already been blocked on - // pool.Submit(), protected by pool.lock - waiting int32 - - purgeDone int32 - stopPurge context.CancelFunc - - ticktockDone int32 - stopTicktock context.CancelFunc - - now atomic.Value - - o *Options + workerPool } // purgeStaleWorkers clears stale workers periodically, it runs in an @@ -189,10 +149,12 @@ func NewPool(ctx context.Context, size int, options ...Option) (*Pool, error) { } p := &Pool{ - ctx: ctx, - capacity: int32(size), - lock: async.NewSpinLock(), - o: opts, + workerPool: workerPool{ + ctx: ctx, + capacity: int32(size), + lock: async.NewSpinLock(), + o: opts, + }, } p.workerCache.New = func() interface{} { // interface{} => sync.Pool api @@ -239,97 +201,6 @@ func (p *Pool) Submit(ctx context.Context, task TaskFunc) error { return err } -// Running returns the number of workers currently running. -func (p *Pool) Running() int { - return int(atomic.LoadInt32(&p.running)) -} - -// Free returns the number of available workers, -1 indicates this pool -// is unlimited. -func (p *Pool) Free() int { - c := p.Cap() - if c < 0 { - return -1 - } - - return c - p.Running() -} - -// Waiting returns the number of tasks waiting to be executed. -func (p *Pool) Waiting() int { - return int(atomic.LoadInt32(&p.waiting)) -} - -// Cap returns the capacity of this pool. -func (p *Pool) Cap() int { - return int(atomic.LoadInt32(&p.capacity)) -} - -// Tune changes the capacity of this pool, note that it is noneffective to -// the infinite or pre-allocation pool. -func (p *Pool) Tune(size int) { - capacity := p.Cap() - if capacity == -1 || size <= 0 || size == capacity || p.o.PreAlloc { - return - } - atomic.StoreInt32(&p.capacity, int32(size)) - if size > capacity { - if size-capacity == 1 { - p.cond.Signal() - - return - } - p.cond.Broadcast() - } -} - -// IsClosed indicates whether the pool is closed. -func (p *Pool) IsClosed() bool { - return atomic.LoadInt32(&p.state) == CLOSED -} - -// Release closes this pool and releases the worker queue. -func (p *Pool) Release() { - if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { - return - } - - if p.stopPurge != nil { - p.stopPurge() - p.stopPurge = nil - } - p.stopTicktock() - p.stopTicktock = nil - - p.lock.Lock() - p.workers.reset(p.ctx) - p.lock.Unlock() - // There might be some callers waiting in retrieveWorker(), so we need - // to wake them up to prevent those callers blocking infinitely. - p.cond.Broadcast() -} - -// ReleaseTimeout is like Release but with a timeout, it waits all workers -// to exit before timing out. -func (p *Pool) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || (!p.o.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { - return ErrPoolClosed - } - p.Release() - - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - if p.Running() == 0 && - (p.o.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } - time.Sleep(releaseTimeoutInterval * time.Millisecond) - } - - return ErrTimeout -} - // Reboot reboots a closed pool. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { @@ -340,14 +211,6 @@ func (p *Pool) Reboot() { } } -func (p *Pool) addRunning(delta int) { - atomic.AddInt32(&p.running, int32(delta)) -} - -func (p *Pool) addWaiting(delta int) { - atomic.AddInt32(&p.waiting, int32(delta)) -} - // retrieveWorker returns an available worker to run the tasks. func (p *Pool) retrieveWorker() (w worker, err error) { p.lock.Lock() // why isn't the unlock just deferred? diff --git a/internal/ants/worker-pool.go b/internal/ants/worker-pool.go new file mode 100644 index 0000000..a1f97ec --- /dev/null +++ b/internal/ants/worker-pool.go @@ -0,0 +1,150 @@ +package ants + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +type workerPool struct { + // client defined context + ctx context.Context + // capacity of the pool. + capacity int32 + + // running is the number of the currently running goroutines. + running int32 + + // lock for protecting the worker queue. + lock sync.Locker + + // workers is a slice that store the available workers. + workers workerQueue + + // state is used to notice the pool to closed itself. + state int32 + + // cond for waiting to get an idle worker. + cond *sync.Cond + + // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. + workerCache sync.Pool + + // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock + waiting int32 + + purgeDone int32 + stopPurge context.CancelFunc + + ticktockDone int32 + stopTicktock context.CancelFunc + + now atomic.Value + + o *Options +} + +// Running returns the number of workers currently running. +func (p *workerPool) Running() int { + return int(atomic.LoadInt32(&p.running)) +} + +// Free returns the number of available workers, -1 indicates this pool +// is unlimited. +func (p *workerPool) Free() int { + c := p.Cap() + if c < 0 { + return -1 + } + + return c - p.Running() +} + +// Waiting returns the number of tasks waiting to be executed. +func (p *workerPool) Waiting() int { + return int(atomic.LoadInt32(&p.waiting)) +} + +// Cap returns the capacity of this pool. +func (p *workerPool) Cap() int { + return int(atomic.LoadInt32(&p.capacity)) +} + +// Tune changes the capacity of this pool, note that it is noneffective to +// the infinite or pre-allocation pool. +func (p *workerPool) Tune(size int) { + capacity := p.Cap() + if capacity == -1 || size <= 0 || size == capacity || p.o.PreAlloc { + return + } + atomic.StoreInt32(&p.capacity, int32(size)) + if size > capacity { + if size-capacity == 1 { + p.cond.Signal() + + return + } + p.cond.Broadcast() + } +} + +// IsClosed indicates whether the pool is closed. +func (p *workerPool) IsClosed() bool { + return atomic.LoadInt32(&p.state) == CLOSED +} + +// Release closes this pool and releases the worker queue. +func (p *workerPool) Release() { + if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { + return + } + + if p.stopPurge != nil { + p.stopPurge() + p.stopPurge = nil + } + p.stopTicktock() + p.stopTicktock = nil + + p.lock.Lock() + p.workers.reset(p.ctx) + p.lock.Unlock() + // There might be some callers waiting in retrieveWorker(), so we need to + // wake them up to prevent those callers blocking infinitely. + p.cond.Broadcast() +} + +// ReleaseTimeout is like Release but with a timeout, it waits all workers +// to exit before timing out. +func (p *workerPool) ReleaseTimeout(timeout time.Duration) error { + purge := (!p.o.DisablePurge && p.stopPurge == nil) + if p.IsClosed() || purge || p.stopTicktock == nil { + return ErrPoolClosed + } + p.Release() + + endTime := time.Now().Add(timeout) + for time.Now().Before(endTime) { + if p.Running() == 0 && + (p.o.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { + return nil + } + time.Sleep(releaseTimeoutInterval * time.Millisecond) + } + + return ErrTimeout +} + +func (p *workerPool) addRunning(delta int) { + atomic.AddInt32(&p.running, int32(delta)) +} + +func (p *workerPool) addWaiting(delta int) { + atomic.AddInt32(&p.waiting, int32(delta)) +} + +func (p *workerPool) GetOptions() *Options { + return p.o +}