Skip to content

Commit

Permalink
feat(boost): add alert func to control worker pool activity display (#12
Browse files Browse the repository at this point in the history
)
  • Loading branch information
plastikfan committed Oct 9, 2023
1 parent 679064a commit d807f3c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 34 deletions.
8 changes: 5 additions & 3 deletions boost/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ func (a *waitGroupAnImpl) Wait(name ...GoRoutineName) {
}

func (a *waitGroupAnImpl) indicate(highlight, name, op string) {
fmt.Printf(
" %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n",
highlight, a.waitGroupName, op, name, a.counter, a.running(),
Alert(
fmt.Sprintf(
" %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n",
highlight, a.waitGroupName, op, name, a.counter, a.running(),
),
)
}

Expand Down
4 changes: 4 additions & 0 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func NewDuplex[T any](channel chan T) *Duplex[T] {
WriterCh: channel,
}
}

type ActivityCallback func(message string)

var Alert ActivityCallback = func(message string) {}
57 changes: 35 additions & 22 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,27 @@ func (p *WorkerPool[I, O]) run(
p.private.resultOutCh <- r

p.WaitAQ.Done(p.RoutineName)
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
Alert("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}(result)
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", parentContext)
Alert(fmt.Sprintf(
"===> 🧊 WorkerPool.run ...(ctx:%+v)\n",
parentContext,
))

for running := true; running; {
select {
case <-parentContext.Done():
running = false

close(forwardChOut) // ⚠️ This is new
fmt.Println("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☒️☒️☒️")
Alert("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☒️☒️☒️")

case job, ok := <-p.sourceJobsChIn:
if ok {
fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - new job received\n",
Alert(fmt.Sprintf(
"===> 🧊 (#workers: '%v') WorkerPool.run - new job received",
len(p.private.pool),
)
))

if len(p.private.pool) < p.noWorkers {
p.spawn(parentContext,
Expand All @@ -159,17 +163,19 @@ func (p *WorkerPool[I, O]) run(
}
select {
case forwardChOut <- job:
fmt.Printf("===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]\n",
Alert(fmt.Sprintf(
"===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]",
job.ID,
job.SequenceNo,
)
))
case <-parentContext.Done():
running = false

close(forwardChOut) // ⚠️ This is new
fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - done received ☒️☒️☒️\n",
Alert(fmt.Sprintf(
"===> 🧊 (#workers: '%v') WorkerPool.run - done received ☒️☒️☒️",
len(p.private.pool),
)
))
}
} else {
// ⚠️ This close is essential. Since the pool acts as a bridge between
Expand All @@ -179,7 +185,7 @@ func (p *WorkerPool[I, O]) run(
//
running = false
close(forwardChOut)
fmt.Printf("===> πŸš€ WorkerPool.run(source jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯\n")
Alert("===> πŸš€ WorkerPool.run(source jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯")
}
}
}
Expand All @@ -191,14 +197,16 @@ func (p *WorkerPool[I, O]) run(
if err := p.drain(p.private.finishedCh); err != nil {
result.Error = err

fmt.Printf("===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). πŸ“›πŸ“›πŸ“›\n",
Alert(fmt.Sprintf(
"===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). πŸ“›πŸ“›πŸ“›",
err,
len(p.private.pool),
)
))
} else {
fmt.Printf("===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). β˜‘οΈβ˜‘οΈβ˜‘οΈ\n",
Alert(fmt.Sprintf(
"===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). β˜‘οΈβ˜‘οΈβ˜‘οΈ",
len(p.private.pool),
)
))
}
}

Expand All @@ -222,14 +230,17 @@ func (p *WorkerPool[I, O]) spawn(

p.private.pool[w.core.id] = w
go w.core.run(parentContext, parentCancel, outputChTimeout)
fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' πŸŽ€πŸŽ€πŸŽ€\n", w.core.id)
Alert(fmt.Sprintf(
"===> 🧊 WorkerPool.spawned new worker: '%v' πŸŽ€πŸŽ€πŸŽ€",
w.core.id,
))
}

func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
fmt.Printf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊 \n",
Alert(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊",
len(p.private.pool), runtime.NumGoroutine(),
)
))

var firstError error

Expand Down Expand Up @@ -280,19 +291,21 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
}

if workerResult.err != nil {
fmt.Printf("!!!! 🧊 WorkerPool.drain - worker (%v) πŸ’’πŸ’’πŸ’’ finished with error: '%v'\n",
Alert(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - worker (%v) πŸ’’πŸ’’πŸ’’ finished with error: '%v'",
workerResult.id,
workerResult.err,
)
))

if firstError == nil {
firstError = workerResult.err
}
}

fmt.Printf("!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' πŸŸ₯\n",
Alert(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' πŸŸ₯",
workerResult.err, len(p.private.pool),
)
))
}

return firstError
Expand Down
10 changes: 9 additions & 1 deletion boost/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,15 @@ type poolTE struct {
assert assertFunc
}

var _ = Describe("WorkerPool", func() {
func alertPrinter(message string) {
fmt.Println(message)
}

var _ = Describe("WorkerPool", Ordered, func() {
BeforeAll(func() {
boost.Alert = alertPrinter
})

DescribeTable("stream of jobs",
func(specContext SpecContext, entry *poolTE) {
defer leaktest.Check(GinkgoT())()
Expand Down
33 changes: 25 additions & 8 deletions boost/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,39 @@ func (w *worker[I, O]) run(parentContext context.Context,
defer func(r *workerFinishedResult) {
w.finishedChOut <- r // ⚠️ non-pre-emptive send, but this should be ok

fmt.Printf(" <--- πŸš€ worker.run(%v) (SENT FINISHED - error:'%v'). πŸš€πŸš€πŸš€\n", w.id, r.err)
Alert(fmt.Sprintf(" <--- πŸš€ worker.run(%v) (SENT FINISHED - error:'%v'). πŸš€πŸš€πŸš€",
w.id, r.err,
))
}(&result)

fmt.Printf(" ---> πŸš€ worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext)
Alert(
fmt.Sprintf(" ---> πŸš€ worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext),
)

for running := true; running; {
select {
case <-parentContext.Done():
fmt.Printf(" ---> πŸš€ worker.run(%v)(finished) - done received πŸ”ΆπŸ”ΆπŸ”Ά\n", w.id)
Alert(fmt.Sprintf(
" ---> πŸš€ worker.run(%v)(finished) - done received πŸ”ΆπŸ”ΆπŸ”Ά", w.id,
))

running = false
case job, ok := <-w.jobsChIn:
if ok {
fmt.Printf(" ---> πŸš€ worker.run(%v)(input:'%v')\n", w.id, job.Input)
Alert(fmt.Sprintf(
" ---> πŸš€ worker.run(%v)(input:'%v')", w.id, job.Input,
))

err := w.invoke(parentContext, parentCancel, outputChTimeout, job)

if err != nil {
result.err = err
running = false
}
} else {
fmt.Printf(" ---> πŸš€ worker.run(%v)(jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯\n", w.id)
Alert(fmt.Sprintf(
" ---> πŸš€ worker.run(%v)(jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯", w.id,
))

running = false
}
Expand All @@ -67,16 +78,22 @@ func (w *worker[I, O]) invoke(parentContext context.Context,
result, _ := w.exec(job)

if w.outputsChOut != nil {
fmt.Printf(" ---> πŸš€ worker.invoke ⏰ output timeout: '%v'\n", outputChTimeout)
Alert(fmt.Sprintf(
" ---> πŸš€ worker.invoke ⏰ output timeout: '%v'", outputChTimeout,
))

select {
case w.outputsChOut <- result:

case <-parentContext.Done():
fmt.Printf(" ---> πŸš€ worker.invoke(%v)(cancel) - done received πŸ’₯πŸ’₯πŸ’₯\n", w.id)
Alert(fmt.Sprintf(
" ---> πŸš€ worker.invoke(%v)(cancel) - done received πŸ’₯πŸ’₯πŸ’₯", w.id,
))

case <-outputContext.Done():
fmt.Printf(" ---> πŸš€ worker.invoke(%v)(cancel) - timeout on send πŸ‘ΏπŸ‘ΏπŸ‘Ώ\n", w.id)
Alert(fmt.Sprintf(
" ---> πŸš€ worker.invoke(%v)(cancel) - timeout on send πŸ‘ΏπŸ‘ΏπŸ‘Ώ", w.id,
))

// ??? err = i18n.NewOutputChTimeoutError()
err = errors.New("timeout on send")
Expand Down

0 comments on commit d807f3c

Please sign in to comment.