Skip to content

Commit

Permalink
Add max-session-length flag and logic (#4592)
Browse files Browse the repository at this point in the history
This makes the agent disconnect and reconnect after some configurable
amount of time (+ built-in jitter). The aim of this change is to have
agents organically rebalance their connections to the backends over
time.

Signed-off-by: Cyril Cressent <[email protected]>
  • Loading branch information
ccressent authored Jan 31, 2022
1 parent d8694ba commit ba2acff
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Versioning](http://semver.org/spec/v2.0.0.html).

### Added
- Added metric threshold service checks.
- Added `--max-session-length` flag to agent to configure the maximum duration
after which the agent will reconnect to one of its backends.
- Added API group version information to the /version endpoint.

### Changed
Expand Down
62 changes: 47 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -162,6 +163,7 @@ type Agent struct {
unmarshal UnmarshalFunc
sequencesMu sync.Mutex
sequences map[string]int64
maxSessionLength time.Duration

// ProcessGetter gets information about local agent processes.
ProcessGetter process.Getter
Expand All @@ -183,20 +185,21 @@ func NewAgentContext(ctx context.Context, config *Config) (*Agent, error) {
return nil, errors.New("keepalive warning timeout must be greater than keepalive interval")
}
agent := &Agent{
backendSelector: &RandomBackendSelector{Backends: config.BackendURLs},
connected: false,
config: config,
executor: command.NewExecutor(),
handler: handler.NewMessageHandler(),
entityConfigCh: make(chan struct{}),
inProgress: make(map[string]*corev2.CheckConfig),
inProgressMu: &sync.Mutex{},
sendq: make(chan *transport.Message, 10),
systemInfo: &corev2.System{},
unmarshal: UnmarshalJSON,
marshal: MarshalJSON,
ProcessGetter: &process.NoopProcessGetter{},
sequences: make(map[string]int64),
backendSelector: &RandomBackendSelector{Backends: config.BackendURLs},
connected: false,
config: config,
executor: command.NewExecutor(),
handler: handler.NewMessageHandler(),
entityConfigCh: make(chan struct{}),
inProgress: make(map[string]*corev2.CheckConfig),
inProgressMu: &sync.Mutex{},
sendq: make(chan *transport.Message, 10),
systemInfo: &corev2.System{},
unmarshal: UnmarshalJSON,
marshal: MarshalJSON,
ProcessGetter: &process.NoopProcessGetter{},
sequences: make(map[string]int64),
maxSessionLength: config.MaxSessionLength,
}

agent.statsdServer = NewStatsdServer(agent)
Expand Down Expand Up @@ -467,16 +470,20 @@ func (a *Agent) connectionManager(ctx context.Context, cancel context.CancelFunc

newConnections.WithLabelValues().Inc()

go a.enforceMaxSessionLength(connCancel)
go a.receiveLoop(connCtx, connCancel, conn)

// Block until we receive an entity config, or the grace period expires,
// unless the agent manages its entity
if !a.config.AgentManagedEntity {
entityConfigGracePeriodTimer := time.NewTimer(entityConfigGracePeriod)

select {
case <-a.entityConfigCh:
logger.Debug("successfully received the initial entity config")
case <-time.After(entityConfigGracePeriod):
case <-entityConfigGracePeriodTimer.C:
logger.Warning("the initial entity config was never received, using the local entity")
entityConfigGracePeriodTimer.Stop()
case <-connCtx.Done():
// The connection was closed before we received an entity config or we
// reached the grace period
Expand All @@ -493,6 +500,31 @@ func (a *Agent) connectionManager(ctx context.Context, cancel context.CancelFunc
}
}

// enforceMaxSessionLength cancels the connection's context after some amount of
// time, forcing the agent to reconnect to one of the configured backends.
//
// That amount of time, the timeout, is the maximum session length minus some
// random jitter, to avoid creating potential thundering herds where all the
// agents started at the same time and with the same maximum session length all
// reconnect at once. The jitter is a random duration between 0 and
// half the maximum session length.
//
// This effectively makes agents reconnect after some random duration between
// half --max-session-length and --max-session-length.
func (a *Agent) enforceMaxSessionLength(connCancel context.CancelFunc) {
	if a.maxSessionLength <= 0 {
		logger.Debugf("maxSessionLength is %v, agent won't periodically disconnect", a.maxSessionLength)
		return
	}

	// Draw the jitter from a locally-seeded source: the package-global
	// math/rand source is deterministically seeded (with 1) unless
	// rand.Seed is called somewhere, in which case every agent would
	// compute the same jitter sequence and the thundering-herd protection
	// would be lost.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	jitter := time.Duration(rng.Float64() * 0.5 * float64(a.maxSessionLength))
	timeout := a.maxSessionLength - jitter

	logger.Infof("Session will be terminated in %v", timeout)
	// NOTE(review): there is no cancellation path here; if the connection
	// ends before the timeout, this goroutine lingers until the timer
	// fires. Consider threading the connection context through so it can
	// exit early — TODO confirm against connectionManager's lifecycle.
	<-time.After(timeout)
	logger.Infof("Ending session after %v (max session length is %v)", timeout, a.maxSessionLength)
	connCancel()
}

func (a *Agent) receiveLoop(ctx context.Context, cancel context.CancelFunc, conn transport.Transport) {
defer cancel()
for {
Expand Down
4 changes: 4 additions & 0 deletions agent/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
flagRetryMin = "retry-min"
flagRetryMax = "retry-max"
flagRetryMultiplier = "retry-multiplier"
flagMaxSessionLength = "max-session-length"

// TLS flags
flagTrustedCAFile = "trusted-ca-file"
Expand Down Expand Up @@ -138,6 +139,7 @@ func NewAgentConfig(cmd *cobra.Command) (*agent.Config, error) {
cfg.RetryMin = viper.GetDuration(flagRetryMin)
cfg.RetryMax = viper.GetDuration(flagRetryMax)
cfg.RetryMultiplier = viper.GetFloat64(flagRetryMultiplier)
cfg.MaxSessionLength = viper.GetDuration(flagMaxSessionLength)

// Set the labels & annotations using values defined configuration files
// and/or environment variables for now
Expand Down Expand Up @@ -320,6 +322,7 @@ func handleConfig(cmd *cobra.Command, arguments []string) error {
viper.SetDefault(flagRetryMin, time.Second)
viper.SetDefault(flagRetryMax, 120*time.Second)
viper.SetDefault(flagRetryMultiplier, 2.0)
viper.SetDefault(flagMaxSessionLength, 0*time.Second)

// Merge in flag set so that it appears in command usage
flags := flagSet()
Expand Down Expand Up @@ -441,6 +444,7 @@ func flagSet() *pflag.FlagSet {
flagSet.Duration(flagRetryMin, viper.GetDuration(flagRetryMin), "minimum amount of time to wait before retrying an agent connection to the backend")
flagSet.Duration(flagRetryMax, viper.GetDuration(flagRetryMax), "maximum amount of time to wait before retrying an agent connection to the backend")
flagSet.Float64(flagRetryMultiplier, viper.GetFloat64(flagRetryMultiplier), "value multiplied with the current retry delay to produce a longer retry delay (bounded by --retry-max)")
flagSet.Duration(flagMaxSessionLength, viper.GetDuration(flagMaxSessionLength), "maximum amount of time after which the agent will reconnect to one of the configured backends (no maximum by default)")

flagSet.SetOutput(ioutil.Discard)

Expand Down
4 changes: 4 additions & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ type Config struct {
// RetryMultiplier is multiplied with the current retry delay to produce
// a longer retry delay. It is bounded by RetryMax.
RetryMultiplier float64

// MaxSessionLength is the maximum duration after which the agent will
// reconnect to one of the backends.
MaxSessionLength time.Duration
}

// StatsdServerConfig contains the statsd server configuration
Expand Down
4 changes: 3 additions & 1 deletion cmd/loadit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ var (
flagHugeEvents = flag.Bool("huge-events", false, "send 1 MB events to the backend")
flagUser = flag.String("user", agent.DefaultUser, "user to authenticate with server")
flagPassword = flag.String("password", agent.DefaultPassword, "password to authenticate with server")
flagBaseEntityName = flag.String("base-entity-name", "test-host", "base entity name to prepend with count number.")
flagBaseEntityName = flag.String("base-entity-name", "test-host", "base entity name to prepend with count number")
flagMaxSessionLength = flag.Duration("max-session-length", 0*time.Second, "maximum amount of time after which the agent will reconnect to one of the configured backends (no maximum by default)")
)

func main() {
Expand Down Expand Up @@ -92,6 +93,7 @@ func main() {
cfg.BackendHeartbeatInterval = 30
cfg.BackendHeartbeatTimeout = 300
cfg.PrometheusBinding = *flagPromBinding
cfg.MaxSessionLength = *flagMaxSessionLength

agent, err := agent.NewAgent(cfg)
if err != nil {
Expand Down

0 comments on commit ba2acff

Please sign in to comment.