ref(ants): de-duplicate pool/func-pool code (#277)
plastikfan committed Jun 2, 2024
1 parent e6516b7 commit c0fa094
Showing 3 changed files with 164 additions and 282 deletions.
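The refactor replaces the state fields and helper methods that were duplicated between Pool and PoolWithFunc with a single embedded workerPool struct. The third changed file — presumably the one that defines workerPool — is not rendered in this view, so the following is only a sketch of what that shared struct plausibly contains, inferred from the fields deleted from both pool types and from the fields the new constructors populate (ctx, capacity, lock, o); the exact layout and comments are assumptions.

	// Sketch only — the real definition lives in the changed file not shown here.
	// Fields are taken from those removed from Pool and PoolWithFunc below.
	type workerPool struct {
		ctx          context.Context    // client-defined context
		capacity     int32              // capacity of the pool; negative means limitless
		running      int32              // number of currently running goroutines
		lock         sync.Locker        // protects the worker queue
		workers      workerQueue        // available workers
		state        int32              // signals the pool to close itself
		cond         *sync.Cond         // waiting for an idle worker
		workerCache  sync.Pool          // speeds up obtaining a usable worker
		waiting      int32              // goroutines blocked on Submit()/Invoke()
		purgeDone    int32
		stopPurge    context.CancelFunc
		ticktockDone int32
		stopTicktock context.CancelFunc
		now          atomic.Value
		o            *Options
	}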
145 changes: 7 additions & 138 deletions internal/ants/pool-func.go
@@ -34,44 +34,9 @@ import (
// PoolWithFunc accepts tasks and processes them concurrently; it limits
// the total number of goroutines to a given size by recycling them.
type PoolWithFunc struct {
// client defined context
ctx context.Context
// capacity of the pool.
capacity int32

// running is the number of the currently running goroutines.
running int32

// lock for protecting the worker queue.
lock sync.Locker

// workers is a slice that stores the available workers.
workers workerQueue

// state is used to notify the pool to close itself.
state int32

// cond for waiting to get an idle worker.
cond *sync.Cond

workerPool
// poolFunc is the function for processing tasks.
poolFunc PoolFunc

// workerCache speeds up obtaining a usable worker in retrieveWorker.
workerCache sync.Pool

// waiting is the number of goroutines blocked on pool.Invoke(), protected by pool.lock.
waiting int32

purgeDone int32
stopPurge context.CancelFunc

ticktockDone int32
stopTicktock context.CancelFunc

now atomic.Value

o *Options
}

// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
@@ -191,11 +156,13 @@ func NewPoolWithFunc(ctx context.Context,
}

p := &PoolWithFunc{
ctx: ctx,
capacity: int32(size),
workerPool: workerPool{
ctx: ctx,
capacity: int32(size),
lock: async.NewSpinLock(),
o: opts,
},
poolFunc: pf,
lock: async.NewSpinLock(),
o: opts,
}
p.workerCache.New = func() interface{} { // interface{} => sync.Pool api
return &goWorkerWithFunc{
Expand Down Expand Up @@ -239,96 +206,6 @@ func (p *PoolWithFunc) Invoke(job InputParam) error {
return err
}

// Running returns the number of workers currently running.
func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running))
}

// Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *PoolWithFunc) Free() int {
c := p.Cap()
if c < 0 {
return -1
}

return c - p.Running()
}

// Waiting returns the number of tasks waiting to be executed.
func (p *PoolWithFunc) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}

// Cap returns the capacity of this pool.
func (p *PoolWithFunc) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
}

// Tune changes the capacity of this pool; note that it has no effect on an
// infinite or pre-allocated pool.
func (p *PoolWithFunc) Tune(size int) {
capacity := p.Cap()
if capacity == -1 || size <= 0 || size == capacity || p.o.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
if size > capacity {
if size-capacity == 1 {
p.cond.Signal()
return
}
p.cond.Broadcast()
}
}

// IsClosed indicates whether the pool is closed.
func (p *PoolWithFunc) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED
}

// Release closes this pool and releases the worker queue.
func (p *PoolWithFunc) Release() {
if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}

if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil

p.lock.Lock()
p.workers.reset(p.ctx)
p.lock.Unlock()
// There might be some callers waiting in retrieveWorker(), so we need to
// wake them up to prevent those callers blocking infinitely.
p.cond.Broadcast()
}

// ReleaseTimeout is like Release but with a timeout; it waits for all workers
// to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
purge := (!p.o.DisablePurge && p.stopPurge == nil)
if p.IsClosed() || purge || p.stopTicktock == nil {
return ErrPoolClosed
}
p.Release()

endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.o.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
time.Sleep(releaseTimeoutInterval * time.Millisecond)
}

return ErrTimeout
}

// Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
@@ -339,14 +216,6 @@ func (p *PoolWithFunc) Reboot() {
}
}

func (p *PoolWithFunc) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}

func (p *PoolWithFunc) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}

// retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() (w worker, err error) {
p.lock.Lock()
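All of the accessor and lifecycle methods deleted above (Running, Free, Waiting, Cap, Tune, IsClosed, Release, ReleaseTimeout, addRunning, addWaiting) are removed verbatim from pool.go below as well, so they presumably now exist once as methods on the shared workerPool type. A minimal sketch of a few of them, assuming that receiver — the bodies simply mirror the deleted code:

	// Presumed shared accessors on workerPool (receiver type assumed).
	func (p *workerPool) Running() int { return int(atomic.LoadInt32(&p.running)) }

	func (p *workerPool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) }

	// Free returns the number of available workers; -1 means the pool is unlimited.
	func (p *workerPool) Free() int {
		c := p.Cap()
		if c < 0 {
			return -1
		}
		return c - p.Running()
	}

	func (p *workerPool) addRunning(delta int) {
		atomic.AddInt32(&p.running, int32(delta))
	}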
151 changes: 7 additions & 144 deletions internal/ants/pool.go
@@ -34,47 +34,7 @@ import (
// Pool accepts tasks and processes them concurrently; it limits
// the total number of goroutines to a given size by recycling them.
type Pool struct {
// client defined context
ctx context.Context

// capacity of the pool. A negative value means the capacity is limitless;
// an infinite pool is used to avoid the potential issue of endless blocking
// caused by nested usage of a pool: submitting a task to a pool which
// submits a new task to the same pool.
capacity int32

// running is the number of the currently running goroutines.
running int32

// lock for protecting the worker queue.
lock sync.Locker

// workers is a slice that stores the available workers.
workers workerQueue

// state is used to notify the pool to close itself.
state int32

// cond for waiting to get an idle worker.
cond *sync.Cond

// workerCache speeds up obtaining a usable worker in retrieveWorker.
workerCache sync.Pool

// waiting is the number of goroutines blocked on pool.Submit(),
// protected by pool.lock.
waiting int32

purgeDone int32
stopPurge context.CancelFunc

ticktockDone int32
stopTicktock context.CancelFunc

now atomic.Value

o *Options
workerPool
}

// purgeStaleWorkers clears stale workers periodically, it runs in an
@@ -189,10 +149,12 @@ func NewPool(ctx context.Context, size int, options ...Option) (*Pool, error) {
}

p := &Pool{
ctx: ctx,
capacity: int32(size),
lock: async.NewSpinLock(),
o: opts,
workerPool: workerPool{
ctx: ctx,
capacity: int32(size),
lock: async.NewSpinLock(),
o: opts,
},
}

p.workerCache.New = func() interface{} { // interface{} => sync.Pool api
Expand Down Expand Up @@ -239,97 +201,6 @@ func (p *Pool) Submit(ctx context.Context, task TaskFunc) error {
return err
}

// Running returns the number of workers currently running.
func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}

// Free returns the number of available workers, -1 indicates this pool
// is unlimited.
func (p *Pool) Free() int {
c := p.Cap()
if c < 0 {
return -1
}

return c - p.Running()
}

// Waiting returns the number of tasks waiting to be executed.
func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}

// Cap returns the capacity of this pool.
func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
}

// Tune changes the capacity of this pool; note that it has no effect on an
// infinite or pre-allocated pool.
func (p *Pool) Tune(size int) {
capacity := p.Cap()
if capacity == -1 || size <= 0 || size == capacity || p.o.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
if size > capacity {
if size-capacity == 1 {
p.cond.Signal()

return
}
p.cond.Broadcast()
}
}

// IsClosed indicates whether the pool is closed.
func (p *Pool) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED
}

// Release closes this pool and releases the worker queue.
func (p *Pool) Release() {
if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}

if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil

p.lock.Lock()
p.workers.reset(p.ctx)
p.lock.Unlock()
// There might be some callers waiting in retrieveWorker(), so we need
// to wake them up to prevent those callers blocking infinitely.
p.cond.Broadcast()
}

// ReleaseTimeout is like Release but with a timeout; it waits for all workers
// to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.o.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}
p.Release()

endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.o.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
time.Sleep(releaseTimeoutInterval * time.Millisecond)
}

return ErrTimeout
}

// Reboot reboots a closed pool.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
@@ -340,14 +211,6 @@ func (p *Pool) Reboot() {
}
}

func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}

func (p *Pool) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}

// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w worker, err error) {
p.lock.Lock() // why isn't the unlock just deferred?
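Because workerPool is embedded as an anonymous field rather than held under a name, its methods are promoted onto Pool and PoolWithFunc, so existing call sites should compile unchanged. A minimal usage sketch under that assumption — the import path and the surrounding main are hypothetical, and internal/ants is only importable from within the same module:

	package main

	import (
		"context"
		"fmt"
		"log"

		"github.com/snivilised/lorax/internal/ants" // import path assumed
	)

	func main() {
		pool, err := ants.NewPool(context.Background(), 10)
		if err != nil {
			log.Fatal(err)
		}
		defer pool.Release()

		// Cap, Running and Release resolve through the embedded workerPool,
		// so callers are unaffected by the de-duplication.
		fmt.Println(pool.Cap(), pool.Running())
	}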
