Skip to content

Commit

Permalink
Move jobs backoff back into traefik
Browse files Browse the repository at this point in the history
  • Loading branch information
emilevauge committed Sep 27, 2016
1 parent 1b6af20 commit 364958c
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 120 deletions.
210 changes: 105 additions & 105 deletions glide.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import:
subpackages:
- fun
- package: github.com/Sirupsen/logrus
- package: github.com/emilevauge/backoff
- package: github.com/cenk/backoff
- package: github.com/codegangsta/negroni
- package: github.com/containous/flaeg
version: a731c034dda967333efce5f8d276aeff11f8ff87
Expand Down
2 changes: 1 addition & 1 deletion integration/utils/try.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package utils

import (
"errors"
"github.com/emilevauge/backoff"
"github.com/cenk/backoff"
"net/http"
"strconv"
"time"
Expand Down
39 changes: 39 additions & 0 deletions job/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package job

import (
"github.com/cenk/backoff"
"time"
)

var (
_ backoff.BackOff = (*BackOff)(nil)
)

const (
defaultMinJobInterval = 30 * time.Second
)

// BackOff is an exponential backoff implementation for long running jobs.
// In long running jobs, an operation() that fails after a long Duration should not increments the backoff period.
// If operation() takes more than MinJobInterval, Reset() is called in NextBackOff().
type BackOff struct {
*backoff.ExponentialBackOff
MinJobInterval time.Duration
}

// NewBackOff creates an instance of BackOff using default values.
func NewBackOff(backOff *backoff.ExponentialBackOff) *BackOff {
backOff.MaxElapsedTime = 0
return &BackOff{
ExponentialBackOff: backOff,
MinJobInterval: defaultMinJobInterval,
}
}

// NextBackOff calculates the next backoff interval.
func (b *BackOff) NextBackOff() time.Duration {
if b.GetElapsedTime() >= b.MinJobInterval {
b.Reset()
}
return b.ExponentialBackOff.NextBackOff()
}
44 changes: 44 additions & 0 deletions job/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package job

import (
"github.com/cenk/backoff"
"testing"
"time"
)

func TestJobBackOff(t *testing.T) {
var (
testInitialInterval = 500 * time.Millisecond
testRandomizationFactor = 0.1
testMultiplier = 2.0
testMaxInterval = 5 * time.Second
testMinJobInterval = 1 * time.Second
)

exp := NewBackOff(backoff.NewExponentialBackOff())
exp.InitialInterval = testInitialInterval
exp.RandomizationFactor = testRandomizationFactor
exp.Multiplier = testMultiplier
exp.MaxInterval = testMaxInterval
exp.MinJobInterval = testMinJobInterval
exp.Reset()

var expectedResults = []time.Duration{500, 500, 500, 1000, 2000, 4000, 5000, 5000, 500, 1000, 2000, 4000, 5000, 5000}
for i, d := range expectedResults {
expectedResults[i] = d * time.Millisecond
}

for i, expected := range expectedResults {
// Assert that the next backoff falls in the expected range.
var minInterval = expected - time.Duration(testRandomizationFactor*float64(expected))
var maxInterval = expected + time.Duration(testRandomizationFactor*float64(expected))
if i < 3 || i == 8 {
time.Sleep(2 * time.Second)
}
var actualInterval = exp.NextBackOff()
if !(minInterval <= actualInterval && actualInterval <= maxInterval) {
t.Error("error")
}
// assertEquals(t, expected, exp.currentInterval)
}
}
5 changes: 3 additions & 2 deletions provider/consul_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

"github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/emilevauge/backoff"
"github.com/hashicorp/consul/api"
)

Expand Down Expand Up @@ -330,7 +331,7 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
operation := func() error {
return provider.watch(configurationChan, stop)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to consul server %+v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions provider/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

"github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/version"
Expand All @@ -26,7 +28,6 @@ import (
swarmtypes "github.com/docker/engine-api/types/swarm"
"github.com/docker/go-connections/nat"
"github.com/docker/go-connections/sockets"
"github.com/emilevauge/backoff"
"github.com/vdemeester/docker-events"
)

Expand Down Expand Up @@ -223,7 +224,7 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, po
notify := func(err error, time time.Duration) {
log.Errorf("Docker connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to docker server %+v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions provider/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/emilevauge/backoff"
)

const (
Expand Down Expand Up @@ -146,7 +147,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
notify := func(err error, time time.Duration) {
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Kubernetes server %+v", err)
}
Expand Down
7 changes: 4 additions & 3 deletions provider/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"errors"
"github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/emilevauge/backoff"
)

// Kv holds common configurations of key-value providers.
Expand Down Expand Up @@ -75,7 +76,7 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
notify := func(err error, time time.Duration) {
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
return fmt.Errorf("Cannot connect to KV server: %v", err)
}
Expand Down Expand Up @@ -106,7 +107,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
notify := func(err error, time time.Duration) {
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
return fmt.Errorf("Cannot connect to KV server: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions provider/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (

"github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/emilevauge/backoff"
"github.com/gambol99/go-marathon"
)

Expand Down Expand Up @@ -110,7 +111,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
notify := func(err error, time time.Duration) {
log.Errorf("Marathon connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Marathon server %+v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions provider/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"fmt"
"github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/emilevauge/backoff"
"github.com/mesos/mesos-go/detector"
_ "github.com/mesos/mesos-go/detector/zoo" // Registers the ZK detector
"github.com/mesosphere/mesos-dns/detect"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
notify := func(err error, time time.Duration) {
log.Errorf("mesos connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewJobBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to mesos server %+v", err)
}
Expand Down

0 comments on commit 364958c

Please sign in to comment.