From 212f6232de3f04de91e17be86128503876e13ddc Mon Sep 17 00:00:00 2001 From: plastikfan Date: Sun, 2 Jun 2024 12:26:52 +0100 Subject: [PATCH] ref(ants,boost): avoid storing context (#279) --- boost/examples/alpha/main.go | 4 ++-- boost/examples/beta/main.go | 4 ++-- boost/pool-defs-internal.go | 2 -- boost/worker-pool-func-manifold.go | 1 - boost/worker-pool-func.go | 9 ++++----- boost/worker-pool-func_test.go | 18 +++++++++--------- boost/worker-pool-task.go | 9 ++++----- boost/worker-pool-task_test.go | 20 ++++++++++---------- internal/ants/ants_test.go | 12 ++++++------ internal/ants/pool-func.go | 23 +++++++++++------------ internal/ants/pool.go | 29 ++++++++++++++--------------- internal/ants/worker-pool.go | 10 ++++------ 12 files changed, 66 insertions(+), 75 deletions(-) diff --git a/boost/examples/alpha/main.go b/boost/examples/alpha/main.go index 5293d2f..657a6d0 100644 --- a/boost/examples/alpha/main.go +++ b/boost/examples/alpha/main.go @@ -51,11 +51,11 @@ func main() { time.Sleep(time.Second) }, &wg, ants.WithNonblocking(false)) - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < 30; i++ { // producer fmt.Printf("PRE: <--- (n: %v) [%v] 🍋 \n", i, time.Now().Format(time.TimeOnly)) - _ = pool.Post(i) + _ = pool.Post(ctx, i) fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly)) } diff --git a/boost/examples/beta/main.go b/boost/examples/beta/main.go index 38179b2..bea50f1 100644 --- a/boost/examples/beta/main.go +++ b/boost/examples/beta/main.go @@ -44,11 +44,11 @@ func main() { pool, _ := boost.NewTaskPool[int, int](ctx, NoW, &wg) - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < 30; i++ { // producer fmt.Printf("PRE: <--- (n: %v) [%v] 🍋 \n", i, time.Now().Format(time.TimeOnly)) - _ = pool.Post(func() { + _ = pool.Post(ctx, func() { fmt.Printf("=> running: '%v')\n", pool.Running()) fmt.Printf("<--- (n: %v)🍒 \n", i) time.Sleep(time.Second) diff --git a/boost/pool-defs-internal.go b/boost/pool-defs-internal.go index de62fdb..b67be36 100644 --- a/boost/pool-defs-internal.go +++ b/boost/pool-defs-internal.go @@ -1,7 +1,6 @@ package boost import ( - "context" "sync" "github.com/snivilised/lorax/internal/ants" @@ -32,7 +31,6 @@ type ( workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O] basePool struct { - ctx context.Context wg *sync.WaitGroup idGen IDGenerator } diff --git a/boost/worker-pool-func-manifold.go b/boost/worker-pool-func-manifold.go index 356cd6e..f58d6a0 100644 --- a/boost/worker-pool-func-manifold.go +++ b/boost/worker-pool-func-manifold.go @@ -29,7 +29,6 @@ func NewManifoldFuncPool[I, O any](ctx context.Context, return &ManifoldFuncPool[I, O]{ basePool: basePool{ - ctx: ctx, idGen: &Sequential{}, wg: wg, }, diff --git a/boost/worker-pool-func.go b/boost/worker-pool-func.go index e594e70..ef828d0 100644 --- a/boost/worker-pool-func.go +++ b/boost/worker-pool-func.go @@ -30,7 +30,6 @@ func NewFuncPool[I, O any](ctx context.Context, return &FuncPool[I, O]{ basePool: basePool{ - ctx: ctx, wg: wg, idGen: &Sequential{}, }, @@ -40,14 +39,14 @@ func NewFuncPool[I, O any](ctx context.Context, }, err } -func (p *FuncPool[I, O]) Post(job ants.InputParam) error { - return p.pool.Invoke(job) +func (p *FuncPool[I, O]) Post(ctx context.Context, job ants.InputParam) error { + return p.pool.Invoke(ctx, job) } func (p *FuncPool[I, O]) Running() int { return p.pool.Running() } -func (p *FuncPool[I, O]) Release() { - p.pool.Release() +func (p *FuncPool[I, O]) Release(ctx context.Context) { + p.pool.Release(ctx) } diff --git a/boost/worker-pool-func_test.go b/boost/worker-pool-func_test.go index 6413ebb..01c0388 100644 --- a/boost/worker-pool-func_test.go +++ b/boost/worker-pool-func_test.go @@ -24,10 +24,10 @@ var _ = Describe("WorkerPoolFunc", func() { pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg) - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < n; i++ { - _ = pool.Post(Param) + _ = pool.Post(ctx, Param) } wg.Wait() GinkgoWriter.Printf("pool with func, running workers number:%d\n", @@ -48,10 +48,10 @@ var _ = Describe("WorkerPoolFunc", func() { pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg) - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < n; i++ { - _ = pool.Post(Param) + _ = pool.Post(ctx, Param) if i > 10 { cancel() @@ -82,18 +82,18 @@ var _ = Describe("WorkerPoolFunc", func() { ) Expect(err).To(Succeed(), "create TimingPool failed") - defer pool.Release() + defer pool.Release(ctx) By("👾 POOL-CREATED\n") for i := 0; i < PoolSize-1; i++ { - Expect(pool.Post(Param)).To(Succeed(), + Expect(pool.Post(ctx, Param)).To(Succeed(), "submit when pool is not full shouldn't return error", ) } ch := make(chan struct{}) // pool is full now. - Expect(pool.Post(ch)).To(Succeed(), + Expect(pool.Post(ctx, ch)).To(Succeed(), "submit when pool is not full shouldn't return error", ) @@ -103,7 +103,7 @@ var _ = Describe("WorkerPoolFunc", func() { go func() { // should be blocked. blocking num == 1 - if err := pool.Post(Param); err != nil { + if err := pool.Post(ctx, Param); err != nil { errCh <- err } By("👾 Producer complete\n") @@ -111,7 +111,7 @@ var _ = Describe("WorkerPoolFunc", func() { }() time.Sleep(1 * time.Second) // already reached max blocking limit - Expect(pool.Post(Param)).To(MatchError(ants.ErrPoolOverload.Error()), + Expect(pool.Post(ctx, Param)).To(MatchError(ants.ErrPoolOverload.Error()), "blocking submit when pool reach max blocking submit should return ErrPoolOverload", ) diff --git a/boost/worker-pool-task.go b/boost/worker-pool-task.go index bc83bc2..97a70e0 100644 --- a/boost/worker-pool-task.go +++ b/boost/worker-pool-task.go @@ -46,7 +46,6 @@ func NewTaskPool[I, O any](ctx context.Context, return &TaskPool[I, O]{ basePool: basePool{ - ctx: ctx, wg: wg, idGen: &Sequential{}, }, @@ -56,14 +55,14 @@ func NewTaskPool[I, O any](ctx context.Context, }, err } -func (p *TaskPool[I, O]) Post(task ants.TaskFunc) error { - return p.pool.Submit(p.ctx, task) +func (p *TaskPool[I, O]) Post(ctx context.Context, task ants.TaskFunc) error { + return p.pool.Submit(ctx, task) } func (p *TaskPool[I, O]) Running() int { return p.pool.Running() } -func (p *TaskPool[I, O]) Release() { - p.pool.Release() +func (p *TaskPool[I, O]) Release(ctx context.Context) { + p.pool.Release(ctx) } diff --git a/boost/worker-pool-task_test.go b/boost/worker-pool-task_test.go index 520e815..202c05e 100644 --- a/boost/worker-pool-task_test.go +++ b/boost/worker-pool-task_test.go @@ -25,13 +25,13 @@ var _ = Describe("WorkerPoolTask", func() { pool, err := boost.NewTaskPool[int, int](ctx, PoolSize, &wg, boost.WithNonblocking(true), ) - defer pool.Release() + defer pool.Release(ctx) Expect(err).To(Succeed()) Expect(pool).NotTo(BeNil()) for i := 0; i < PoolSize-1; i++ { - Expect(pool.Post(longRunningFunc)).To(Succeed(), + Expect(pool.Post(ctx, longRunningFunc)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) } @@ -43,16 +43,16 @@ var _ = Describe("WorkerPoolTask", func() { close(secondCh) } // pool is full now. - Expect(pool.Post(fn)).To(Succeed(), + Expect(pool.Post(ctx, fn)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) - Expect(pool.Post(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), + Expect(pool.Post(ctx, demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), "nonblocking submit when pool is full should get an ErrPoolOverload", ) // interrupt fn to get an available worker close(firstCh) <-secondCh - Expect(pool.Post(demoFunc)).To(Succeed(), + Expect(pool.Post(ctx, demoFunc)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) }) @@ -70,11 +70,11 @@ var _ = Describe("WorkerPoolTask", func() { boost.WithMaxBlockingTasks(1), ) Expect(err).To(Succeed(), "create TimingPool failed") - defer pool.Release() + defer pool.Release(ctx) By("👾 POOL-CREATED\n") for i := 0; i < PoolSize-1; i++ { - Expect(pool.Post(longRunningFunc)).To(Succeed(), + Expect(pool.Post(ctx, longRunningFunc)).To(Succeed(), "submit when pool is not full shouldn't return error", ) } @@ -83,7 +83,7 @@ var _ = Describe("WorkerPoolTask", func() { <-ch } // pool is full now. - Expect(pool.Post(fn)).To(Succeed(), + Expect(pool.Post(ctx, fn)).To(Succeed(), "submit when pool is not full shouldn't return error", ) @@ -92,7 +92,7 @@ var _ = Describe("WorkerPoolTask", func() { errCh := make(chan error, 1) go func() { // should be blocked. blocking num == 1 - if err := pool.Post(demoFunc); err != nil { + if err := pool.Post(ctx, demoFunc); err != nil { errCh <- err } By("👾 Producer complete\n") @@ -103,7 +103,7 @@ var _ = Describe("WorkerPoolTask", func() { time.Sleep(1 * time.Second) // already reached max blocking limit - Expect(pool.Post(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), + Expect(pool.Post(ctx, demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), "blocking submit when pool reach max blocking submit should return ErrPoolOverload", ) diff --git a/internal/ants/ants_test.go b/internal/ants/ants_test.go index ac1492b..56a2a56 100644 --- a/internal/ants/ants_test.go +++ b/internal/ants/ants_test.go @@ -25,7 +25,7 @@ var _ = Describe("Ants", func() { pool, err := ants.NewPool(ctx, poolSize, ants.WithNonblocking(true)) Expect(err).To(Succeed(), "create TimingPool failed") - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < poolSize-1; i++ { Expect(pool.Submit(ctx, longRunningFunc)).To(Succeed(), @@ -66,7 +66,7 @@ var _ = Describe("Ants", func() { pool, err := ants.NewPool(ctx, poolSize, ants.WithMaxBlockingTasks(1)) Expect(err).To(Succeed(), "create TimingPool failed") - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < poolSize-1; i++ { Expect(pool.Submit(ctx, longRunningFunc)).To(Succeed(), @@ -126,11 +126,11 @@ var _ = Describe("Ants", func() { demoPoolFunc(i) wg.Done() }) - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < n; i++ { wg.Add(1) - _ = pool.Invoke(Param) + _ = pool.Invoke(ctx, Param) } wg.Wait() GinkgoWriter.Printf("pool with func, running workers number:%d\n", @@ -152,11 +152,11 @@ var _ = Describe("Ants", func() { demoPoolFunc(i) wg.Done() }, ants.WithPreAlloc(true)) - defer pool.Release() + defer pool.Release(ctx) for i := 0; i < n; i++ { wg.Add(1) - _ = pool.Invoke(Param) + _ = pool.Invoke(ctx, Param) } wg.Wait() GinkgoWriter.Printf("pool with func, running workers number:%d\n", diff --git a/internal/ants/pool-func.go b/internal/ants/pool-func.go index 20f6704..b96cc70 100644 --- a/internal/ants/pool-func.go +++ b/internal/ants/pool-func.go @@ -105,21 +105,21 @@ func (p *PoolWithFunc) ticktock(ticktockCtx context.Context) { } } -func (p *PoolWithFunc) goPurge() { +func (p *PoolWithFunc) goPurge(ctx context.Context) { if p.o.DisablePurge { return } // Start a goroutine to clean up expired workers periodically. var purgeCtx context.Context - purgeCtx, p.stopPurge = context.WithCancel(p.ctx) + purgeCtx, p.stopPurge = context.WithCancel(ctx) go p.purgeStaleWorkers(purgeCtx) } -func (p *PoolWithFunc) goTicktock() { +func (p *PoolWithFunc) goTicktock(ctx context.Context) { p.now.Store(time.Now()) var ticktockCtx context.Context - ticktockCtx, p.stopTicktock = context.WithCancel(p.ctx) + ticktockCtx, p.stopTicktock = context.WithCancel(ctx) go p.ticktock(ticktockCtx) } @@ -157,7 +157,6 @@ func NewPoolWithFunc(ctx context.Context, p := &PoolWithFunc{ workerPool: workerPool{ - ctx: ctx, capacity: int32(size), lock: async.NewSpinLock(), o: opts, @@ -181,8 +180,8 @@ func NewPoolWithFunc(ctx context.Context, p.cond = sync.NewCond(p.lock) - p.goPurge() - p.goTicktock() + p.goPurge(ctx) + p.goTicktock(ctx) return p, nil } @@ -193,26 +192,26 @@ func NewPoolWithFunc(ctx context.Context, // but what calls for special attention is that you will get blocked with the last // Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this, // you should instantiate a PoolWithFunc with ants.WithNonblocking(true). -func (p *PoolWithFunc) Invoke(job InputParam) error { +func (p *PoolWithFunc) Invoke(ctx context.Context, job InputParam) error { if p.IsClosed() { return ErrPoolClosed } w, err := p.retrieveWorker() if w != nil { - w.sendParam(p.ctx, job) + w.sendParam(ctx, job) } return err } // Reboot reboots a closed pool. -func (p *PoolWithFunc) Reboot() { +func (p *PoolWithFunc) Reboot(ctx context.Context) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) - p.goPurge() + p.goPurge(ctx) atomic.StoreInt32(&p.ticktockDone, 0) - p.goTicktock() + p.goTicktock(ctx) } } diff --git a/internal/ants/pool.go b/internal/ants/pool.go index 8cc20e0..040b1d9 100644 --- a/internal/ants/pool.go +++ b/internal/ants/pool.go @@ -70,7 +70,7 @@ func (p *Pool) purgeStaleWorkers(purgeCtx context.Context) { // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. for i := range staleWorkers { - staleWorkers[i].finish(p.ctx) + staleWorkers[i].finish(purgeCtx) staleWorkers[i] = nil } @@ -106,22 +106,22 @@ func (p *Pool) ticktock(ticktockCtx context.Context) { } } -func (p *Pool) goPurge() { +func (p *Pool) goPurge(ctx context.Context) { if p.o.DisablePurge { return } // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopPurge = context.WithCancel(p.ctx) - go p.purgeStaleWorkers(ctx) + var purgeCtx context.Context + purgeCtx, p.stopPurge = context.WithCancel(ctx) + go p.purgeStaleWorkers(purgeCtx) } -func (p *Pool) goTicktock() { +func (p *Pool) goTicktock(ctx context.Context) { p.now.Store(time.Now()) - var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(p.ctx) - go p.ticktock(ctx) + var ticktockCtx context.Context + ticktockCtx, p.stopTicktock = context.WithCancel(ctx) + go p.ticktock(ticktockCtx) } func (p *Pool) nowTime() time.Time { @@ -150,7 +150,6 @@ func NewPool(ctx context.Context, size int, options ...Option) (*Pool, error) { p := &Pool{ workerPool: workerPool{ - ctx: ctx, capacity: int32(size), lock: async.NewSpinLock(), o: opts, @@ -175,8 +174,8 @@ func NewPool(ctx context.Context, size int, options ...Option) (*Pool, error) { p.cond = sync.NewCond(p.lock) - p.goPurge() - p.goTicktock() + p.goPurge(ctx) + p.goTicktock(ctx) return p, nil } @@ -202,12 +201,12 @@ func (p *Pool) Submit(ctx context.Context, task TaskFunc) error { } // Reboot reboots a closed pool. -func (p *Pool) Reboot() { +func (p *Pool) Reboot(ctx context.Context) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) - p.goPurge() + p.goPurge(ctx) atomic.StoreInt32(&p.ticktockDone, 0) - p.goTicktock() + p.goTicktock(ctx) } } diff --git a/internal/ants/worker-pool.go b/internal/ants/worker-pool.go index a1f97ec..5756675 100644 --- a/internal/ants/worker-pool.go +++ b/internal/ants/worker-pool.go @@ -8,8 +8,6 @@ import ( ) type workerPool struct { - // client defined context - ctx context.Context // capacity of the pool. capacity int32 @@ -95,7 +93,7 @@ func (p *workerPool) IsClosed() bool { } // Release closes this pool and releases the worker queue. -func (p *workerPool) Release() { +func (p *workerPool) Release(ctx context.Context) { if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { return } @@ -108,7 +106,7 @@ func (p *workerPool) Release() { p.stopTicktock = nil p.lock.Lock() - p.workers.reset(p.ctx) + p.workers.reset(ctx) p.lock.Unlock() // There might be some callers waiting in retrieveWorker(), so we need to // wake them up to prevent those callers blocking infinitely. @@ -117,12 +115,12 @@ func (p *workerPool) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers // to exit before timing out. -func (p *workerPool) ReleaseTimeout(timeout time.Duration) error { +func (p *workerPool) ReleaseTimeout(ctx context.Context, timeout time.Duration) error { purge := (!p.o.DisablePurge && p.stopPurge == nil) if p.IsClosed() || purge || p.stopTicktock == nil { return ErrPoolClosed } - p.Release() + p.Release(ctx) endTime := time.Now().Add(timeout) for time.Now().Before(endTime) {