Skip to content

Commit

Permalink
test(boost): silence unit tests (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 29, 2024
1 parent 0ca36d8 commit c1fa7e0
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 32 deletions.
26 changes: 22 additions & 4 deletions boost/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func start[I, O any](outputsDupCh *boost.Duplex[boost.JobOutput[O]]) *pipeline[I
func (p *pipeline[I, O]) produce(parentContext context.Context,
interval time.Duration,
provider helpers.ProviderFunc[I],
verbose bool,
) {
p.cancel = func(_ context.Context,
parentCancel context.CancelFunc,
Expand All @@ -142,6 +143,7 @@ func (p *pipeline[I, O]) produce(parentContext context.Context,
go helpers.CancelProducerAfter[I, O](
delay,
parentCancel,
verbose,
)
}
p.stop = func(_ context.Context,
Expand All @@ -152,14 +154,17 @@ func (p *pipeline[I, O]) produce(parentContext context.Context,
parentContext,
p.producer,
delay,
verbose,
)
}

p.producer = helpers.StartProducer[I, O](
parentContext,
p.wgan,
JobChSize,
provider,
interval,
verbose,
)

p.wgan.Add(1, p.producer.RoutineName)
Expand All @@ -186,11 +191,14 @@ func (p *pipeline[I, O]) process(parentContext context.Context,
go p.pool.Start(parentContext, parentCancel, p.outputsDup.WriterCh)
}

func (p *pipeline[I, O]) consume(parentContext context.Context, interval time.Duration) {
func (p *pipeline[I, O]) consume(parentContext context.Context,
interval time.Duration, verbose bool,
) {
p.consumer = helpers.StartConsumer(parentContext,
p.wgan,
p.outputsDup.ReaderCh,
interval,
verbose,
)

p.wgan.Add(1, p.consumer.RoutineName)
Expand Down Expand Up @@ -286,6 +294,7 @@ var (
pipe.producer.Count,
)
}
silentSummariser summariseFunc = func(_ TestPipeline) {}
)

type durations struct {
Expand All @@ -311,6 +320,10 @@ var _ = Describe("WorkerPool", Ordered, func() {
func(specContext SpecContext, entry *poolTE) {
defer leaktest.Check(GinkgoT())()

const (
verbose = false
)

outputDup := lo.TernaryF(entry.outputsChSize > 0,
func() *boost.Duplex[boost.JobOutput[TestOutput]] {
return boost.NewDuplex(make(TestJobOutputStream, entry.outputsChSize))
Expand All @@ -323,11 +336,15 @@ var _ = Describe("WorkerPool", Ordered, func() {
pipe := start[TestInput, TestOutput](outputDup)

defer func() {
if counter, ok := (pipe.wgan).(boost.AnnotatedWgCounter); ok {
if counter, ok := (pipe.wgan).(boost.AnnotatedWgCounter); ok && verbose {
fmt.Printf("🎈🎈🎈🎈 remaining count: '%v'\n", counter.Count())
}
}()

if !verbose {
entry.summarise = silentSummariser
}

parentContext, parentCancel := context.WithCancel(specContext)

By("👾 WAIT-GROUP ADD(producer)")
Expand All @@ -337,9 +354,10 @@ var _ = Describe("WorkerPool", Ordered, func() {
Recipient: audience[recipient],
}
}

pipe.produce(parentContext, lo.Ternary(entry.intervals.producer > 0,
entry.intervals.producer, defaults.producerInterval,
), provider)
), provider, verbose)

By("👾 WAIT-GROUP ADD(worker-pool)\n")
now := lo.Ternary(entry.now > 0, entry.now, defaults.noOfWorkers)
Expand All @@ -354,7 +372,7 @@ var _ = Describe("WorkerPool", Ordered, func() {
By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(parentContext, lo.Ternary(entry.intervals.consumer > 0,
entry.intervals.consumer, defaults.consumerInterval,
))
), verbose)
}

By("👾 NOW AWAITING TERMINATION")
Expand Down
27 changes: 20 additions & 7 deletions internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ type Consumer[O any] struct {
interval time.Duration
OutputsChIn boost.JobOutputStreamR[O]
Count int
verbose bool
}

func StartConsumer[O any](
parentContext context.Context,
quitter boost.AnnotatedWgQuitter,
outputsChIn boost.JobOutputStreamR[O],
interval time.Duration,
verbose bool,
) *Consumer[O] {
consumer := &Consumer[O]{
quitter: quitter,
RoutineName: boost.GoRoutineName("💠 consumer"),
interval: interval,
OutputsChIn: outputsChIn,
verbose: verbose,
}

go consumer.run(parentContext)
Expand All @@ -37,27 +40,37 @@ func StartConsumer[O any](
func (c *Consumer[O]) run(parentContext context.Context) {
defer func() {
c.quitter.Done(c.RoutineName)
fmt.Printf("<<<< 💠 consumer.run - finished (QUIT). 💠💠💠 \n")
if c.verbose {
fmt.Printf("<<<< 💠 consumer.run - finished (QUIT). 💠💠💠 \n")
}
}()
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", parentContext)
if c.verbose {
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", parentContext)
}

for running := true; running; {
<-time.After(c.interval)
select {
case <-parentContext.Done():
running = false

fmt.Println("<<<< 💠 consumer.run - done received 💔💔💔")
if c.verbose {
fmt.Println("<<<< 💠 consumer.run - done received 💔💔💔")
}

case result, ok := <-c.OutputsChIn:
if ok {
c.Count++
fmt.Printf("<<<< 💠 consumer.run - new result arrived(#%v): '%+v' \n",
c.Count, result.Payload,
)
if c.verbose {
fmt.Printf("<<<< 💠 consumer.run - new result arrived(#%v): '%+v' \n",
c.Count, result.Payload,
)
}
} else {
running = false
fmt.Printf("<<<< 💠 consumer.run - no more results available (running: %+v)\n", running)
if c.verbose {
fmt.Printf("<<<< 💠 consumer.run - no more results available (running: %+v)\n", running)
}
}
}
}
Expand Down
76 changes: 57 additions & 19 deletions internal/helpers/test-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Producer[I, O any] struct {
terminateDup terminationDuplex
JobsCh boost.JobStream[I]
Count int
verbose bool
}

// The producer owns the Jobs channel as it knows when to close it. This producer is
Expand All @@ -34,6 +35,7 @@ func StartProducer[I, O any](
capacity int,
provider ProviderFunc[I],
interval time.Duration,
verbose bool,
) *Producer[I, O] {
if interval == 0 {
panic(fmt.Sprintf("Invalid delay requested: '%v'", interval))
Expand All @@ -46,6 +48,7 @@ func StartProducer[I, O any](
interval: interval,
terminateDup: boost.NewDuplex(make(chan termination)),
JobsCh: make(boost.JobStream[I], capacity),
verbose: verbose,
}

go producer.run(parentContext)
Expand All @@ -57,24 +60,35 @@ func (p *Producer[I, O]) run(parentContext context.Context) {
defer func() {
close(p.JobsCh)
p.quitter.Done(p.RoutineName)
fmt.Printf(">>>> ✨ producer.run - finished (QUIT). ✨✨✨ \n")
}()

fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", parentContext)
if p.verbose {
fmt.Printf(">>>> ✨ producer.run - finished (QUIT). ✨✨✨ \n")
}
}()
if p.verbose {
fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", parentContext)
}

for running := true; running; {
select {
case <-parentContext.Done():
running = false

fmt.Println(">>>> ✨ producer.run - done received ⛔⛔⛔")
if p.verbose {
fmt.Println(">>>> ✨ producer.run - done received ⛔⛔⛔")
}

case <-p.terminateDup.ReaderCh:
running = false
fmt.Printf(">>>> ✨ producer.run - termination detected (running: %v)\n", running)

if p.verbose {
fmt.Printf(">>>> ✨ producer.run - termination detected (running: %v)\n", running)
}

case <-time.After(p.interval):
fmt.Printf(">>>> ✨ producer.run - default (running: %v) ...\n", running)
if p.verbose {
fmt.Printf(">>>> ✨ producer.run - default (running: %v) ...\n", running)
}

if !p.item(parentContext) {
running = false
Expand All @@ -95,28 +109,37 @@ func (p *Producer[I, O]) item(parentContext context.Context) bool {
SequenceNo: p.sequenceNo,
}

fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i)
if p.verbose {
fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i)
}

select {
case <-parentContext.Done():
fmt.Println(">>>> ✨ producer.item - done received ⛔⛔⛔")
if p.verbose {
fmt.Println(">>>> ✨ producer.item - done received ⛔⛔⛔")
}

result = false

case p.JobsCh <- j:
}

if result {
fmt.Printf(">>>> ✨ producer.item, 🟢 posted item: '%+v'\n", i)
} else {
fmt.Printf(">>>> ✨ producer.item, 🔴 item NOT posted: '%+v'\n", i)
if p.verbose {
if result {
fmt.Printf(">>>> ✨ producer.item, 🟢 posted item: '%+v'\n", i)
} else {
fmt.Printf(">>>> ✨ producer.item, 🔴 item NOT posted: '%+v'\n", i)
}
}

return result
}

func (p *Producer[I, O]) Stop() {
fmt.Println(">>>> 🧲 producer terminating ...")
if p.verbose {
fmt.Println(">>>> 🧲 producer terminating ...")
}

p.terminateDup.WriterCh <- termination("done")
close(p.terminateDup.Channel)
}
Expand All @@ -126,32 +149,47 @@ func StopProducerAfter[I, O any](
parentContext context.Context,
producer *Producer[I, O],
delay time.Duration,
verbose bool,
) {
fmt.Printf(" >>> 💤 StopAfter - Sleeping before requesting stop (%v) ...\n", delay)
if verbose {
fmt.Printf(" >>> 💤 StopAfter - Sleeping before requesting stop (%v) ...\n", delay)
}

select {
case <-parentContext.Done():
case <-time.After(delay):
}

producer.Stop()
fmt.Printf(" >>> StopAfter - 🍧🍧🍧 stop submitted.\n")

if verbose {
fmt.Printf(" >>> StopAfter - 🍧🍧🍧 stop submitted.\n")
}
}

func CancelProducerAfter[I, O any](
delay time.Duration,
parentCancel context.CancelFunc,
verbose bool,
) {
fmt.Printf(" >>> 💤 CancelAfter - Sleeping before requesting cancellation (%v) ...\n", delay)
if verbose {
fmt.Printf(" >>> 💤 CancelAfter - Sleeping before requesting cancellation (%v) ...\n", delay)
}
<-time.After(delay)

// we should always expect to get a cancel function back, even if we don't
// ever use it, so it is still relevant to get it in the stop test case
//
if parentCancel != nil {
fmt.Printf(" >>> CancelAfter - 🛑🛑🛑 cancellation submitted.\n")
if verbose {
fmt.Printf(" >>> CancelAfter - 🛑🛑🛑 cancellation submitted.\n")
}
parentCancel()
fmt.Printf(" >>> CancelAfter - ➖➖➖ CANCELLED\n")
} else {

if verbose {
fmt.Printf(" >>> CancelAfter - ➖➖➖ CANCELLED\n")
}
} else if verbose {
fmt.Printf(" >>> CancelAfter(noc) - ✖️✖️✖️ cancellation attempt benign.\n")
}
}
2 changes: 1 addition & 1 deletion rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func RangePF[T ProxyField[T, O], O Numeric](iterator RangeIteratorPF[T, O],
}

return &ObservableImpl[T]{
iterable: newRangeIterableNF(iterator, opts...),
iterable: newRangeIterablePF(iterator, opts...),
}
}

Expand Down
2 changes: 1 addition & 1 deletion rx/iterable-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type rangeIterablePF[T ProxyField[T, O], O Numeric] struct {
opts []Option[T]
}

func newRangeIterableNF[T ProxyField[T, O], O Numeric](iterator RangeIteratorPF[T, O],
func newRangeIterablePF[T ProxyField[T, O], O Numeric](iterator RangeIteratorPF[T, O],
opts ...Option[T],
) Iterable[T] {
return &rangeIterablePF[T, O]{
Expand Down

0 comments on commit c1fa7e0

Please sign in to comment.