diff --git a/.gitignore b/.gitignore index 73390a3..f35432f 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,9 @@ coverage.out ginkgo.report .task/ +.env +.DS_Store +thumbs.db i18n/out/en-US/active.en-GB.json diff --git a/.golangci.yml b/.golangci.yml index 8f83a56..5e9539f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -68,6 +68,7 @@ issues: fix: true exclude: - "cuddle" + - "go statements can only invoke functions assigned on line above" run: issues-exit-code: 1 diff --git a/boost/base-pool.go b/boost/base-pool.go index 9c71894..b064815 100644 --- a/boost/base-pool.go +++ b/boost/base-pool.go @@ -7,11 +7,11 @@ import ( type ( basePool[I, O any] struct { - wg *sync.WaitGroup - sequence int32 - inputDupCh *Duplex[I] - outputDupCh *Duplex[JobOutput[O]] - ending bool + wg *sync.WaitGroup + sequence int32 + inputDupCh *Duplex[I] + oi *outputInfo[O] + ending bool } ) @@ -19,6 +19,16 @@ func (p *basePool[I, O]) next() int32 { return atomic.AddInt32(&p.sequence, int32(1)) } +// Observe func (p *basePool[I, O]) Observe() JobOutputStreamR[O] { - return p.outputDupCh.ReaderCh + return p.oi.outputDupCh.ReaderCh +} + +// CancelCh +func (p *basePool[I, O]) CancelCh() CancelStreamR { + if p.oi != nil { + return p.oi.cancelDupCh.ReaderCh + } + + return nil } diff --git a/boost/boost-public-api.go b/boost/boost-public-api.go index f2e9236..3abd373 100644 --- a/boost/boost-public-api.go +++ b/boost/boost-public-api.go @@ -56,6 +56,8 @@ type ( // Next is a sequential unique id generator func type Next func() string + + OnCancel func() ) type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error) diff --git a/boost/cancellation-monitor.go b/boost/cancellation-monitor.go new file mode 100644 index 0000000..c451e1c --- /dev/null +++ b/boost/cancellation-monitor.go @@ -0,0 +1,30 @@ +package boost + +import ( + "context" + "sync" +) + +// StartCancellationMonitor +func StartCancellationMonitor(ctx context.Context, + cancel context.CancelFunc, + wg *sync.WaitGroup, + cancelCh CancelStreamR, + on OnCancel, +) { + wg.Add(1) + go func(ctx context.Context, + cancel context.CancelFunc, + wg *sync.WaitGroup, + cancelCh CancelStreamR, + ) { + defer wg.Done() + + select { + case <-cancelCh: + on() + cancel() + case <-ctx.Done(): + } + }(ctx, cancel, wg, cancelCh) +} diff --git a/boost/examples/alpha/main.go b/boost/examples/fp-throttled/main.go similarity index 100% rename from boost/examples/alpha/main.go rename to boost/examples/fp-throttled/main.go diff --git a/boost/examples/gamma/main.go b/boost/examples/mf-all-output-consumed-by-range/main.go similarity index 95% rename from boost/examples/gamma/main.go rename to boost/examples/mf-all-output-consumed-by-range/main.go index c44a151..d5be00f 100644 --- a/boost/examples/gamma/main.go +++ b/boost/examples/mf-all-output-consumed-by-range/main.go @@ -14,6 +14,41 @@ import ( // output channel is created through which the client receives // all generated outputs. +func main() { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pool, err := boost.NewManifoldFuncPool( + ctx, AntsSize, func(input int) (int, error) { + time.Sleep(time.Duration(input) * time.Millisecond) + + return n + 1, nil + }, &wg, + boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend), + ) + + defer pool.Release(ctx) + + if err != nil { + fmt.Printf("๐Ÿ”ฅ error creating pool: '%v'\n", err) + return + } + + wg.Add(1) + go produce(ctx, pool, &wg) + + wg.Add(1) + go consume(ctx, pool, &wg) + + fmt.Printf("pool with func, no of running workers:%d\n", + pool.Running(), + ) + wg.Wait() + fmt.Println("๐Ÿ (manifold-func-pool) FINISHED") +} + const ( AntsSize = 1000 n = 100000 @@ -21,6 +56,7 @@ const ( Param = 100 OutputChTimeout = time.Second / 2 // do not use a value that is similar to CheckCloseInterval CheckCloseInterval = time.Second / 10 + TimeoutOnSend = time.Second * 2 ) func produce(ctx context.Context, @@ -77,38 +113,3 @@ func consume(_ context.Context, ) } } - -func main() { - var wg sync.WaitGroup - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, err := boost.NewManifoldFuncPool( - ctx, AntsSize, func(input int) (int, error) { - time.Sleep(time.Duration(input) * time.Millisecond) - - return n + 1, nil - }, &wg, - boost.WithOutput(OutputChSize, CheckCloseInterval), - ) - - defer pool.Release(ctx) - - if err != nil { - fmt.Printf("๐Ÿ”ฅ error creating pool: '%v'\n", err) - return - } - - wg.Add(1) - go produce(ctx, pool, &wg) //nolint:wsl // pendant - - wg.Add(1) - go consume(ctx, pool, &wg) //nolint:wsl // pendant - - fmt.Printf("pool with func, no of running workers:%d\n", - pool.Running(), - ) - wg.Wait() - fmt.Println("๐Ÿ (manifold-func-pool) FINISHED") -} diff --git a/boost/examples/mf-err-missing-consumer/main.go b/boost/examples/mf-err-missing-consumer/main.go new file mode 100644 index 0000000..667266c --- /dev/null +++ b/boost/examples/mf-err-missing-consumer/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/snivilised/lorax/boost" +) + +// Demonstrates Timeout On Send as a result of not consuming the output +func main() { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fmt.Printf("โฑ๏ธ timeout on send, missing consumer: '%.2f's\n", TimeoutOnSend.Seconds()) + pool, err := boost.NewManifoldFuncPool( + ctx, AntsSize, func(input int) (int, error) { + time.Sleep(time.Duration(input) * time.Millisecond) + + return n + 1, nil + }, &wg, + boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend), + ) + + defer pool.Release(ctx) + if cc := pool.CancelCh(); cc != nil { + boost.StartCancellationMonitor(ctx, cancel, &wg, cc, func() { + fmt.Print("๐Ÿ”ด cancellation received, cancelling...\n") + }) + } + + if err != nil { + fmt.Printf("๐Ÿ”ฅ error creating pool: '%v'\n", err) + return + } + + wg.Add(1) + go produce(ctx, pool, &wg) + + fmt.Printf("pool with func, no of running workers:%d\n", + pool.Running(), + ) + wg.Wait() + fmt.Println("๐Ÿ (manifold-func-pool, missing consumer) FINISHED") +} + +const ( + AntsSize = 1000 + n = 100000 + OutputChSize = 10 + Param = 100 + CheckCloseInterval = time.Second / 10 + TimeoutOnSend = time.Second * 3 +) + +func produce(ctx context.Context, + pool *boost.ManifoldFuncPool[int, int], + wg *sync.WaitGroup, +) { + defer wg.Done() + + for i, n := 0, 100; i < n; i++ { + _ = pool.Post(ctx, Param) + } + + // required to inform the worker pool that no more jobs will be submitted. + // failure to invoke Conclude will result in a never ending worker pool. + // + pool.Conclude(ctx) +} diff --git a/boost/examples/mf-err-timeout-on-send/main.go b/boost/examples/mf-err-timeout-on-send/main.go new file mode 100644 index 0000000..9dc618a --- /dev/null +++ b/boost/examples/mf-err-timeout-on-send/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/snivilised/lorax/boost" + "github.com/snivilised/lorax/internal/lo" +) + +// Demonstrates Timeout On Send +func main() { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fmt.Printf("โฑ๏ธ timeout on send: '%.2f's\n", TimeoutOnSend.Seconds()) + pool, err := boost.NewManifoldFuncPool( + ctx, AntsSize, func(input int) (int, error) { + time.Sleep(time.Duration(input) * time.Millisecond) + + return n + 1, nil + }, &wg, + boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend), + ) + + defer pool.Release(ctx) + if cc := pool.CancelCh(); cc != nil { + boost.StartCancellationMonitor(ctx, cancel, &wg, cc, func() { + fmt.Print("๐Ÿ”ด cancellation received, cancelling...\n") + }) + } + + if err != nil { + fmt.Printf("๐Ÿ”ฅ error creating pool: '%v'\n", err) + return + } + + wg.Add(1) + go produce(ctx, pool, &wg) + + wg.Add(1) + go consume(ctx, pool, &wg) + + fmt.Printf("pool with func, no of running workers:%d\n", + pool.Running(), + ) + wg.Wait() + fmt.Println("๐Ÿ (manifold-func-pool, timeout on send) FINISHED") +} + +const ( + AntsSize = 1000 + n = 100000 + OutputChSize = 10 + Param = 100 + CheckCloseInterval = time.Second / 10 + TimeoutOnSend = time.Second +) + +func produce(ctx context.Context, + pool *boost.ManifoldFuncPool[int, int], + wg *sync.WaitGroup, +) { + defer wg.Done() + + for i, n := 0, 100; i < n; i++ { + _ = pool.Post(ctx, Param) + } + + // required to inform the worker pool that no more jobs will be submitted. + // failure to invoke Conclude will result in a never ending worker pool. + // + pool.Conclude(ctx) +} + +func consume(ctx context.Context, + pool *boost.ManifoldFuncPool[int, int], + wg *sync.WaitGroup, +) { + defer wg.Done() + + const ( + fast = time.Second / 10 + slow = time.Second * 3 + barrier = 10 + ) + + for count := 0; ; count++ { + // Slow consumer after 10 iterations, resulting in a timeout + // + time.Sleep( + lo.Ternary(count > barrier, slow, fast), + ) + + // NB: can not range over the observe channel since range + // is non-preempt-able and therefore does not react to + // ctx.Done. + select { + case output := <-pool.Observe(): + fmt.Printf("๐Ÿ’ payload: '%v', id: '%v', seq: '%v' (e: '%v')\n", + output.Payload, output.ID, output.SequenceNo, output.Error, + ) + case <-ctx.Done(): + return + } + } +} diff --git a/boost/examples/delta/main.go b/boost/examples/mf-input-injected-via-chan/main.go similarity index 93% rename from boost/examples/delta/main.go rename to boost/examples/mf-input-injected-via-chan/main.go index f0d8576..d4d2a3b 100644 --- a/boost/examples/delta/main.go +++ b/boost/examples/mf-input-injected-via-chan/main.go @@ -14,6 +14,42 @@ import ( // Submission to the pool occurs via an input channel as opposed // directly invoking Post on the pool. +func main() { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pool, err := boost.NewManifoldFuncPool( + ctx, AntsSize, func(input int) (int, error) { + time.Sleep(time.Duration(input) * time.Millisecond) + + return n + 1, nil + }, &wg, + boost.WithInput(InputChSize), + boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend), + ) + + defer pool.Release(ctx) + + if err != nil { + fmt.Printf("๐Ÿ”ฅ error creating pool: '%v'\n", err) + return + } + + wg.Add(1) + go inject(ctx, pool, &wg) + + wg.Add(1) + go consume(ctx, pool, &wg) + + fmt.Printf("pool with func, no of running workers:%d\n", + pool.Running(), + ) + wg.Wait() + fmt.Println("๐Ÿ (manifold-func-pool) FINISHED") +} + const ( AntsSize = 1000 n = 100000 @@ -22,6 +58,7 @@ const ( Param = 100 OutputChTimeout = time.Second / 2 // do not use a value that is similar to interval CheckCloseInterval = time.Second / 10 + TimeoutOnSend = time.Second * 2 ) func inject(ctx context.Context, @@ -61,39 +98,3 @@ func consume(_ context.Context, ) } } - -func main() { - var wg sync.WaitGroup - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, err := boost.NewManifoldFuncPool( - ctx, AntsSize, func(input int) (int, error) { - time.Sleep(time.Duration(input) * time.Millisecond) - - return n + 1, nil - }, &wg, - boost.WithInput(InputChSize), - boost.WithOutput(OutputChSize, CheckCloseInterval), - ) - - defer pool.Release(ctx) - - if err != nil { - fmt.Printf("๐Ÿ”ฅ error creating pool: '%v'\n", err) - return - } - - wg.Add(1) - go inject(ctx, pool, &wg) //nolint:wsl // pendant - - wg.Add(1) - go consume(ctx, pool, &wg) //nolint:wsl // pendant - - fmt.Printf("pool with func, no of running workers:%d\n", - pool.Running(), - ) - wg.Wait() - fmt.Println("๐Ÿ (manifold-func-pool) FINISHED") -} diff --git a/boost/examples/beta/main.go b/boost/examples/tp-throttled/main.go similarity index 100% rename from boost/examples/beta/main.go rename to boost/examples/tp-throttled/main.go diff --git a/boost/generic-pool.go b/boost/generic-pool.go index 7a13046..bc1c732 100644 --- a/boost/generic-pool.go +++ b/boost/generic-pool.go @@ -2,9 +2,12 @@ package boost import ( "context" + "errors" "sync" + "time" "github.com/snivilised/lorax/internal/ants" + "github.com/snivilised/lorax/internal/lo" ) // functionalPool @@ -12,18 +15,22 @@ type functionalPool struct { pool *ants.PoolWithFunc } +// Post submits a task to the pool. func (p *functionalPool) Post(ctx context.Context, job InputParam) error { return p.pool.Invoke(ctx, job) } +// Release closes this pool and releases the worker queue. func (p *functionalPool) Release(ctx context.Context) { p.pool.Release(ctx) } +// Running returns the number of workers currently running. func (p *functionalPool) Running() int { return p.pool.Running() } +// Waiting returns the number of tasks waiting to be executed. func (p *functionalPool) Waiting() int { return p.pool.Waiting() } @@ -33,18 +40,22 @@ type taskPool struct { pool *ants.Pool } +// Post submits a task to the pool. func (p *taskPool) Post(ctx context.Context, task TaskFunc) error { return p.pool.Submit(ctx, task) } +// Release closes this pool and releases the worker queue. func (p *taskPool) Release(ctx context.Context) { p.pool.Release(ctx) } +// Running returns the number of workers currently running. func (p *taskPool) Running() int { return p.pool.Running() } +// Waiting returns the number of tasks waiting to be executed. func (p *taskPool) Waiting() int { return p.pool.Waiting() } @@ -57,7 +68,7 @@ func source[I any](ctx context.Context, inputDupCh := NewDuplex(make(SourceStream[I], o.Input.BufferSize)) wg.Add(1) - go func(ctx context.Context) { //nolint:wsl // pedant + go func(ctx context.Context) { defer func() { closable.terminate() wg.Done() @@ -79,3 +90,53 @@ func source[I any](ctx context.Context, return inputDupCh } + +func newOutputInfo[O any](o *Options) *outputInfo[O] { + if o.Output == nil { + return nil + } + + return &outputInfo[O]{ + cancelDupCh: NewDuplex(make(CancelStream, 1)), + outputDupCh: NewDuplex(make(JobOutputStream[O], o.Output.BufferSize)), + } +} + +// fromOutputs assumes o.Output is defined +func fromOutputInfo[O any](o *Options, oi *outputInfo[O]) *outputInfoW[O] { + const never = time.Hour * 50000 + + timeout := lo.TernaryF(o.Output != nil, + func() time.Duration { + return max(o.Output.TimeoutOnSend, ants.MinimumTimeoutOnSend) + }, + func() time.Duration { + return never + }, + ) + + return &outputInfoW[O]{ + cancelCh: oi.cancelDupCh.WriterCh, + outputCh: oi.outputDupCh.WriterCh, + timeoutOnSend: timeout, + } +} + +func respond[O any](ctx context.Context, wi *outputInfoW[O], output *JobOutput[O]) (err error) { + select { + case wi.outputCh <- *output: + return nil + case <-time.After(wi.timeoutOnSend): + select { + case <-ctx.Done(): + err = ctx.Err() + case wi.cancelCh <- CancelWorkSignal{}: + err = errors.New("timeout") + } + + case <-ctx.Done(): + err = ctx.Err() + } + + return err +} diff --git a/boost/options.go b/boost/options.go index ddd33b4..83d6e34 100644 --- a/boost/options.go +++ b/boost/options.go @@ -1,11 +1,5 @@ package boost -import ( - "time" - - "github.com/samber/lo" -) - // withDefaults prepends boost withDefaults to the sequence of options func withDefaults(options ...Option) []Option { const ( @@ -19,15 +13,3 @@ func withDefaults(options ...Option) []Option { return o } - -func GetValidatedCheckCloseInterval(o *Options) time.Duration { - return lo.TernaryF( - o.Output != nil && o.Output.CheckCloseInterval > minimumCheckCloseInterval, - func() time.Duration { - return o.Output.CheckCloseInterval - }, - func() time.Duration { - return minimumCheckCloseInterval - }, - ) -} diff --git a/boost/pool-defs-internal.go b/boost/pool-defs-internal.go index 5bfd0c5..99eff03 100644 --- a/boost/pool-defs-internal.go +++ b/boost/pool-defs-internal.go @@ -63,6 +63,17 @@ func (f terminator) terminate() { f() } +type outputInfo[O any] struct { + outputDupCh *Duplex[JobOutput[O]] + cancelDupCh *Duplex[CancelWorkSignal] +} + +type outputInfoW[O any] struct { + outputCh JobOutputStreamW[O] + cancelCh CancelStreamW + timeoutOnSend time.Duration +} + // Worker pool types: // // ๐Ÿบ ManifoldFuncPool (to be used by traverse): diff --git a/boost/support_test.go b/boost/support_test.go index e3f797b..224d233 100644 --- a/boost/support_test.go +++ b/boost/support_test.go @@ -30,6 +30,7 @@ const ( BenchParam = 10 DefaultExpiredTime = 10 * time.Second CheckCloseInterval = time.Second / 100 + TimeoutOnSend = time.Second ) var curMem uint64 diff --git a/boost/worker-pool-func-manifold.go b/boost/worker-pool-func-manifold.go index 264094b..e123584 100644 --- a/boost/worker-pool-func-manifold.go +++ b/boost/worker-pool-func-manifold.go @@ -6,7 +6,6 @@ import ( "time" "github.com/snivilised/lorax/internal/ants" - "github.com/snivilised/lorax/internal/lo" ) type ( @@ -33,29 +32,24 @@ func NewManifoldFuncPool[I, O any](ctx context.Context, wg *sync.WaitGroup, options ...Option, ) (*ManifoldFuncPool[I, O], error) { - var outputDupCh *Duplex[JobOutput[O]] - o := ants.LoadOptions(withDefaults(options...)...) - if o.Output != nil { - outputDupCh = NewDuplex(make(JobOutputStream[O], o.Output.BufferSize)) + var ( + oi *outputInfo[O] + wi *outputInfoW[O] + o = ants.LoadOptions(withDefaults(options...)...) + ) + + if oi = newOutputInfo[O](o); oi != nil { + wi = fromOutputInfo(o, oi) } pool, err := ants.NewPoolWithFunc(ctx, size, func(input ants.InputParam) { - wch := lo.TernaryF(outputDupCh != nil, - func() JobOutputStreamW[O] { - return outputDupCh.WriterCh - }, - func() JobOutputStreamW[O] { - return nil - }, - ) - - manifoldFuncResponse(ctx, mf, input, wch) + manifoldFuncResponse(ctx, mf, input, wi) }, ants.WithOptions(*o)) return &ManifoldFuncPool[I, O]{ basePool: basePool[I, O]{ - wg: wg, - outputDupCh: outputDupCh, + wg: wg, + oi: oi, }, functionalPool: functionalPool{ pool: pool, @@ -98,21 +92,21 @@ func (p *ManifoldFuncPool[I, O]) Source(ctx context.Context, } // Conclude signifies to the worker pool that no more work will be -// submitted to the pool. Submitting to the pool directly using the +// submitted. When submitting to the pool directly using the // Post method, the client must call this method. Failure to do so // will result in a pool that never ends. When the client elects -// to use an input channel, but invoking Source, then Conclude will +// to use an input channel, by invoking Source, then Conclude will // be called automatically as long as the input channel has been closed. // Failure to close the channel will again result in a never ending // worker pool. func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context) { - if p.outputDupCh != nil && !p.ending { + if p.oi != nil && !p.ending { p.ending = true o := p.pool.GetOptions() - interval := GetValidatedCheckCloseInterval(o) + interval := max(o.Output.CheckCloseInterval, ants.MinimumCheckCloseInterval) p.wg.Add(1) - go func(ctx context.Context, //nolint:wsl // pendant + go func(ctx context.Context, pool *ManifoldFuncPool[I, O], wg *sync.WaitGroup, interval time.Duration, @@ -126,7 +120,7 @@ func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context) { case <-time.After(interval): if pool.Running() == 0 && pool.Waiting() == 0 { - close(p.outputDupCh.Channel) + close(p.oi.outputDupCh.Channel) return } } @@ -136,8 +130,9 @@ func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context) { } func manifoldFuncResponse[I, O any](ctx context.Context, - mf ManifoldFunc[I, O], input ants.InputParam, - outputCh JobOutputStreamW[O], + mf ManifoldFunc[I, O], + input ants.InputParam, + wi *outputInfoW[O], ) { if job, ok := input.(Job[I]); ok { payload, e := mf(job.Input) @@ -149,12 +144,8 @@ func manifoldFuncResponse[I, O any](ctx context.Context, Error: e, } - if outputCh != nil { - select { - case outputCh <- output: - // TODO: add a timeout case - case <-ctx.Done(): - } + if wi != nil { + _ = respond(ctx, wi, &output) } } } diff --git a/boost/worker-pool-func-manifold_test.go b/boost/worker-pool-func-manifold_test.go index 65b7d96..63fca1f 100644 --- a/boost/worker-pool-func-manifold_test.go +++ b/boost/worker-pool-func-manifold_test.go @@ -3,7 +3,6 @@ package boost_test import ( "context" "sync" - "time" . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok . "github.com/onsi/gomega" //nolint:revive // gomega ok @@ -64,7 +63,7 @@ var _ = Describe("WorkerPoolFuncManifold", func() { pool, err := boost.NewManifoldFuncPool( ctx, AntsSize, demoPoolManifoldFunc, &wg, - boost.WithOutput(10, time.Second/100), + boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend), ) defer pool.Release(ctx) @@ -123,7 +122,7 @@ var _ = Describe("WorkerPoolFuncManifold", func() { pool, err := boost.NewManifoldFuncPool( ctx, AntsSize, demoPoolManifoldFunc, &wg, boost.WithInput(InputBufferSize), - boost.WithOutput(10, CheckCloseInterval), + boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend), ) defer pool.Release(ctx) @@ -187,6 +186,43 @@ var _ = Describe("WorkerPoolFuncManifold", func() { }) }) }) + + Context("timeout on send, with cancellation monitor", func() { + When("output requested, but accidentally not consumed by client", func() { + It("๐Ÿงช should: cancel context and terminate", func(specCtx SpecContext) { + // TestNonblockingSubmit + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + + pool, err := boost.NewManifoldFuncPool( + ctx, AntsSize, demoPoolManifoldFunc, &wg, + boost.WithInput(InputBufferSize), + boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend), + ) + + defer pool.Release(ctx) + + wg.Add(1) + go inject(ctx, pool, &wg) + + boost.StartCancellationMonitor(ctx, + cancel, + &wg, + pool.CancelCh(), + func() {}, + ) + wg.Wait() + GinkgoWriter.Printf("pool with func, no of running workers:%d\n", + pool.Running(), + ) + ShowMemStats() + + Expect(err).To(Succeed()) + }) + }) + }) }) }) }) diff --git a/internal/ants/options.go b/internal/ants/options.go index f245215..f82ffe2 100644 --- a/internal/ants/options.go +++ b/internal/ants/options.go @@ -61,6 +61,22 @@ type InputOptions struct { BufferSize uint } +const ( + // MinimumCheckCloseInterval denotes the minimum duration of how long to wait + // in between successive attempts to check wether the output channel can be + // closed when the source of the workload indicates no more jobs will be + // submitted, either by closing the input stream or invoking Conclude on the pool. + // + MinimumCheckCloseInterval = time.Millisecond * 10 + + // MinimumTimeoutOnSend denotes the minimum duration of how long to allow for + // when sending output. When this timeout occurs, the worker will send a + // cancellation request back to the client via the cancellation channel at which + // point it can cancel the whole worker pool. + // + MinimumTimeoutOnSend = time.Millisecond * 10 +) + type OutputOptions struct { // BufferSize BufferSize uint @@ -71,6 +87,13 @@ type OutputOptions struct { // by closing the input stream or invoking Conclude on the pool. // CheckCloseInterval time.Duration + + // TimeoutOnSend denotes how long to allow for when sending output. + // When this timeout occurs, the worker will send a cancellation + // request back to the client via the cancellation channel at which + // point it can cancel the whole worker pool. + // + TimeoutOnSend time.Duration } // WithOptions accepts the whole options config. @@ -148,11 +171,12 @@ func WithInput(size uint) Option { } } -func WithOutput(size uint, interval time.Duration) Option { +func WithOutput(size uint, interval, timeout time.Duration) Option { return func(opts *Options) { opts.Output = &OutputOptions{ BufferSize: size, - CheckCloseInterval: interval, + CheckCloseInterval: max(interval, MinimumCheckCloseInterval), + TimeoutOnSend: max(timeout, MinimumTimeoutOnSend), } } }