add Concurrency entity for worker #1405

Closed
wants to merge 13 commits
4 changes: 0 additions & 4 deletions internal/common/autoscaler/autoscaler.go
@@ -24,10 +24,6 @@ package autoscaler
type (
AutoScaler interface {
Estimator
// Acquire X ResourceUnit of resource
Acquire(ResourceUnit) error
// Release X ResourceUnit of resource
Release(ResourceUnit)
Comment on lines -27 to -30
Member Author

these methods are unnecessary after Permit introduction

Member

with this removed, this is basically a "periodically update a cache and broadcast" thing, rather than a resource-controller.

which seems fine, just checking that that's what is needed. without anything using the hooks or GetCurrent() I'm kinda struggling to figure out what's needed eventually vs something we should get rid of while we can.

// GetCurrent ResourceUnit of resource
GetCurrent() ResourceUnit
// Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator
23 changes: 7 additions & 16 deletions internal/internal_poller_autoscaler.go
@@ -26,11 +26,11 @@ import (
"sync"
"time"

"github.com/marusama/semaphore/v2"
"go.uber.org/atomic"
"go.uber.org/zap"

"go.uber.org/cadence/internal/common/autoscaler"
"go.uber.org/cadence/internal/worker"
)

// defaultPollerScalerCooldownInSeconds
@@ -53,7 +53,7 @@ type (
isDryRun bool
cooldownTime time.Duration
logger *zap.Logger
sem semaphore.Semaphore // resizable semaphore to control number of concurrent pollers
permit worker.Permit
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup // graceful stop
@@ -82,6 +82,7 @@ type (
func newPollerScaler(
options pollerAutoScalerOptions,
logger *zap.Logger,
permit worker.Permit,
hooks ...func()) *pollerAutoScaler {
if !options.Enabled {
return nil
@@ -91,7 +92,7 @@ func newPollerScaler(
isDryRun: options.DryRun,
cooldownTime: options.Cooldown,
logger: logger,
sem: semaphore.New(options.InitCount),
permit: permit,
wg: &sync.WaitGroup{},
ctx: ctx,
cancel: cancel,
@@ -107,19 +108,9 @@
}
}

// Acquire concurrent poll quota
func (p *pollerAutoScaler) Acquire(resource autoscaler.ResourceUnit) error {
return p.sem.Acquire(p.ctx, int(resource))
}

// Release concurrent poll quota
func (p *pollerAutoScaler) Release(resource autoscaler.ResourceUnit) {
p.sem.Release(int(resource))
}

// GetCurrent poll quota
func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
return autoscaler.ResourceUnit(p.sem.GetLimit())
return autoscaler.ResourceUnit(p.permit.Quota())
}

// Start an auto-scaler go routine and returns a done to stop it
@@ -133,7 +124,7 @@ func (p *pollerAutoScaler) Start() {
case <-p.ctx.Done():
return
case <-time.After(p.cooldownTime):
currentResource := autoscaler.ResourceUnit(p.sem.GetLimit())
currentResource := autoscaler.ResourceUnit(p.permit.Quota())
currentUsages, err := p.pollerUsageEstimator.Estimate()
if err != nil {
logger.Warnw("poller autoscaler skip due to estimator error", "error", err)
@@ -146,7 +137,7 @@
"recommend", uint64(proposedResource),
"isDryRun", p.isDryRun)
if !p.isDryRun {
p.sem.SetLimit(int(proposedResource))
p.permit.SetQuota(int(proposedResource))
}
p.pollerUsageEstimator.Reset()

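
The scaling loop above now resizes the poller permit through SetQuota instead of resizing a semaphore directly. A minimal, self-contained sketch of that interaction using the Permit API introduced in this PR (illustrative only: the package is internal, and this usage is not taken from the PR):

package main

import (
	"context"
	"fmt"

	"go.uber.org/cadence/internal/worker"
)

func main() {
	permit := worker.NewResizablePermit(2) // initial poller quota
	// a poller takes one slot before polling and returns it when done
	if err := permit.Acquire(context.Background()); err == nil {
		defer permit.Release()
	}
	fmt.Println("quota:", permit.Quota())
	permit.SetQuota(4) // what Start() does with a recommendation when not in dry-run mode
	fmt.Println("quota after scaling:", permit.Quota())
}
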
7 changes: 5 additions & 2 deletions internal/internal_poller_autoscaler_test.go
@@ -21,12 +21,14 @@
package internal

import (
"context"
"math/rand"
"sync"
"testing"
"time"

"go.uber.org/cadence/internal/common/testlogger"
"go.uber.org/cadence/internal/worker"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
@@ -171,6 +173,7 @@ func Test_pollerAutoscaler(t *testing.T) {
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
},
testlogger.NewZap(t),
worker.NewResizablePermit(tt.args.initialPollerCount),
// hook function that collects number of iterations
func() {
autoscalerEpoch.Add(1)
@@ -190,9 +193,9 @@
go func() {
defer wg.Done()
for pollResult := range pollChan {
pollerScaler.Acquire(1)
pollerScaler.permit.Acquire(context.Background())
pollerScaler.CollectUsage(pollResult)
pollerScaler.Release(1)
pollerScaler.permit.Release()
}
}()
}
38 changes: 22 additions & 16 deletions internal/internal_worker_base.go
@@ -33,6 +33,7 @@ import (
"time"

"go.uber.org/cadence/internal/common/debug"
"go.uber.org/cadence/internal/worker"

"github.com/uber-go/tally"
"go.uber.org/zap"
@@ -141,7 +142,7 @@ type (
logger *zap.Logger
metricsScope tally.Scope

pollerRequestCh chan struct{}
concurrency *worker.ConcurrencyLimit
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
@@ -167,11 +168,18 @@ func createPollRetryPolicy() backoff.RetryPolicy {
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())

concurrency := &worker.ConcurrencyLimit{
PollerPermit: worker.NewResizablePermit(options.pollerCount),
TaskPermit: worker.NewChannelPermit(options.maxConcurrentTask),
}
Comment on lines +171 to +174
Member

Currently these are two quite different things, but they're sharing a "Permit" type (sharing the name is fine, their semantics are similar)

All they have in common is

Quota() int
Release()

because one is change-able and one is not, one uses chans and one does not.

Is there a need to combine their APIs in Permit? seems like it's just making compile-time guarantees weaker for no benefit.


var pollerAS *pollerAutoScaler
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
concurrency.PollerPermit = worker.NewResizablePermit(pollerOptions.InitCount)
pollerAS = newPollerScaler(
pollerOptions,
logger,
concurrency.PollerPermit,
)
}

@@ -182,7 +190,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
concurrency: concurrency,
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
limiterContext: ctx,
@@ -244,11 +252,13 @@ func (bw *baseWorker) runPoller() {
select {
case <-bw.shutdownCh:
return
case <-bw.pollerRequestCh:
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(cap(bw.pollerRequestCh)))
// This metric is used to monitor how many poll requests have been allocated
// and can be used to approximate number of concurrent task running (not pinpoint accurate)
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(cap(bw.pollerRequestCh) - len(bw.pollerRequestCh)))
case <-bw.concurrency.TaskPermit.GetChan(): // don't poll unless there is a task permit
// TODO move to a centralized place inside the worker
// emit metrics on concurrent task permit quota and current task permit count
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process
// thus the metrics is only an estimated value of how many tasks are running concurrently
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(bw.concurrency.TaskPermit.Quota()))
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(bw.concurrency.TaskPermit.Count()))
if bw.sessionTokenBucket != nil {
bw.sessionTokenBucket.waitForAvailableToken()
}
@@ -260,10 +270,6 @@ func (bw *baseWorker) runTaskDispatcher() {
func (bw *baseWorker) runTaskDispatcher() {
defer bw.shutdownWG.Done()

for i := 0; i < bw.options.maxConcurrentTask; i++ {
bw.pollerRequestCh <- struct{}{}
}

for {
// wait for new task or shutdown
select {
@@ -294,10 +300,10 @@ func (bw *baseWorker) pollTask() {
var task interface{}

if bw.pollerAutoScaler != nil {
if pErr := bw.pollerAutoScaler.Acquire(1); pErr == nil {
defer bw.pollerAutoScaler.Release(1)
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
defer bw.concurrency.PollerPermit.Release()
} else {
bw.logger.Warn("poller auto scaler acquire error", zap.Error(pErr))
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
}
}

@@ -333,7 +339,7 @@ func (bw *baseWorker) pollTask() {
case <-bw.shutdownCh:
}
} else {
bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll
bw.concurrency.TaskPermit.Release() // poll failed, trigger a new poll by returning a task permit
}
}

@@ -368,7 +374,7 @@ func (bw *baseWorker) processTask(task interface{}) {
}

if isPolledTask {
bw.pollerRequestCh <- struct{}{}
bw.concurrency.TaskPermit.Release() // task processed, trigger a new poll by returning a task permit
}
}()
err := bw.options.taskWorker.ProcessTask(task)
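
Taken together, these hunks replace the buffered pollerRequestCh with the channel-based task permit: runPoller only polls after taking a permit off GetChan(), and the permit is returned via Release() either when a poll comes back empty or once a polled task has been processed. A small self-contained sketch of that gating pattern (illustrative only; the worker package is internal and the example names are not from the PR):

package main

import (
	"fmt"
	"sync"

	"go.uber.org/cadence/internal/worker"
)

func main() {
	permit := worker.NewChannelPermit(2) // at most 2 tasks in flight
	var wg sync.WaitGroup
	for task := 0; task < 5; task++ {
		<-permit.GetChan() // don't "poll" unless a task permit is available
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			defer permit.Release() // return the permit so the next poll can start
			fmt.Println("processing task", task)
		}(task)
	}
	wg.Wait()
}
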
70 changes: 70 additions & 0 deletions internal/worker/channel_permit.go
@@ -0,0 +1,70 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package worker

import (
"context"
"fmt"
)

type channelPermit struct {
channel chan struct{}
}

// NewChannelPermit creates a static permit that's not resizable
func NewChannelPermit(count int) ChannelPermit {
channel := make(chan struct{}, count)
for i := 0; i < count; i++ {
channel <- struct{}{}
}
return &channelPermit{channel: channel}
}

func (p *channelPermit) Acquire(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("failed to acquire permit before context is done")
case p.channel <- struct{}{}:
return nil
}
}
Comment on lines +41 to +48
Member

pretty strong sign that there need to be more tests: without a GetChan() reader, this will never succeed (and then it'll allow too many things / may block release permanently)
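
A hypothetical test (not part of this PR, name invented) that illustrates the concern: the constructor pre-fills the channel to capacity, so Acquire can only complete once something reads a permit off GetChan(), and it times out otherwise. Assuming it lives in the worker package next to the implementation, with the standard testing, context, and time imports:

func TestChannelPermitAcquireRequiresReader(t *testing.T) {
	p := NewChannelPermit(1)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := p.Acquire(ctx); err == nil {
		t.Fatal("expected Acquire to time out when nothing reads from GetChan()")
	}
}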


// GetChan returns a permit ready channel
func (p *channelPermit) GetChan() <-chan struct{} {
return p.channel
}

func (p *channelPermit) Release() {
p.channel <- struct{}{}
}

// Count returns the number of permits available
func (p *channelPermit) Count() int {
return len(p.channel)
}

func (p *channelPermit) Quota() int {
return cap(p.channel)
}

// SetQuota on static permit doesn't take effect
func (p *channelPermit) SetQuota(_ int) {
}
48 changes: 48 additions & 0 deletions internal/worker/concurrency.go
@@ -0,0 +1,48 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package worker

import (
"context"
)

var _ Permit = (*resizablePermit)(nil)
var _ ChannelPermit = (*channelPermit)(nil)

// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers
type ConcurrencyLimit struct {
PollerPermit Permit // controls concurrency of pollers
TaskPermit ChannelPermit // controls concurrency of task processing
}

// Permit is an adaptive permit issuer to control concurrency
type Permit interface {
Acquire(context.Context) error
Count() int
Quota() int
Release()
SetQuota(int)
Member

Seems worth splitting this out so you don't have to no-op it. It isn't even needed - this all compiles fine with SetQuota only defined on a non-channel permit.

}

type ChannelPermit interface {
Permit
GetChan() <-chan struct{} // fetch the underlying channel
}
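
A sketch of the split this comment and the earlier ConcurrencyLimit comment both point toward (hypothetical shape, not part of this PR): SetQuota moves off the shared interface, so the channel-based permit no longer needs a no-op implementation.

// Permit keeps only the behaviour the two implementations actually share.
type Permit interface {
	Acquire(context.Context) error
	Count() int
	Quota() int
	Release()
}

// ResizablePermit would be implemented by the poller permit, which can be rescaled.
type ResizablePermit interface {
	Permit
	SetQuota(int)
}

// ChannelPermit would be implemented by the task permit, which exposes its channel.
type ChannelPermit interface {
	Permit
	GetChan() <-chan struct{}
}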