Skip to content

Commit

Permalink
feat(ants,boost): implement timeout on send (#281)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 6, 2024
1 parent 6b51295 commit 04deaaf
Show file tree
Hide file tree
Showing 18 changed files with 471 additions and 132 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ coverage.out
ginkgo.report

.task/
.env
.DS_Store
thumbs.db

i18n/out/en-US/active.en-GB.json

1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ issues:
fix: true
exclude:
- "cuddle"
- "go statements can only invoke functions assigned on line above"

run:
issues-exit-code: 1
Expand Down
22 changes: 16 additions & 6 deletions boost/base-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,28 @@ import (

type (
basePool[I, O any] struct {
wg *sync.WaitGroup
sequence int32
inputDupCh *Duplex[I]
outputDupCh *Duplex[JobOutput[O]]
ending bool
wg *sync.WaitGroup
sequence int32
inputDupCh *Duplex[I]
oi *outputInfo[O]
ending bool
}
)

func (p *basePool[I, O]) next() int32 {
return atomic.AddInt32(&p.sequence, int32(1))
}

// Observe
func (p *basePool[I, O]) Observe() JobOutputStreamR[O] {
return p.outputDupCh.ReaderCh
return p.oi.outputDupCh.ReaderCh
}

// CancelCh
func (p *basePool[I, O]) CancelCh() CancelStreamR {
if p.oi != nil {
return p.oi.cancelDupCh.ReaderCh
}

return nil
}
2 changes: 2 additions & 0 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type (

// Next is a sequential unique id generator func type
Next func() string

OnCancel func()
)

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
Expand Down
30 changes: 30 additions & 0 deletions boost/cancellation-monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package boost

import (
"context"
"sync"
)

// StartCancellationMonitor
func StartCancellationMonitor(ctx context.Context,
cancel context.CancelFunc,
wg *sync.WaitGroup,
cancelCh CancelStreamR,
on OnCancel,
) {
wg.Add(1)
go func(ctx context.Context,
cancel context.CancelFunc,
wg *sync.WaitGroup,
cancelCh CancelStreamR,
) {
defer wg.Done()

select {
case <-cancelCh:
on()
cancel()
case <-ctx.Done():
}
}(ctx, cancel, wg, cancelCh)
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,49 @@ import (
// output channel is created through which the client receives
// all generated outputs.

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)

defer pool.Release(ctx)

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go produce(ctx, pool, &wg)

wg.Add(1)
go consume(ctx, pool, &wg)

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool) FINISHED")
}

const (
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
OutputChTimeout = time.Second / 2 // do not use a value that is similar to CheckCloseInterval
CheckCloseInterval = time.Second / 10
TimeoutOnSend = time.Second * 2
)

func produce(ctx context.Context,
Expand Down Expand Up @@ -77,38 +113,3 @@ func consume(_ context.Context,
)
}
}

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize, CheckCloseInterval),
)

defer pool.Release(ctx)

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go produce(ctx, pool, &wg) //nolint:wsl // pendant

wg.Add(1)
go consume(ctx, pool, &wg) //nolint:wsl // pendant

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool) FINISHED")
}
74 changes: 74 additions & 0 deletions boost/examples/mf-err-missing-consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
)

// Demonstrates Timeout On Send as a result of not consuming the output
func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fmt.Printf("⏱️ timeout on send, missing consumer: '%.2f's\n", TimeoutOnSend.Seconds())
pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)

defer pool.Release(ctx)
if cc := pool.CancelCh(); cc != nil {
boost.StartCancellationMonitor(ctx, cancel, &wg, cc, func() {
fmt.Print("🔴 cancellation received, cancelling...\n")
})
}

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go produce(ctx, pool, &wg)

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool, missing consumer) FINISHED")
}

const (
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
CheckCloseInterval = time.Second / 10
TimeoutOnSend = time.Second * 3
)

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

for i, n := 0, 100; i < n; i++ {
_ = pool.Post(ctx, Param)
}

// required to inform the worker pool that no more jobs will be submitted.
// failure to invoke Conclude will result in a never ending worker pool.
//
pool.Conclude(ctx)
}
111 changes: 111 additions & 0 deletions boost/examples/mf-err-timeout-on-send/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
"github.com/snivilised/lorax/internal/lo"
)

// Demonstrates Timeout On Send
func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fmt.Printf("⏱️ timeout on send: '%.2f's\n", TimeoutOnSend.Seconds())
pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)

defer pool.Release(ctx)
if cc := pool.CancelCh(); cc != nil {
boost.StartCancellationMonitor(ctx, cancel, &wg, cc, func() {
fmt.Print("🔴 cancellation received, cancelling...\n")
})
}

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go produce(ctx, pool, &wg)

wg.Add(1)
go consume(ctx, pool, &wg)

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool, timeout on send) FINISHED")
}

const (
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
CheckCloseInterval = time.Second / 10
TimeoutOnSend = time.Second
)

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

for i, n := 0, 100; i < n; i++ {
_ = pool.Post(ctx, Param)
}

// required to inform the worker pool that no more jobs will be submitted.
// failure to invoke Conclude will result in a never ending worker pool.
//
pool.Conclude(ctx)
}

func consume(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

const (
fast = time.Second / 10
slow = time.Second * 3
barrier = 10
)

for count := 0; ; count++ {
// Slow consumer after 10 iterations, resulting in a timeout
//
time.Sleep(
lo.Ternary(count > barrier, slow, fast),
)

// NB: can not range over the observe channel since range
// is non-preempt-able and therefore does not react to
// ctx.Done.
select {
case output := <-pool.Observe():
fmt.Printf("🍒 payload: '%v', id: '%v', seq: '%v' (e: '%v')\n",
output.Payload, output.ID, output.SequenceNo, output.Error,
)
case <-ctx.Done():
return
}
}
}
Loading

0 comments on commit 04deaaf

Please sign in to comment.