Skip to content

Commit

Permalink
feat(boost): implement manifold func pool (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 4, 2024
1 parent 212f623 commit 1789e48
Show file tree
Hide file tree
Showing 24 changed files with 703 additions and 88 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"cSpell.words": [
"Alloc",
"Assistable",
"Berthe",
"binaryheap",
"bodyclose",
"cenkalti",
Expand Down
8 changes: 7 additions & 1 deletion boost/ants-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package boost
import "github.com/snivilised/lorax/internal/ants"

type (
Option = ants.Option
IDGenerator = ants.IDGenerator
InputParam = ants.InputParam
Option = ants.Option
PoolFunc = ants.PoolFunc
TaskFunc = ants.TaskFunc
)

var (
WithDisablePurge = ants.WithDisablePurge
WithExpiryDuration = ants.WithExpiryDuration
WithGenerator = ants.WithGenerator
WithMaxBlockingTasks = ants.WithMaxBlockingTasks
WithNonblocking = ants.WithNonblocking
WithOptions = ants.WithOptions
WithOutput = ants.WithOutput
WithPanicHandler = ants.WithPanicHandler
WithPreAlloc = ants.WithPreAlloc
)
23 changes: 23 additions & 0 deletions boost/base-pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package boost

import (
"sync"
"sync/atomic"
)

type (
basePool[O any] struct {
wg *sync.WaitGroup
sequence int32
outputDupCh *Duplex[JobOutput[O]]
ending bool
}
)

func (p *basePool[O]) next() int32 {
return atomic.AddInt32(&p.sequence, int32(1))
}

func (p *basePool[O]) Observe() JobOutputStreamR[O] {
return p.outputDupCh.ReaderCh
}
12 changes: 5 additions & 7 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ const (
type (
Job[I any] struct {
ID string
Input I
SequenceNo int
Input I
}

JobOutput[O any] struct {
Payload O
ID string
SequenceNo int
Payload O
Error error
}

JobStream[I any] chan Job[I]
Expand Down Expand Up @@ -47,11 +50,6 @@ type (
PoolResultStreamR = <-chan *PoolResult
PoolResultStreamW = chan<- *PoolResult

// IDGenerator is a sequential unique id generator interface
IDGenerator interface {
Generate() string
}

// Next is a sequential unique id generator func type
Next func() string
)
Expand Down
6 changes: 3 additions & 3 deletions boost/examples/alpha/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func main() {

const NoW = 3

pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(inputCh ants.InputParam) {
n, _ := inputCh.(int)
pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(input ants.InputParam) {
n, _ := input.(int)
fmt.Printf("<--- (n: %v)🍒 \n", n)
time.Sleep(time.Second)
}, &wg, ants.WithNonblocking(false))
Expand All @@ -59,7 +59,7 @@ func main() {
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

fmt.Printf("pool with func, running workers number:%d\n",
fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion boost/examples/beta/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

fmt.Printf("task pool, running workers number:%d\n",
fmt.Printf("task pool, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
Expand Down
118 changes: 118 additions & 0 deletions boost/examples/gamma/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
)

// Demonstrates use of manifold func base worker pool where
// the client manifold func returns an output and an error. An
// output channel is created through which the client receives
// all generated outputs.

const (
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
OutputChTimeout = time.Second / 2 // do not use a value that is similar to interval
interval = time.Second / 10
)

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

// Only the producer (observable) knows when the workload is complete
// but clearly it has no idea when the worker-pool is complete. Initially,
// one might think that the worker-pool knows when work is complete
// but this is in correct. The pool only knows when the pool is dormant,
// not that no more jobs will be submitted.
// This poses a problem from the perspective of the consumer; it does
// not know when to exit its output processing loop.
// What this indicates to us is that the knowledge of end of workload is
// a combination of multiple events:
//
// 1) The producer knows when it will submit no more work
// 2) The pool knows when all it's workers are dormant
//
// A non deterministic way for the consumer to exit it's output processing
// loop, is to use a timeout. But what is a sensible value? Only the client
// knows this and even so, it can't really be sure no more outputs will
// arrive after the timeout; essentially its making an educated guess, which
// is not reliable.
//
for i, n := 0, 100; i < n; i++ {
_ = pool.Post(ctx, Param)
}

pool.EndWork(ctx, interval)
}

func consume(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

rch := pool.Observe()
for {
select {
case output, ok := <-rch:
if !ok {
return
} else {
fmt.Printf("🍒 payload: '%v', id: '%v', seq: '%v' (e: '%v')\n",
output.Payload, output.ID, output.SequenceNo, output.Error,
)
}
case <-time.After(OutputChTimeout):
fmt.Printf("⏱️ timeout!\n")
return
case <-ctx.Done():
fmt.Printf("❌ cancelled!\n")
return
}
}
}

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize),
)

defer pool.Release(ctx)

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go produce(ctx, pool, &wg) //nolint:wsl // pendant

wg.Add(1)
go consume(ctx, pool, &wg) //nolint:wsl // pendant

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool) FINISHED")
}
49 changes: 49 additions & 0 deletions boost/generic-pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package boost

import (
"context"

"github.com/snivilised/lorax/internal/ants"
)

// functionalPool
type functionalPool struct {
pool *ants.PoolWithFunc
}

func (p *functionalPool) Post(ctx context.Context, job InputParam) error {
return p.pool.Invoke(ctx, job)
}

func (p *functionalPool) Release(ctx context.Context) {
p.pool.Release(ctx)
}

func (p *functionalPool) Running() int {
return p.pool.Running()
}

func (p *functionalPool) Waiting() int {
return p.pool.Waiting()
}

// taskPool
type taskPool struct {
pool *ants.Pool
}

func (p *taskPool) Post(ctx context.Context, task TaskFunc) error {
return p.pool.Submit(ctx, task)
}

func (p *taskPool) Release(ctx context.Context) {
p.pool.Release(ctx)
}

func (p *taskPool) Running() int {
return p.pool.Running()
}

func (p *taskPool) Waiting() int {
return p.pool.Waiting()
}
7 changes: 4 additions & 3 deletions boost/id-generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package boost

import (
"fmt"
"sync/atomic"
)

type Sequential struct {
Format string
id int
id int32
}

func (g *Sequential) Generate() string {
g.id++
n := atomic.AddInt32(&g.id, int32(1))

return fmt.Sprintf(g.Format, g.id)
return fmt.Sprintf(g.Format, n)
}
15 changes: 15 additions & 0 deletions boost/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package boost

// withDefaults prepends boost withDefaults to the sequence of options
func withDefaults(options ...Option) []Option {
const (
noDefaults = 1
)
o := make([]Option, 0, len(options)+noDefaults)
o = append(o, WithGenerator(&Sequential{
Format: "ID:%v",
}))
o = append(o, options...)

return o
}
19 changes: 0 additions & 19 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package boost

import (
"sync"

"github.com/snivilised/lorax/internal/ants"
)

const (
// TODO: This is just temporary, channel size definition still needs to be
// fine tuned
Expand All @@ -29,19 +23,6 @@ type (
}

workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]

basePool struct {
wg *sync.WaitGroup
idGen IDGenerator
}

taskPool struct {
pool *ants.Pool
}

functionalPool struct {
pool *ants.PoolWithFunc
}
)

// Worker pool types:
Expand Down
6 changes: 6 additions & 0 deletions boost/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func demoPoolFunc(inputCh ants.InputParam) {
time.Sleep(time.Duration(n) * time.Millisecond)
}

func demoPoolManifoldFunc(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}

var stopLongRunningFunc int32

func longRunningFunc() {
Expand Down
Loading

0 comments on commit 1789e48

Please sign in to comment.