From a2dca17e9b5b7448c467ab715ae543b2a6e58d1b Mon Sep 17 00:00:00 2001 From: plastikfan Date: Tue, 8 Aug 2023 08:50:08 +0100 Subject: [PATCH] feat(async): change worker gr lifetime (#4) --- .vscode/settings.json | 10 +- async/pool-defs-internal.go | 13 +- async/pool-defs.go | 10 ++ async/worker-pool.go | 270 ++++++++++++++++++++++-------- async/worker-pool_test.go | 256 +++++++++++----------------- async/worker.go | 50 ++++-- go.mod | 2 + go.sum | 2 + internal/helpers/test-consumer.go | 28 ++-- internal/helpers/test-producer.go | 58 ++++--- 10 files changed, 416 insertions(+), 283 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 34dbfe1..bcfed64 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,11 +16,13 @@ "exportloopref", "extendio", "fieldalignment", + "gobby", "goconst", "gocritic", "gocyclo", "gofmt", "goimports", + "goleak", "gomnd", "gosec", "gosimple", @@ -28,13 +30,18 @@ "graffico", "ineffassign", "jibberjabber", + "leaktest", "linters", + "lorax", "nakedret", + "nolint", "nolintlint", "pixa", "prealloc", "repotoken", "sidewalk", + "skeletor", + "smaug", "staticcheck", "structcheck", "stylecheck", @@ -45,6 +52,7 @@ "unparam", "varcheck", "watchvc", - "watchvi" + "watchvi", + "xenomorph" ] } diff --git a/async/pool-defs-internal.go b/async/pool-defs-internal.go index 898725b..5a4ef12 100644 --- a/async/pool-defs-internal.go +++ b/async/pool-defs-internal.go @@ -1,14 +1,15 @@ package async -type workerInfo[I, R any] struct { - job Job[I] - resultsOut ResultStreamOut[R] - finishedOut FinishedStreamOut -} - const ( // TODO: This is just temporary, channel size definition still needs to be // fine tuned // DefaultChSize = 100 ) + +type workerWrapper[I any, R any] struct { + cancelChOut chan<- CancelWorkSignal + core *worker[I, R] +} + +type workersCollection[I, R any] map[WorkerID]*workerWrapper[I, R] diff --git a/async/pool-defs.go b/async/pool-defs.go index d52bb20..7c9e2ee 100644 --- a/async/pool-defs.go +++ b/async/pool-defs.go @@ -1,5 +1,9 @@ package async +const ( + MaxWorkers = 100 +) + // Job, this definition is very rudimentary and bears no resemblance to the final // version. The job definition should be data driven not functionally driven. We // could have a bind function/method that would bind data to the job fn. @@ -11,6 +15,7 @@ package async // type Job[I any] struct { + ID string Input I } @@ -39,3 +44,8 @@ type WorkerID string type FinishedStream = chan WorkerID type FinishedStreamIn = <-chan WorkerID type FinishedStreamOut = chan<- WorkerID + +// joinChannelsFunc allows reader channel to be joined to the writer channel. This +// function is called when entry is found on the input and forwarded to the +// output. +type JoinChannelsFunc[T any] func(inCh <-chan T, outCh chan<- T) diff --git a/async/worker-pool.go b/async/worker-pool.go index 15c3c3c..f380f92 100644 --- a/async/worker-pool.go +++ b/async/worker-pool.go @@ -125,119 +125,251 @@ is of no value. In this case, the pool must decide how to define closure. Perhap a dummy consumer. */ -// The WorkerPool owns the resultOut channel, because it is the only entity that knows +// privateWpInfo contains any state that needs to be mutated in a non concurrent manner +// and therefore should be exclusively accessed by a single go routine. Actually, due to +// our ability to compose functionality with channels as opposed to shared state, the +// pool does not contain any state that is accessed directly or indirectly from other +// go routines. 
But in the case of the actual core pool, it is mutated without synchronisation
+// and hence should only ever be accessed by the worker pool GR, in contrast to all the
+// other members of WorkerPool. This is an experimental pattern, the purpose of which
+// is to clearly indicate what state can be accessed in different concurrency contexts,
+// to ensure future updates can be applied with minimal cognitive overload.
+//
+// There is another purpose for privateWpInfo and that is to do with "confinement" as
+// described on page 86 of CiG. The aim here is to use "lexical confinement" for
+// duplex channel definitions; although a channel is thread safe and so ordinarily
+// would not be a candidate member of privateWpInfo, a duplex channel ought to be
+// protected from accidentally being used incorrectly, i.e. trying to write to a channel
+// that is meant to be read only. So methods that use a channel should now receive the
+// channel through a method parameter (defined as either chan<-, or <-chan), rather
+// than be expected to simply access the member variable directly. This clearly signals
+// that any channel defined in privateWpInfo should never be accessed directly (other
+// than for passing it to another method). This is an experimental convention that
+// I'm establishing for all snivilised projects.
+type privateWpInfo[I, R any] struct {
+	pool          workersCollection[I, R]
+	workersJobsCh chan Job[I]
+	finishedCh    FinishedStream
+	cancelCh      CancelStream
+}
+
+// WorkerPool owns the resultOut channel, because it is the only entity that knows
 // when all workers have completed their work due to the finished channel, which it also
 // owns.
-
 type WorkerPool[I, R any] struct {
-	fn         Executive[I, R]
-	noWorkers  int
-	JobsCh     JobStream[I]
-	ResultsCh  ResultStream[R]
-	CancelCh   CancelStream
-	Quit       *sync.WaitGroup
-	pool       workersCollection[I, R]
-	finishedCh FinishedStream
+	private        privateWpInfo[I, R]
+	fn             Executive[I, R]
+	noWorkers      int
+	SourceJobsChIn <-chan Job[I]
+
+	Quit *sync.WaitGroup
 }

 type NewWorkerPoolParams[I, R any] struct {
-	Exec   Executive[I, R]
-	JobsCh JobStream[I]
-	Cancel CancelStream
-	Quit   *sync.WaitGroup
+	NoWorkers int
+	Exec      Executive[I, R]
+	JobsCh    chan Job[I]
+	CancelCh  CancelStream
+	Quit      *sync.WaitGroup
 }

 func NewWorkerPool[I, R any](params *NewWorkerPoolParams[I, R]) *WorkerPool[I, R] {
+	noWorkers := runtime.NumCPU()
+	if params.NoWorkers > 1 && params.NoWorkers <= MaxWorkers {
+		noWorkers = params.NoWorkers
+	}
+
 	wp := &WorkerPool[I, R]{
-		fn:        params.Exec,
-		noWorkers: runtime.NumCPU(),
-		JobsCh:    params.JobsCh,
-		CancelCh:  params.Cancel,
-		Quit:      params.Quit,
-
-		// workers collection might not be necessary; only using here at the
-		// moment, so it is easy to track how many workers are running at
-		// any 1 time.
-		//
-		pool:       make(workersCollection[I, R]),
-		finishedCh: make(FinishedStream, DefaultChSize),
+		private: privateWpInfo[I, R]{
+			pool:          make(workersCollection[I, R], noWorkers),
+			workersJobsCh: make(chan Job[I], noWorkers),
+			finishedCh:    make(FinishedStream, noWorkers),
+			cancelCh:      params.CancelCh,
+		},
+		fn:             params.Exec,
+		noWorkers:      noWorkers,
+		SourceJobsChIn: params.JobsCh,
+
+		Quit: params.Quit,
 	}

 	return wp
 }

-// Run
-func (p *WorkerPool[I, R]) Run(ctx context.Context, resultsOut ResultStreamOut[R]) {
+// This helps to visualise the activity of the different work threads. It's easier to
+// eyeball emojis than worker IDs.
+var eyeballs = []string{ + "❀️", "πŸ’™", "πŸ’š", "πŸ’œ", "πŸ’›", "🀍", "πŸ’–", "πŸ’—", "πŸ’", +} + +func (p *WorkerPool[I, R]) composeID() WorkerID { + n := len(p.private.pool) + 1 + emoji := eyeballs[(n-1)%p.noWorkers] + + return WorkerID(fmt.Sprintf("(%v)WORKER-ID-%v:%v", emoji, n, uuid.NewString())) +} + +func (p *WorkerPool[I, R]) Start( + ctx context.Context, + resultsChOut ResultStreamOut[R], +) { + p.run(ctx, p.private.workersJobsCh, resultsChOut) +} + +func (p *WorkerPool[I, R]) run( + ctx context.Context, + forwardChOut chan<- Job[I], + resultsChOut ResultStreamOut[R], +) { defer func() { - fmt.Printf("<--- WorkerPool finished (Quit). 🧊🧊🧊\n") + close(resultsChOut) p.Quit.Done() - close(resultsOut) + fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n") }() - fmt.Println("---> 🧊 WorkerPool.Run") + fmt.Println("===> 🧊 WorkerPool.run") for running := true; running; { select { case <-ctx.Done(): - fmt.Println("---> 🧊 WorkerPool.Run - done received ☒️☒️☒️") + fmt.Println("===> 🧊 WorkerPool.run - done received ☒️☒️☒️") + p.cancelWorkers() running = false - case job, ok := <-p.JobsCh: + case job, ok := <-p.SourceJobsChIn: if ok { - fmt.Println("---> 🧊 WorkerPool.Run - new job received") - - p.dispatch(ctx, &workerInfo[I, R]{ - job: job, - resultsOut: resultsOut, - finishedOut: p.finishedCh, - }) + fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - new job received\n", + len(p.private.pool), + ) + + if len(p.private.pool) < p.noWorkers { + p.spawn(ctx, p.private.workersJobsCh, resultsChOut, p.private.finishedCh) + } + select { + case forwardChOut <- job: + fmt.Printf("===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v)\n", job.ID) + case <-ctx.Done(): // ☣️☣️☣️ CHECK THIS, IT MIGHT BE INVALID + fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - done received ☒️☒️☒️\n", + len(p.private.pool), + ) + } } else { + // ⚠️ This close is essential. Since the pool acts as a bridge between + // 2 channels (p.SourceJobsChIn and p.private.workersJobsCh), when the + // producer closes p.SourceJobsChIn, we need to delegate that closure + // to p.private.workersJobsCh, otherwise we end up in a deadlock. + // + close(p.private.workersJobsCh) + fmt.Printf("===> πŸš€ WorkerPool.run(source jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯\n") running = false } - - case workerID := <-p.finishedCh: - fmt.Printf("---> 🧊 WorkerPool.Run - worker(%v) finished\n", workerID) - delete(p.pool, workerID) } } - // we still need to wait for all workers to finish ... + // We still need to wait for all workers to finish ... Note how we + // don't pass in the context's Done() channel as it already been consumed + // in the run loop, and is now closed. // - p.drain(ctx) + p.drain(p.private.finishedCh) + + fmt.Printf("===> 🧊 WorkerPool.run - drain complete (workers count: '%v'). πŸŽƒπŸŽƒπŸŽƒ\n", + len(p.private.pool), + ) } -func (p *WorkerPool[I, R]) drain(ctx context.Context) { - // The remaining number of workers displayed here is not necessarily - // accurate. 
-	//
+func (p *WorkerPool[I, R]) spawn(
+	ctx context.Context,
+	jobsInCh <-chan Job[I],
+	resultsChOut ResultStreamOut[R],
+	finishedChOut FinishedStreamOut,
+) {
+	cancelCh := make(chan CancelWorkSignal, 1)
+
+	w := &workerWrapper[I, R]{
+		core: &worker[I, R]{
+			id:            p.composeID(),
+			fn:            p.fn,
+			jobsInCh:      jobsInCh,
+			resultsOutCh:  resultsChOut,
+			finishedChOut: finishedChOut,
+			cancelChIn:    cancelCh,
+		},
+		cancelChOut: cancelCh,
+	}
+
+	p.private.pool[w.core.id] = w
+	go w.core.run(ctx)
+	fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' πŸŽ€πŸŽ€πŸŽ€\n", w.core.id)
+}
+
+func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) {
 	fmt.Printf(
 		"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊 \n",
-		len(p.pool), runtime.NumGoroutine(),
+		len(p.private.pool), runtime.NumGoroutine(),
 	)

 	for running := true; running; {
-		select {
-		case <-ctx.Done():
-			running = false
-
-		case workerID := <-p.finishedCh:
-			fmt.Printf("---> 🧊 WorkerPool.drain - worker(%v) finished\n", workerID)
-			delete(p.pool, workerID)
+		// πŸ“ Here, we don't access the finishedChIn channel in a pre-emptive way via
+		// the ctx.Done() channel. This is because in a unit test, we define a timeout as
+		// part of the test spec using SpecTimeout. When this fires, it is handled by the
+		// run loop, which ends that loop and then enters drain. When this happens, you can't
+		// reuse that same done channel as it will immediately return the value already
+		// handled. This has the effect of short-circuiting this loop, meaning that
+		// workerID := <-finishedChIn never has a chance to be selected and the drain loop
+		// exits early. The end result is that the p.private.pool collection is
+		// never depleted.
+		//
+		// ⚠️ So an important lesson to be learnt here is that once ctx.Done() has fired,
+		// you can't reuse that same channel in another select statement as it will simply
+		// return immediately, bypassing all the other cases in the select statement.
+		//
+		// Some noteworthy points:
+		//
+		// πŸ’Ž Safe Access: Accessing the Done() channel concurrently from multiple goroutines
+		// is safe. Reading from a closed channel is well-defined behaviour in Go and won't
+		// cause panics or issues.
+		//
+		// πŸ’Ž Cancellation Handling: When a context is cancelled, the Done() channel is closed,
+		// and any goroutine waiting on the channel will be unblocked. Each goroutine needs to
+		// have its own select statement to handle the context's cancellation event properly.
+		//
+		// πŸ’Ž Synchronisation: If multiple goroutines are going to react to the context's
+		// cancellation, you need to make sure that any shared resources accessed by these
+		// goroutines are synchronised properly to avoid race conditions. This might involve
+		// using mutexes or other synchronisation primitives.
+		//
+		// πŸ’Ž Propagation: If a goroutine creates a child context using context.WithCancel
+		// or context.WithTimeout, the child goroutines should use the child context for their
+		// operations instead of the parent context. This ensures that the child context's
+		// cancellation doesn't affect unrelated goroutines.
+		//
+		// πŸ’Ž Lifetime Management: Be aware of the lifetimes of the contexts and goroutines.
+		// If a goroutine outlives its context or keeps references to closed Done() channels,
+		// it might not behave as expected.
+ // + workerID := <-finishedChIn + delete(p.private.pool, workerID) - if len(p.pool) == 0 { - running = false - } + if len(p.private.pool) == 0 { + running = false } + + fmt.Printf("!!!! 🧊 WorkerPool.drain - worker(%v) finished, remaining: '%v' πŸŸ₯\n", + workerID, len(p.private.pool), + ) } } -func (p *WorkerPool[I, R]) dispatch(ctx context.Context, info *workerInfo[I, R]) { - w := &worker[I, R]{ - id: WorkerID("WORKER-ID:" + uuid.NewString()), - fn: p.fn, +func (p *WorkerPool[I, R]) cancelWorkers() { + // perhaps, we can replace this with another broadcast mechanism such as sync.Cond + // + n := len(p.private.pool) + for k, w := range p.private.pool { + fmt.Printf("===> 🧊 cancelling worker '%v' of %v πŸ“›πŸ“›πŸ“›... \n", k, n) + // shouldn't need to be preemptable because it is a buffered single item channel + // which should only ever be accessed by the work pool GR and therefore should + // never be a position where its competing to send on that channel + // + w.cancelChOut <- CancelWorkSignal{} } - p.pool[w.id] = w - fmt.Printf("---> 🧊 (pool-size: %v) dispatch worker: id-'%v'\n", len(p.pool), w.id) - - go w.accept(ctx, info) // BREAKS: when cancellation occurs, send on closed chan } diff --git a/async/worker-pool_test.go b/async/worker-pool_test.go index 58d10ff..f8ba312 100644 --- a/async/worker-pool_test.go +++ b/async/worker-pool_test.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/fortytw2/leaktest" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -14,12 +15,31 @@ import ( "github.com/snivilised/lorax/internal/helpers" ) +func init() { rand.Seed(time.Now().Unix()) } + const ( JobChSize = 10 ResultChSize = 10 Delay = 750 ) +var audience = []string{ + "πŸ‘» caspar", + "πŸ§™ gandalf", + "😺 garfield", + "πŸ‘Ί gobby", + "πŸ‘Ώ nick", + "πŸ‘Ή ogre", + "πŸ‘½ paul", + "πŸ¦„ pegasus", + "πŸ’© poo", + "πŸ€– rusty", + "πŸ’€ skeletor", + "πŸ‰ smaug", + "πŸ§›β€β™€οΈ vampire", + "πŸ‘Ύ xenomorph", +} + type TestJobInput struct { sequenceNo int // allocated by observer Recipient string @@ -30,7 +50,7 @@ func (i TestJobInput) SequenceNo() int { } type TestJobResult = string -type TestResultChan chan async.JobResult[TestJobResult] +type TestResultStream chan async.JobResult[TestJobResult] type exec struct { } @@ -41,199 +61,121 @@ func (e *exec) Invoke(j async.Job[TestJobInput]) (async.JobResult[TestJobResult] time.Sleep(delay) result := async.JobResult[TestJobResult]{ - Payload: fmt.Sprintf(" ---> exec.Invoke [Seq: %v]πŸ‰ Hello: '%v'", + Payload: fmt.Sprintf(" ---> πŸ‰πŸ‰πŸ‰ [Seq: %v] Hello: '%v'", j.Input.SequenceNo(), j.Input.Recipient, ), } - fmt.Println(result.Payload) return result, nil } -var _ = Describe("WorkerPool", func() { - Context("producer/consumer", func() { - When("given: a stream of jobs", func() { - It("πŸ§ͺ should: receive and process all", func(specCtx SpecContext) { - var ( - wg sync.WaitGroup - ) - sequence := 0 +type pipeline[I, R any] struct { + wg sync.WaitGroup + sequence int + resultsCh chan async.JobResult[R] + provider helpers.ProviderFn[I] + producer *helpers.Producer[I, R] + pool *async.WorkerPool[I, R] + consumer *helpers.Consumer[R] +} - resultsCh := make(chan async.JobResult[TestJobResult], ResultChSize) +func start[I, R any]() *pipeline[I, R] { + resultsCh := make(chan async.JobResult[R], ResultChSize) - wg.Add(1) - By("πŸ‘Ύ WAIT-GROUP ADD(producer)") + pipe := &pipeline[I, R]{ + resultsCh: resultsCh, + } - provider := func() TestJobInput { - sequence++ - return TestJobInput{ - sequenceNo: sequence, - Recipient: "jimmy 🦊", - } - } - - 
producer := helpers.NewProducer[TestJobInput, TestJobResult](specCtx, &wg, JobChSize, provider, Delay) - pool := async.NewWorkerPool[TestJobInput, TestJobResult](&async.NewWorkerPoolParams[TestJobInput, TestJobResult]{ - Exec: &exec{}, - JobsCh: producer.JobsCh, - Cancel: make(async.CancelStream), - Quit: &wg, - }) + return pipe +} - wg.Add(1) - By("πŸ‘Ύ WAIT-GROUP ADD(worker-pool)\n") +func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.ProviderFn[I]) { + p.producer = helpers.StartProducer[I, R]( + ctx, + &p.wg, + JobChSize, + provider, + Delay, + ) - go pool.Run(specCtx, resultsCh) + p.wg.Add(1) +} - wg.Add(1) - By("πŸ‘Ύ WAIT-GROUP ADD(consumer)") +func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.Executive[I, R]) { + p.pool = async.NewWorkerPool[I, R]( + &async.NewWorkerPoolParams[I, R]{ + NoWorkers: 5, + Exec: executive, + JobsCh: p.producer.JobsCh, + CancelCh: make(async.CancelStream), + Quit: &p.wg, + }) - consumer := helpers.NewConsumer(specCtx, &wg, resultsCh) + go p.pool.Start(ctx, p.resultsCh) - go func() { - snooze := time.Second / 5 - fmt.Printf(" >>> πŸ’€ Sleeping before requesting stop (%v) ...\n", snooze) - time.Sleep(snooze) - producer.Stop() - fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n") - }() + p.wg.Add(1) +} - wg.Wait() - fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n", - producer.Count, - consumer.Count, - ) +func (p *pipeline[I, R]) startConsumer(ctx context.Context) { + p.consumer = helpers.StartConsumer(ctx, + &p.wg, + p.resultsCh, + ) - Expect(producer.Count).To(Equal(consumer.Count)) - Eventually(specCtx, resultsCh).WithTimeout(time.Second * 2).Should(BeClosed()) - Eventually(specCtx, producer.JobsCh).WithTimeout(time.Second * 2).Should(BeClosed()) - }, SpecTimeout(time.Second*2)) - }) + p.wg.Add(1) +} - When("given: cancellation invoked before end of work", func() { - XIt("πŸ§ͺ should: close down gracefully", func(specCtx SpecContext) { - // this case shows that worker pool needs a redesign. Each worker - // go routine needs to have a lifetime that spans the lifetime of - // the session, rather than a short lifetime that matches that of - // an individual job. This will make processing more reliable, - // especially when it comes to cancellation. As it is, since the - // worker GR only exists for the lifetime of the job, when the - // job is short (in duration), it is very unlikely it will see - // the cancellation request and therefore and therefore likely - // to send to a closed channel (the result channel). 
- // - var ( - wg sync.WaitGroup - ) - sequence := 0 +func (p *pipeline[I, R]) stopProducerAfter(ctx context.Context, after time.Duration) { + go helpers.StopProducerAfter( + ctx, + p.producer, + after, + ) +} - resultsCh := make(chan async.JobResult[TestJobResult], ResultChSize) +var _ = Describe("WorkerPool", func() { + When("given: a stream of jobs", func() { + Context("and: Stopped", func() { + It("πŸ§ͺ should: receive and process all", func(ctx SpecContext) { + defer leaktest.Check(GinkgoT())() + pipe := start[TestJobInput, TestJobResult]() - wg.Add(1) By("πŸ‘Ύ WAIT-GROUP ADD(producer)") - - provider := func() TestJobInput { + sequence := 0 + pipe.startProducer(ctx, func() TestJobInput { + recipient := rand.Intn(len(audience)) //nolint:gosec // trivial sequence++ return TestJobInput{ sequenceNo: sequence, - Recipient: "johnny 😈", + Recipient: audience[recipient], } - } - ctx, cancel := context.WithCancel(specCtx) - - producer := helpers.NewProducer[TestJobInput, TestJobResult](ctx, &wg, JobChSize, provider, Delay) - pool := async.NewWorkerPool[TestJobInput, TestJobResult](&async.NewWorkerPoolParams[TestJobInput, TestJobResult]{ - Exec: &exec{}, - JobsCh: producer.JobsCh, - Cancel: make(async.CancelStream), - Quit: &wg, }) - wg.Add(1) By("πŸ‘Ύ WAIT-GROUP ADD(worker-pool)\n") + pipe.startPool(ctx, &exec{}) - go pool.Run(ctx, resultsCh) - - wg.Add(1) By("πŸ‘Ύ WAIT-GROUP ADD(consumer)") + pipe.startConsumer(ctx) - consumer := helpers.NewConsumer(ctx, &wg, resultsCh) - - go func() { - snooze := time.Second / 10 - fmt.Printf(" >>> πŸ’€ Sleeping before requesting cancellation (%v) ...\n", snooze) - time.Sleep(snooze) - cancel() - fmt.Printf(" >>> 🍧🍧🍧 cancel submitted.\n") - }() + By("πŸ‘Ύ NOW AWAITING TERMINATION") + pipe.stopProducerAfter(ctx, time.Second/5) + pipe.wg.Wait() - wg.Wait() fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 
🎯🎯🎯\n", - producer.Count, - consumer.Count, + pipe.producer.Count, + pipe.consumer.Count, ) - Eventually(specCtx, resultsCh).WithTimeout(time.Second * 2).Should(BeClosed()) - Eventually(specCtx, producer.JobsCh).WithTimeout(time.Second * 2).Should(BeClosed()) - }, SpecTimeout(time.Second*2)) + Expect(pipe.producer.Count).To(Equal(pipe.consumer.Count)) + Eventually(ctx, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed()) + Eventually(ctx, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed()) + }, SpecTimeout(time.Second*5)) }) - }) - - Context("ginkgo consumer", func() { - It("πŸ§ͺ should: receive and process all", func(specCtx SpecContext) { - var ( - wg sync.WaitGroup - ) - sequence := 0 - - resultsCh := make(chan async.JobResult[TestJobResult], ResultChSize) - - wg.Add(1) - By("πŸ‘Ύ WAIT-GROUP ADD(producer)") - - provider := func() TestJobInput { - sequence++ - return TestJobInput{ - sequenceNo: sequence, - Recipient: "cosmo πŸ‘½", - } - } - - producer := helpers.NewProducer[TestJobInput, TestJobResult](specCtx, &wg, JobChSize, provider, Delay) - pool := async.NewWorkerPool[TestJobInput, TestJobResult](&async.NewWorkerPoolParams[TestJobInput, TestJobResult]{ - Exec: &exec{}, - JobsCh: producer.JobsCh, - Cancel: make(async.CancelStream), - Quit: &wg, - }) - - wg.Add(1) - By("πŸ‘Ύ WAIT-GROUP ADD(worker-pool)\n") - - go pool.Run(specCtx, resultsCh) - wg.Add(1) - By("πŸ‘Ύ WAIT-GROUP ADD(consumer)") + Context("and: Cancelled", func() { + It("πŸ§ͺ should: handle cancellation and shutdown cleanly", func(_ SpecContext) { - consumer := helpers.NewConsumer(specCtx, &wg, resultsCh) - - go func() { - snooze := time.Second / 5 - fmt.Printf(" >>> πŸ’€ Sleeping before requesting stop (%v) ...\n", snooze) - time.Sleep(snooze) - producer.Stop() - fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n") - }() - - wg.Wait() - fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n", - producer.Count, - consumer.Count, - ) - - Expect(producer.Count).To(Equal(consumer.Count)) - Eventually(specCtx, resultsCh).WithTimeout(time.Second * 2).Should(BeClosed()) - Eventually(specCtx, producer.JobsCh).WithTimeout(time.Second * 2).Should(BeClosed()) - }, SpecTimeout(time.Second*2)) + }) + }) }) }) diff --git a/async/worker.go b/async/worker.go index 94e9fde..30fc849 100644 --- a/async/worker.go +++ b/async/worker.go @@ -5,32 +5,54 @@ import ( "fmt" ) -type workersCollection[I, R any] map[WorkerID]*worker[I, R] - type worker[I any, R any] struct { id WorkerID // TODO: there is still no benefit on using an interface rather than a function, // might have to change this back to a function // - fn Executive[I, R] -} + fn Executive[I, R] + jobsInCh <-chan Job[I] + resultsOutCh ResultStreamOut[R] + finishedChOut FinishedStreamOut -func (w *worker[I, R]) accept(ctx context.Context, info *workerInfo[I, R]) { - fmt.Printf("---> πŸš€ worker.accept: '%v', input:'%v'\n", w.id, info.job.Input) - result, _ := w.fn.Invoke(info.job) - - select { // BREAKS: when cancellation occurs, send on closed chan - case <-ctx.Done(): - fmt.Println("---> πŸš€ worker.accept(result) - done received πŸ’₯πŸ’₯πŸ’₯") + // this might be better replaced with a broadcast mechanism such as sync.Cond + // + cancelChIn <-chan CancelWorkSignal +} - case info.resultsOut <- result: +func (w *worker[I, R]) run(ctx context.Context) { + defer func() { + w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok + fmt.Printf(" <--- πŸš€ worker.run(%v) (SENT FINISHED). 
πŸš€πŸš€πŸš€\n", w.id) + }() + + for running := true; running; { + select { + case <-ctx.Done(): + fmt.Printf(" ---> πŸš€ worker.run(%v)(finished) - done received πŸ”ΆπŸ”ΆπŸ”Ά\n", w.id) + + running = false + case job, ok := <-w.jobsInCh: + if ok { + fmt.Printf(" ---> πŸš€ worker.run(%v)(input:'%v')\n", w.id, job.Input) + w.invoke(ctx, job) + } else { + fmt.Printf(" ---> πŸš€ worker.run(%v)(jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯\n", w.id) + + running = false + } + } } +} + +func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) { + result, _ := w.fn.Invoke(job) select { case <-ctx.Done(): - fmt.Println("---> πŸš€ worker.accept(finished) - done received ❌❌❌") + fmt.Printf(" ---> πŸš€ worker.invoke(%v)(cancel) - done received πŸ’₯πŸ’₯πŸ’₯\n", w.id) - case info.finishedOut <- w.id: + case w.resultsOutCh <- result: } } diff --git a/go.mod b/go.mod index ba42a4b..81f6f9a 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( go.uber.org/zap v1.25.0 ) +require github.com/fortytw2/leaktest v1.3.0 + require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect diff --git a/go.sum b/go.sum index cf2ac23..493c663 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= diff --git a/internal/helpers/test-consumer.go b/internal/helpers/test-consumer.go index a67f4e3..9cd91e6 100644 --- a/internal/helpers/test-consumer.go +++ b/internal/helpers/test-consumer.go @@ -4,45 +4,53 @@ import ( "context" "fmt" "sync" + + "github.com/snivilised/lorax/async" ) type Consumer[R any] struct { - ResultsCh <-chan R + ResultsCh <-chan async.JobResult[R] quit *sync.WaitGroup Count int } -func NewConsumer[R any](ctx context.Context, wg *sync.WaitGroup, resultsCh <-chan R) *Consumer[R] { +func StartConsumer[R any]( + ctx context.Context, + wg *sync.WaitGroup, + resultsCh <-chan async.JobResult[R], +) *Consumer[R] { consumer := &Consumer[R]{ ResultsCh: resultsCh, quit: wg, } - go consumer.start(ctx) + go consumer.run(ctx) return consumer } -func (c *Consumer[R]) start(ctx context.Context) { +func (c *Consumer[R]) run(ctx context.Context) { defer func() { - fmt.Printf("===> consumer finished (Quit). πŸ’ πŸ’ πŸ’  \n") c.quit.Done() + fmt.Printf("<<<< consumer.run - finished (QUIT). 
πŸ’ πŸ’ πŸ’  \n") }() - fmt.Printf("===> πŸ’  consumer.start ...\n") + fmt.Printf("<<<< πŸ’  consumer.run ...\n") for running := true; running; { select { case <-ctx.Done(): - fmt.Println("---> πŸ’  consumer.start - done received πŸ’”πŸ’”πŸ’”") - running = false + fmt.Println("<<<< πŸ’  consumer.run - done received πŸ’”πŸ’”πŸ’”") + case result, ok := <-c.ResultsCh: if ok { c.Count++ - fmt.Printf("---> πŸ’  consumer.start new result arrived(#%v): '%+v' \n", c.Count, result) + fmt.Printf("<<<< πŸ’  consumer.run - new result arrived(#%v): '%+v' \n", + c.Count, result.Payload, + ) } else { running = false - fmt.Printf("---> πŸ’  consumer.start no more results available (running: %+v)\n", running) + fmt.Printf("<<<< πŸ’  consumer.run - no more results available (running: %+v)\n", running) } } } diff --git a/internal/helpers/test-producer.go b/internal/helpers/test-producer.go index e20126a..2d2d166 100644 --- a/internal/helpers/test-producer.go +++ b/internal/helpers/test-producer.go @@ -6,19 +6,14 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/snivilised/lorax/async" ) -const ( - BatchSize = 10 -) - type ProviderFn[I any] func() I type Producer[I, R any] struct { sequenceNo int - audience []string - batchSize int JobsCh async.JobStream[I] quit *sync.WaitGroup Count int @@ -30,7 +25,8 @@ type Producer[I, R any] struct { // The producer owns the Jobs channel as it knows when to close it. This producer is // a fake producer and exposes a stop method that the client go routing can call to // indicate end of the work load. -func NewProducer[I, R any](ctx context.Context, +func StartProducer[I, R any]( + ctx context.Context, wg *sync.WaitGroup, capacity int, provider ProviderFn[I], @@ -41,47 +37,40 @@ func NewProducer[I, R any](ctx context.Context, } producer := Producer[I, R]{ - audience: []string{ // REDUNDANT, done by client via the provider function - "paul", "phil", "lindsey", "kaz", "kerry", - "nick", "john", "raj", "jim", "mark", "robyn", - }, - batchSize: BatchSize, JobsCh: make(async.JobStream[I], capacity), quit: wg, provider: provider, delay: delay, - terminateCh: make(chan string, async.DefaultChSize), + terminateCh: make(chan string), } - go producer.start(ctx) + go producer.run(ctx) return &producer } -func (p *Producer[I, R]) start(ctx context.Context) { +func (p *Producer[I, R]) run(ctx context.Context) { defer func() { close(p.JobsCh) - fmt.Printf("===> producer finished (Quit). ✨✨✨ \n") p.quit.Done() + fmt.Printf(">>>> producer.run - finished (QUIT). 
✨✨✨ \n") }() - fmt.Printf("===> ✨ producer.start ...\n") + fmt.Printf(">>>> ✨ producer.run ...\n") for running := true; running; { select { case <-ctx.Done(): - fmt.Println("---> πŸ’  producer.start - done received β›”β›”β›”") - running = false + fmt.Println(">>>> πŸ’  producer.run - done received β›”β›”β›”") + case <-p.terminateCh: running = false - fmt.Printf("---> ✨ producer termination detected (running: %v)\n", running) + fmt.Printf(">>>> ✨ producer.run - termination detected (running: %v)\n", running) - default: - fmt.Printf("---> ✨ producer.start/default(running: %v) ...\n", running) + case <-time.After(time.Second / time.Duration(p.delay)): + fmt.Printf(">>>> ✨ producer.run - default (running: %v) ...\n", running) p.item() - - time.Sleep(time.Second / time.Duration(p.delay)) } } } @@ -89,16 +78,33 @@ func (p *Producer[I, R]) start(ctx context.Context) { func (p *Producer[I, R]) item() { i := p.provider() j := async.Job[I]{ + ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()), Input: i, } p.JobsCh <- j p.Count++ - fmt.Printf("===> ✨ producer.item, posted item: '%+v'\n", i) + fmt.Printf(">>>> ✨ producer.item, posted item: '%+v'\n", i) } func (p *Producer[I, R]) Stop() { - fmt.Println("---> 🧲 terminating ...") + fmt.Println(">>>> 🧲 producer terminating ...") p.terminateCh <- "done" close(p.terminateCh) } + +// StopProducerAfter, run in a new go routine +func StopProducerAfter[I, R any]( + ctx context.Context, + producer *Producer[I, R], + delay time.Duration, +) { + fmt.Printf(" >>> πŸ’€ Sleeping before requesting stop (%v) ...\n", delay) + select { + case <-ctx.Done(): + case <-time.After(delay): + } + + producer.Stop() + fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n") +}
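
Usage note (illustrative, not part of the patch): the Ginkgo pipeline above shows the intended wiring, but it leans on the internal test helpers. The sketch below is a minimal, stand-alone example of how a client might drive the revised pool directly, using only the API introduced here (NewWorkerPoolParams, Start, Job, JobResult, CancelStream). The greeter Executive, the channel capacities and the job count are assumptions made for the sake of the example; the key points are that the caller owns and closes the jobs channel, performs the wg.Add(1) that pairs with the pool's Quit.Done(), and that the pool closes the results channel once it has drained its workers.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/snivilised/lorax/async"
)

// greeter is an illustrative Executive; anything implementing
// Invoke(Job[I]) (JobResult[R], error) will do.
type greeter struct{}

func (g *greeter) Invoke(j async.Job[string]) (async.JobResult[string], error) {
	return async.JobResult[string]{
		Payload: fmt.Sprintf("hello: '%v'", j.Input),
	}, nil
}

func main() {
	var wg sync.WaitGroup

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	jobsCh := make(chan async.Job[string], 10)
	resultsCh := make(chan async.JobResult[string], 10)

	pool := async.NewWorkerPool[string, string](&async.NewWorkerPoolParams[string, string]{
		NoWorkers: 5,
		Exec:      &greeter{},
		JobsCh:    jobsCh,
		CancelCh:  make(async.CancelStream),
		Quit:      &wg,
	})

	wg.Add(1) // paired with Quit.Done() inside WorkerPool.run
	go pool.Start(ctx, resultsCh)

	// The producer owns the jobs channel; closing it tells the pool to
	// delegate the closure to its internal workers channel and drain.
	go func() {
		defer close(jobsCh)

		for i := 0; i < 3; i++ {
			jobsCh <- async.Job[string]{
				ID:    fmt.Sprintf("JOB-ID:%v", i),
				Input: fmt.Sprintf("client-%v", i),
			}
		}
	}()

	// The pool closes resultsCh after its drain completes, ending this loop.
	for result := range resultsCh {
		fmt.Println(result.Payload)
	}

	wg.Wait()
}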
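The drain commentary in worker-pool.go makes a point about ctx.Done() that is easy to trip over: once a context has been cancelled, any select that includes <-ctx.Done() is effectively short-circuited, because reading from the closed Done() channel succeeds immediately and can starve the other cases. Below is a tiny stand-alone sketch of that behaviour; the names are made up purely for illustration.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the timeout/cancellation having already fired in the run loop

	finishedCh := make(chan string, 1)

	go func() {
		time.Sleep(10 * time.Millisecond)
		finishedCh <- "WORKER-ID-1" // a worker signalling completion
	}()

	// Done() is already closed, so this select fires immediately and the
	// finished signal is never observed here; this is why drain reads
	// finishedChIn directly instead of selecting on ctx.Done() again.
	select {
	case <-ctx.Done():
		fmt.Println("done fired again; finished signal not seen by this select")
	case id := <-finishedCh:
		fmt.Println("finished:", id)
	}

	// Reading the finished channel directly always observes the signal.
	fmt.Println("drained:", <-finishedCh)
}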