feat(boost): add worker pool wrapper (#271)
plastikfan committed May 29, 2024
1 parent d0a2280 commit 5c3b453
Showing 24 changed files with 1,134 additions and 222 deletions.
4 changes: 4 additions & 0 deletions Taskfile.yml
@@ -41,6 +41,10 @@ tasks:
cmds:
- ginkgo -p ./internal/ants

tb:
cmds:
- ginkgo -p ./boost

ti:
cmds:
- go test ./i18n
17 changes: 17 additions & 0 deletions boost/ants-api.go
@@ -0,0 +1,17 @@
package boost

import "github.com/snivilised/lorax/internal/ants"

type (
Option = ants.Option
)

var (
WithDisablePurge = ants.WithDisablePurge
WithExpiryDuration = ants.WithExpiryDuration
WithMaxBlockingTasks = ants.WithMaxBlockingTasks
WithNonblocking = ants.WithNonblocking
WithOptions = ants.WithOptions
WithPanicHandler = ants.WithPanicHandler
WithPreAlloc = ants.WithPreAlloc
)
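
These aliases re-export the option constructors from internal/ants, so callers can configure a pool without importing the internal package. A minimal sketch (not part of this commit), assuming the options keep their upstream ants semantics:

	options := []boost.Option{
		boost.WithNonblocking(false),
		boost.WithMaxBlockingTasks(100),
		boost.WithExpiryDuration(10 * time.Second),
	}
	// the slice can then be spread into a pool constructor,
	// eg boost.NewFuncPool(size, fn, &wg, options...), added later in this commit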
8 changes: 8 additions & 0 deletions boost/boost-public-api.go
@@ -46,6 +46,14 @@ type (
PoolResultStream = chan *PoolResult
PoolResultStreamR = <-chan *PoolResult
PoolResultStreamW = chan<- *PoolResult

// IDGenerator is an interface for generating sequential unique ids
IDGenerator interface {
Generate() string
}

// Next is a func type that returns the next sequential unique id
Next func() string
)

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
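The Next func type above can be satisfied by a simple closure over a counter; a hypothetical sketch:

	var id int
	var next boost.Next = func() string {
		id++
		return fmt.Sprintf("JOB-%03d", id) // eg "JOB-001"
	}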
16 changes: 16 additions & 0 deletions boost/id-generator.go
@@ -0,0 +1,16 @@
package boost

import (
"fmt"
)

type Sequential struct {
Format string
id int
}

func (g *Sequential) Generate() string {
g.id++

return fmt.Sprintf(g.Format, g.id)
}
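
Sequential satisfies the IDGenerator interface declared in boost-public-api.go; Format is expected to contain a single integer verb. A usage sketch with a hypothetical format string:

	gen := &boost.Sequential{Format: "ID-%05d"}
	gen.Generate() // "ID-00001"
	gen.Generate() // "ID-00002"

Note that Generate increments id without synchronisation, so an instance should be confined to a single goroutine.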
8 changes: 8 additions & 0 deletions boost/observable.go
@@ -0,0 +1,8 @@
package boost

type observable[O any] struct {
stream JobStreamR[O]
}

func (o *observable[O]) Observe() {
}
19 changes: 19 additions & 0 deletions boost/pool-defs-internal.go
@@ -1,5 +1,11 @@
package boost

import (
"sync"

"github.com/snivilised/lorax/internal/ants"
)

const (
// TODO: This is just temporary, channel size definition still needs to be
// fine tuned
@@ -23,4 +29,17 @@ type (
}

workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]

basePool struct {
idGen IDGenerator
wg *sync.WaitGroup
}

generalPool struct {
pool *ants.Pool
}

functionalPool struct {
pool *ants.PoolWithFunc
}
)
57 changes: 57 additions & 0 deletions boost/support_test.go
@@ -0,0 +1,57 @@
package boost_test

import (
"runtime"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ok
"github.com/snivilised/lorax/internal/ants"
)

const (
_ = 1 << (10 * iota)
KiB // 1024
MiB // 1048576
)

const (
Param = 100
AntsSize = 1000
TestSize = 10000
n = 100000
PoolSize = 10
)

const (
RunTimes = 1e6
PoolCap = 5e4
BenchParam = 10
DefaultExpiredTime = 10 * time.Second
)

var curMem uint64

func demoFunc() {
time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}

func demoPoolFunc(inputCh ants.InputParam) {
n, _ := inputCh.(int)
time.Sleep(time.Duration(n) * time.Millisecond)
}

var stopLongRunningFunc int32

func longRunningFunc() {
for atomic.LoadInt32(&stopLongRunningFunc) == 0 {
runtime.Gosched()
}
}

func ShowMemStats() {
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
curMem = mem.TotalAlloc/MiB - curMem
GinkgoWriter.Printf("memory usage:%d MB", curMem)
}
55 changes: 55 additions & 0 deletions boost/worker-pool-func.go
@@ -0,0 +1,55 @@
package boost

import (
"sync"

"github.com/snivilised/lorax/internal/ants"
)

type WorkerPoolInvoker[I, O any] struct {
basePool
functionalPool
sourceJobsChIn JobStream[I]
}

// NewFuncPool creates a new worker pool using the native ants interface; ie
// new jobs are submitted to the underlying PoolWithFunc via Invoke (wrapped
// here by Post)
func NewFuncPool[I, O any](size int,
pf ants.PoolFunc,
wg *sync.WaitGroup,
options ...Option,
) (*WorkerPoolInvoker[I, O], error) {
// TODO: the automatic invocation of Add/Done might not
// be valid, need to confirm. I thought that a goroutine was
// allocated for each job, but this is not necessarily
// the case, because each worker has its own job queue.
//
pool, err := ants.NewPoolWithFunc(size, func(i ants.InputParam) {
defer wg.Done()
pf(i)
}, options...)

return &WorkerPoolInvoker[I, O]{
basePool: basePool{
idGen: &Sequential{},
wg: wg,
},
functionalPool: functionalPool{
pool: pool,
},
}, err
}

func (p *WorkerPoolInvoker[I, O]) Post(job ants.InputParam) error {
p.wg.Add(1) // because the goroutine's lifetime is tied to the job, not the worker

return p.pool.Invoke(job)
}

func (p *WorkerPoolInvoker[I, O]) Running() int {
return p.pool.Running()
}

func (p *WorkerPoolInvoker[I, O]) Release() {
p.pool.Release()
}
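
Outside the Ginkgo spec that follows, usage takes the same shape; a sketch with hypothetical sizes (note that ants here is the internal package, so this only compiles from within the lorax module, as the tests do):

	var wg sync.WaitGroup
	pool, err := boost.NewFuncPool[int, int](10, func(input ants.InputParam) {
		if ms, ok := input.(int); ok {
			time.Sleep(time.Duration(ms) * time.Millisecond) // simulate work
		}
	}, &wg)
	if err != nil {
		panic(err) // hypothetical error handling
	}
	defer pool.Release()
	_ = pool.Post(100) // Post calls wg.Add(1); the wrapped fn defers wg.Done()
	wg.Wait()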
34 changes: 34 additions & 0 deletions boost/worker-pool-func_test.go
@@ -0,0 +1,34 @@
package boost_test

import (
"sync"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok

"github.com/snivilised/lorax/boost"
)

var _ = Describe("WorkerPoolFunc", func() {
Context("ants", func() {
It("should: not fail", func() {
// TestNonblockingSubmit
var wg sync.WaitGroup

pool, err := boost.NewFuncPool[int, int](AntsSize, demoPoolFunc, &wg)

defer pool.Release()

for i := 0; i < n; i++ { // producer
_ = pool.Post(Param)
}
wg.Wait()
GinkgoWriter.Printf("pool with func, running workers number:%d\n",
pool.Running(),
)
ShowMemStats()

Expect(err).To(Succeed())
})
})
})
34 changes: 17 additions & 17 deletions boost/worker-pool-legacy.go
@@ -13,7 +13,7 @@ import (
"go.uber.org/zap/zapcore"
)

-// privateWpInfo (dmz!) contains any state that needs to be mutated in a non-concurrent manner
+// privateWpInfoL (dmz!) contains any state that needs to be mutated in a non-concurrent manner
// and therefore should be exclusively accessed by a single go routine. Actually, due to
// our ability to compose functionality with channels as opposed to shared state, the
// pool does not contain any state that is accessed directly or indirectly from other
@@ -23,30 +23,30 @@ import (
// is to clearly indicate what state can be accessed in different concurrency contexts,
// to ensure future updates can be applied with minimal cognitive overload.
//
-// There is another purpose for privateWpInfo and that is to do with "confinement" as
+// There is another purpose for privateWpInfoL and that is to do with "confinement" as
// described on page 86 of CiG. The aim here is to use "lexical confinement" for
// duplex channel definitions, so although a channel is thread safe and ordinarily
-// would not be a candidate member of privateWpInfo, a duplex channel ought to be
+// would not be a candidate member of privateWpInfoL, a duplex channel ought to be
// protected from accidentally being used incorrectly, ie trying to write to a channel
// that is meant to be read only. So methods that use a channel should now receive the
// channel through a method parameter (defined as either chan<-, or <-chan), rather
// than be expected to simply access the member variable directly. This clearly signals
-// that any channel defined in privateWpInfo should never be accessed directly (other
+// that any channel defined in privateWpInfoL should never be accessed directly (other
// than for passing it to another method). This is an experimental convention that
// is being established for all snivilised projects.
-type privateWpInfo[I, O any] struct {
+type privateWpInfoL[I, O any] struct {
pool workersCollectionL[I, O]
workersJobsCh chan Job[I]
finishedCh finishedStream
cancelCh CancelStream
resultOutCh PoolResultStreamW
}

-// WorkerPool owns the resultOut channel, because it is the only entity that knows
+// WorkerPoolL owns the resultOut channel, because it is the only entity that knows
// when all workers have completed their work due to the finished channel, which it also
// owns.
-type WorkerPool[I, O any] struct {
-private privateWpInfo[I, O]
+type WorkerPoolL[I, O any] struct {
+private privateWpInfoL[I, O]
outputChTimeout time.Duration
exec ExecutiveFunc[I, O]
noWorkers int
@@ -57,7 +57,7 @@ type WorkerPool[I, O any] struct {
Logger *slog.Logger
}

-type NewWorkerPoolParams[I, O any] struct {
+type NewWorkerPoolParamsL[I, O any] struct {
NoWorkers int
OutputChTimeout time.Duration
Exec ExecutiveFunc[I, O]
@@ -67,7 +67,7 @@ type NewWorkerPoolParams[I, O any] struct {
Logger *slog.Logger
}

-func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
+func NewWorkerPoolL[I, O any](params *NewWorkerPoolParamsL[I, O]) *WorkerPoolL[I, O] {
noWorkers := runtime.NumCPU()
if params.NoWorkers > 1 && params.NoWorkers <= MaxWorkers {
noWorkers = params.NoWorkers
@@ -86,8 +86,8 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]
},
)

-wp := &WorkerPool[I, O]{
-private: privateWpInfo[I, O]{
+wp := &WorkerPoolL[I, O]{
+private: privateWpInfoL[I, O]{
pool: make(workersCollectionL[I, O], noWorkers),
workersJobsCh: make(JobStream[I], noWorkers),
finishedCh: make(finishedStream, noWorkers),
@@ -113,15 +113,15 @@ var eyeballs = []string{
"❤️", "💙", "💚", "💜", "💛", "🤍", "💖", "💗", "💝",
}

-func (p *WorkerPool[I, O]) composeID() workerID {
+func (p *WorkerPoolL[I, O]) composeID() workerID {
n := len(p.private.pool)
index := (n) % len(eyeballs)
emoji := eyeballs[index]

return workerID(fmt.Sprintf("(%v)WORKER-ID-%v:%v", emoji, n, uuid.NewString()))
}

-func (p *WorkerPool[I, O]) Start(
+func (p *WorkerPoolL[I, O]) Start(
parentContext context.Context,
parentCancel context.CancelFunc,
outputsChOut chan<- JobOutput[O],
@@ -134,7 +134,7 @@ func (p *WorkerPool[I, O]) Start(
)
}

-func (p *WorkerPool[I, O]) run(
+func (p *WorkerPoolL[I, O]) run(
parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
@@ -230,7 +230,7 @@ func (p *WorkerPool[I, O]) run(
}
}

-func (p *WorkerPool[I, O]) spawn(
+func (p *WorkerPoolL[I, O]) spawn(
parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
@@ -257,7 +257,7 @@ func (p *WorkerPool[I, O]) spawn(
)
}

-func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
+func (p *WorkerPoolL[I, O]) drain(finishedChIn finishedStreamR) error {
p.Logger.Debug("waiting for remaining workers...",
slog.String("source", "WorkerPool.drain"),
slog.Int("pool size", len(p.private.pool)),
6 changes: 3 additions & 3 deletions boost/worker-pool-legacy_test.go
@@ -114,7 +114,7 @@ type pipeline[I, O any] struct {
outputsDup *boost.Duplex[boost.JobOutput[O]]
provider helpers.ProviderFuncL[I]
producer *helpers.ProducerL[I, O]
-pool *boost.WorkerPool[I, O]
+pool *boost.WorkerPoolL[I, O]
consumer *helpers.ConsumerL[O]
cancel TerminatorFunc[I, O]
stop TerminatorFunc[I, O]
@@ -176,8 +176,8 @@ func (p *pipeline[I, O]) process(parentContext context.Context,
noWorkers int,
executive boost.ExecutiveFunc[I, O],
) {
-p.pool = boost.NewWorkerPool[I, O](
-&boost.NewWorkerPoolParams[I, O]{
+p.pool = boost.NewWorkerPoolL[I, O](
+&boost.NewWorkerPoolParamsL[I, O]{
NoWorkers: noWorkers,
OutputChTimeout: outputChTimeout,
Exec: executive,
(diff for the remaining changed files not shown)
