Table of Contents generated with DocToc
k8s.io/apimachinery 中 wait 库的各类函数,大体上都以轮询的方式工作:按设定的时间间隔和条件判断,决定被调度的函数是否应继续执行。
条件类型 | 说明 |
---|---|
Until类 | 用得最多的类型,一般以一条chan struct{} 或context Done接收done信号作为终止轮询的依据 |
poll类 | 两条channel,一条用作传递单次执行信号用来轮询,一条用作传递done信号 |
Backoff类 | 以指数退避的方式重试:每隔一定时长执行一次条件函数,一般情况下,间隔时长随着重试次数增加而按倍数延长,但间隔时长也会有上限值(Cap) |
// k8s.io/apimachinery/pkg/util/wait/wait.go
// Until calls f repeatedly until a receive from stopCh succeeds (e.g. it is
// closed), waiting period between calls.
//
// It delegates to JitterUntil with no jitter and sliding=true. The wait is
// therefore measured from the moment f returns: a slow f pushes the next call
// back instead of making calls overlap, and the loop keeps running no matter
// how long f takes.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	const (
		noJitter = 0.0
		sliding  = true
	)
	JitterUntil(f, period, noJitter, sliding, stopCh)
}
// NonSlidingUntil behaves like Until, except that it passes sliding=false.
// The period timer is started before f runs, so the time spent inside f
// counts toward the period.
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
	const (
		noJitter = 0.0
		sliding  = false
	)
	JitterUntil(f, period, noJitter, sliding, stopCh)
}
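Until和NonSlidingUntil为一对,UntilWithContext和NonSlidingUntilWithContext为一对,区别只是定时器启动的时间点不同:sliding 为 true 的 Until 在 f 执行完成后才启动定时器,间隔不包含 f 的运行时间;sliding 为 false 的 NonSlidingUntil 在 f 执行前就启动定时器,间隔包含 f 的运行时间。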
最终调用的都是 JitterUntil
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}
If sliding is true, the period is computed after f runs. If it is false then period includes the runtime for f. sliding为true时,周期不包括f的运行时间,即f执行完成后才开始计时;sliding为false时,f执行前就开始计时,周期包括f的运行时间。
// k8s.io/apimachinery/pkg/util/wait/wait.go
// BackoffUntil calls f in a loop until a receive from stopCh succeeds,
// waiting between calls on the timer returned by backoff.Backoff().
//
// If sliding is true, the timer is obtained after f returns, so the wait does
// not include f's runtime. If sliding is false, the timer is obtained before f
// runs, so f's runtime counts toward the wait.
//
// Each call of f runs inside a closure with `defer runtime.HandleCrash()`; how
// a panic in f is handled is up to HandleCrash (not shown here).
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
	var t clock.Timer
	for {
		select {
		case <-stopCh:
			// stopCh is checked both before calling f and again after the
			// wait below; see the NOTE further down for why.
			return
		default:
		}
		if !sliding {
			// sliding == false: start the timer before f, so the period
			// includes f's runtime.
			t = backoff.Backoff()
		}
		func() {
			defer runtime.HandleCrash()
			f()
		}()
		if sliding {
			// sliding == true: start the timer only after f returns, so the
			// period excludes f's runtime.
			t = backoff.Backoff()
		}
		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			// NOTE(review): presumably mirrors time.Timer.Stop — false means
			// the timer already fired, so its pending value is drained.
			if !t.Stop() {
				<-t.C()
			}
			return
		case <-t.C():
		}
	}
}
// ConditionFunc reports whether a polled condition has been met.
// The helpers in these notes stop as soon as it returns done == true
// (success) or a non-nil err (failure); (false, nil) means "not yet".
type ConditionFunc func() (done bool, err error)
// Poll is PollWithContext with context.Background(). Per these notes it waits
// one interval, then evaluates condition every interval until condition
// returns true or an error; timeout presumably bounds the total wait
// (confirm in PollWithContext, which is not shown here).
func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
	bg := context.Background()
	return PollWithContext(bg, interval, timeout, condition.WithContext())
}
// PollInfinite is PollInfiniteWithContext with context.Background(): it
// evaluates condition every interval, with no timeout, until condition
// returns true or an error.
func PollInfinite(interval time.Duration, condition ConditionFunc) error {
	bg := context.Background()
	withCtx := condition.WithContext()
	return PollInfiniteWithContext(bg, interval, withCtx)
}
// PollInfiniteWithContext evaluates condition every interval until it returns
// true, returns an error, or ctx is done (reported as ErrWaitTimeout). The
// first evaluation happens after one interval, and the poller is given a zero
// timeout, meaning it never times out on its own.
func PollInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
	const immediate = false
	const noTimeout time.Duration = 0
	return poll(ctx, immediate, poller(interval, noTimeout), condition)
}
// PollUntil evaluates condition every interval until it returns true, returns
// an error, or stopCh fires. stopCh is bridged to a context with
// ContextForChannel, and that context's cancel func is always released on
// return.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
	stopCtx, release := ContextForChannel(stopCh)
	defer release()
	withCtx := condition.WithContext()
	return PollUntilWithContext(stopCtx, interval, withCtx)
}
// PollUntilWithContext evaluates condition every interval until it returns
// true, returns an error, or ctx is done. No immediate first evaluation is
// made, and only ctx can end the wait early.
func PollUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
	waitFn := poller(interval, 0) // timeout 0: the poller never times out by itself
	return poll(ctx, false, waitFn, condition)
}
// PollImmediate evaluates condition once right away, then again every
// interval, until it returns true, returns an error, or timeout elapses. It
// is PollImmediateWithContext with context.Background().
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
	bg := context.Background()
	return PollImmediateWithContext(bg, interval, timeout, condition.WithContext())
}
// PollImmediateWithContext evaluates condition immediately, then on every tick
// of a poller bounded by timeout. It stops when condition succeeds or fails,
// when the timeout elapses, or when ctx is done. Both timeout and ctx
// cancellation are reported as ErrWaitTimeout.
func PollImmediateWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
	ticks := poller(interval, timeout)
	return poll(ctx, true, ticks, condition)
}
// poll evaluates condition on the schedule produced by wait until it succeeds
// or fails.
//
// When immediate is true, condition is evaluated once up front; a non-nil
// error or done == true ends polling right away. If ctx is already done at
// that point, ErrWaitTimeout is returned without starting wait; otherwise the
// remaining work is delegated to WaitForWithContext.
func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, condition ConditionWithContextFunc) error {
	if immediate {
		// err != nil -> return err; done -> return nil (err is nil there).
		if done, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || done {
			return err
		}
	}

	// Non-blocking check: give up early if ctx has already finished.
	select {
	case <-ctx.Done():
		// returning ctx.Err() will break backward compatibility
		return ErrWaitTimeout
	default:
	}
	return WaitForWithContext(ctx, wait, condition)
}
// WaitForWithContext evaluates fn every time the channel returned by wait
// delivers a value. It returns nil when fn reports true, fn's error when fn
// fails, and ErrWaitTimeout when the channel is closed or ctx is done.
//
// wait is driven by its own context (waitCtx), derived from
// context.Background() rather than from ctx. waitCtx is cancelled when this
// function returns so the producer can stop (poller, for example, exits on
// ctx.Done). ctx itself is only handed to fn and watched for cancellation.
//
// When the channel is closed, fn is still evaluated one final time before
// ErrWaitTimeout is returned, so a condition that became true right at the
// deadline is still reported as success.
func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error {
	waitCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := wait(waitCtx)
	for {
		select {
		case _, open := <-c: // a tick arrived, or c was closed (open == false)
			ok, err := runConditionWithCrashProtectionWithContext(ctx, fn)
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
			if !open {
				// The producer closed the channel (e.g. poller's timeout).
				return ErrWaitTimeout
			}
		case <-ctx.Done():
			// returning ctx.Err() will break backward compatibility
			return ErrWaitTimeout
		}
	}
}
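poller 是 WaitWithContextFunc 的一种实现:它启动一个 goroutine,每隔 interval 尝试向返回的 channel 发送一次信号(消费方未就绪时直接丢弃该信号),在超时(timeout 非 0 时)或 ctx 结束时关闭 channel。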
// poller returns a WaitWithContextFunc whose channel receives a value about
// every interval. The producing goroutine closes the channel and exits when
// ctx is done or, if timeout != 0, when timeout elapses. While the receiver is
// busy, ticks are dropped rather than queued.
//
// NOTE(review): per the time package docs, time.NewTicker panics if
// interval <= 0.
func poller(interval, timeout time.Duration) WaitWithContextFunc {
	return WaitWithContextFunc(func(ctx context.Context) <-chan struct{} {
		signal := make(chan struct{})

		go func() {
			defer close(signal)

			ticker := time.NewTicker(interval)
			defer ticker.Stop()

			// A nil channel never becomes ready in a select, so leaving
			// deadline nil when timeout == 0 means "no timeout".
			var deadline <-chan time.Time
			if timeout != 0 {
				// A dedicated Timer (rather than time.After) is used so it can
				// be stopped as soon as this goroutine exits early.
				t := time.NewTimer(timeout)
				defer t.Stop()
				deadline = t.C
			}

			for {
				select {
				case <-ticker.C:
					// Non-blocking send: if the consumer is not ready, skip
					// this tick and go back to watching the other channels.
					select {
					case signal <- struct{}{}:
					default:
					}
				case <-deadline:
					return
				case <-ctx.Done():
					return
				}
			}
		}()

		return signal
	})
}
// https://github.com/argoproj/argo-workflows/blob/7173a271bb9c59ca67df7a06965eb80afd37c0cb/workflow/controller/controller.go
// createClusterWorkflowTemplateInformer starts the ClusterWorkflowTemplate
// informer when the controller may get, list and watch ClusterWorkflowTemplates
// (flags computed in the elided code), then blocks until its cache has synced.
// Missing RBAC access only logs a warning; a failed cache sync is fatal.
func (wfc *WorkflowController) createClusterWorkflowTemplateInformer(ctx context.Context) {
	// ...
	if !cwftGetAllowed || !cwftListAllowed || !cwftWatchAllowed {
		log.Warnf("Controller doesn't have RBAC access for ClusterWorkflowTemplates")
		return
	}

	wfc.cwftmplInformer = informer.NewTolerantClusterWorkflowTemplateInformer(wfc.dynamicInterface, clusterWorkflowTemplateResyncPeriod)
	go wfc.cwftmplInformer.Informer().Run(ctx.Done())

	// Run is asynchronous, so wait for the cache to be populated before
	// anything later tries to read from it.
	synced := cache.WaitForCacheSync(ctx.Done(), wfc.cwftmplInformer.Informer().HasSynced)
	if !synced {
		log.Fatal("Timed out waiting for ClusterWorkflowTemplate cache to sync")
	}
}
// WaitForCacheSync polls every syncedPollPeriod, via wait.PollImmediateUntil,
// until every function in cacheSyncs reports true. It returns true once all
// caches have synced, and false if polling ends with an error (for example
// when stopCh fires). With no cacheSyncs the condition is trivially satisfied.
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	allSynced := func() (bool, error) {
		for _, hasSynced := range cacheSyncs {
			if !hasSynced() {
				return false, nil
			}
		}
		return true, nil
	}

	if err := wait.PollImmediateUntil(syncedPollPeriod, allSynced, stopCh); err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}
	klog.V(4).Infof("caches populated")
	return true
}
// ExponentialBackoff evaluates condition up to backoff.Steps times, sleeping
// backoff.Step() between attempts (never after the last one).
//
// It returns nil as soon as condition reports done, condition's error as soon
// as it returns one, and ErrWaitTimeout once the steps are used up. When Step
// reaches a positive Cap it sets Steps to 0, so the loop then ends after that
// sleep without another attempt. backoff is taken by value, so the caller's
// Backoff is not modified. Waiting uses time.Sleep: the call blocks and cannot
// be cancelled.
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
	for backoff.Steps > 0 {
		done, err := runConditionWithCrashProtection(condition)
		switch {
		case err != nil:
			return err
		case done:
			return nil
		case backoff.Steps == 1:
			// Last attempt failed; don't sleep for nothing.
			return ErrWaitTimeout
		}
		time.Sleep(backoff.Step())
	}
	return ErrWaitTimeout
}
ExponentialBackoff 在条件函数返回 (false, nil) 时以指数退避的方式延时后重试;条件函数返回 true 或 error 时立即返回,重试次数用尽时返回 ErrWaitTimeout。ExponentialBackoff 内部使用 time.Sleep 等待,因此会阻塞调用方,且无法被取消。
// Backoff holds the parameters of an exponential backoff schedule. Step
// advances it; ExponentialBackoff consumes a copy of it.
type Backoff struct {
	// The initial duration (the delay returned by the first Step).
	Duration time.Duration
	// Duration is multiplied by factor each iteration. Must be greater
	// than or equal to zero.
	// This is the exponential growth factor; 0 disables growth (see Step).
	Factor float64
	// When positive, each delay gets a random extra amount in
	// [0, Jitter*duration) added by Jitter(); larger values widen the range
	// of possible delays.
	Jitter float64
	// The number of steps before duration stops changing. If zero, initial
	// duration is always used. Used for exponential backoff in combination
	// with Factor.
	// ExponentialBackoff also uses it as the maximum number of attempts.
	Steps int
	// The returned duration will never be greater than cap *before* jitter
	// is applied. The actual maximum cap is `cap * (1.0 + jitter)`.
	// When Factor is non-zero and the grown duration exceeds a positive Cap,
	// Step clamps it to Cap and sets Steps to 0, which also ends growth (and
	// ExponentialBackoff's retries). 0 means no cap.
	Cap time.Duration
}
// Step returns the delay to use for the current step and advances b.
//
// While b.Steps >= 1, each call decrements Steps and returns the current
// Duration (plus jitter if Jitter > 0). If Factor != 0, it also multiplies
// Duration by Factor for the next call; if the grown Duration exceeds a
// positive Cap, it is clamped to Cap and Steps is set to 0, ending growth.
// Once Steps < 1, Duration no longer changes and every call returns it
// (jittered if Jitter > 0).
//
// The multiplication saturates at the largest representable time.Duration.
// Converting an out-of-range float64 to an integer type is
// implementation-dependent in Go (on amd64 it yields a large negative value),
// so an uncapped schedule with many steps could otherwise wrap around to a
// negative delay, i.e. an immediate retry.
//
// NOTE(review): Jitter() adds to the duration with plain integer arithmetic,
// so jitter applied on top of a near-maximal duration can still overflow; set
// Cap to keep delays bounded.
func (b *Backoff) Step() time.Duration {
	if b.Steps < 1 {
		if b.Jitter > 0 {
			return Jitter(b.Duration, b.Jitter)
		}
		return b.Duration
	}
	b.Steps--

	duration := b.Duration

	// calculate the next step
	if b.Factor != 0 {
		const maxDuration = time.Duration(1<<63 - 1)
		next := float64(b.Duration) * b.Factor
		// float64(maxDuration) is exactly 2^63; any float64 below it converts
		// to an in-range Duration, anything at or above it would not.
		if next >= float64(maxDuration) {
			b.Duration = maxDuration
		} else {
			b.Duration = time.Duration(next)
		}
		if b.Cap > 0 && b.Duration > b.Cap {
			b.Duration = b.Cap
			b.Steps = 0
		}
	}

	if b.Jitter > 0 {
		duration = Jitter(duration, b.Jitter)
	}
	return duration
}
// Jitter returns duration plus a random extra amount drawn uniformly from
// [0, maxFactor*duration). A maxFactor <= 0 is treated as 1.0, giving a result
// in [duration, 2*duration) for a positive duration.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	extra := rand.Float64() * maxFactor * float64(duration)
	return duration + time.Duration(extra)
}
- Duration = 1 * time.Second,Factor = 0,Jitter = 0.5:根据上面 Jitter 函数中的计算方式 `wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))`,即 1s + [0.0, 1.0) * 0.5 * 1s,预期 duration 落在 [1s, 1.5s) 区间内
- Duration = 1 * time.Second,Factor = 3:根据上面 Step 方法中的计算方式 `b.Duration = time.Duration(float64(b.Duration) * b.Factor)`,即 duration(1) = 3 * duration,duration(2) = 3 * duration(1);由于 Step 返回的是乘法之前的值,在 Jitter = 0、Steps 足够且未触及 Cap 时,依次返回 1s、3s、9s……
Group 类型创造性地将 sync.WaitGroup 与 channel、context 结合,实现了协程间同步以及等待 Group 中全部协程结束的功能。由于 StartWithChannel 和 StartWithContext 的入参函数类型比较固定,因此使用上并不通用,但可以作为参考:
// Wait blocks until every goroutine started through this Group has returned
// (presumably via an internal sync.WaitGroup — implementation not shown).
func (g *Group) Wait()
// StartWithChannel presumably runs f in a new goroutine tracked by the Group,
// passing it stopCh so f can observe shutdown — body not shown, confirm.
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{}))
// StartWithContext presumably runs f in a new goroutine tracked by the Group,
// passing it ctx so f can observe cancellation — body not shown, confirm.
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))