Skip to content

Commit

Permalink
[FEATURE] - Configuration Rate Limiting (#1410)
Browse files Browse the repository at this point in the history
Currently the controller will attempt to reconcile all Configurations when a change is detected. In small scale deployments this is not an issue, but where there is a large number of Configurations this can cause a large number of API calls, and potentially cause rate limiting to be applied. This change adds a `--configurations-threshold <float64>` flag; e.g. to say only 10 percent of configurations can run at any one time, use `--configurations-threshold 0.1`.

When a Configuration is deferred due to rate limiting, the `ConditionReady` is set to false and a warning message is appended to the `Status.Conditions` field.
  • Loading branch information
gambol99 authored May 14, 2024
1 parent ec4f1ff commit 8444cae
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func main() {
flags.DurationVar(&config.DriftInterval, "drift-interval", 3*time.Hour, "The minimum duration the controller will wait before triggering a drift check")
flags.DurationVar(&config.ResyncPeriod, "resync-period", 5*time.Hour, "The resync period for the controller")
flags.DurationVar(&config.RevisionExpiration, "revision-expiration", 0, "The duration a revision should be kept is not referenced or latest (zero means disabled)")
flags.Float64Var(&config.ConfigurationThreshold, "configurations-threshold", 0, "The maximum percentage of configurations that can be run at any one time")
flags.Float64Var(&config.DriftThreshold, "drift-threshold", 0.10, "The maximum percentage of configurations that can be run drift detection at any one time")
flags.IntVar(&config.APIServerPort, "apiserver-port", 10080, "The port the apiserver should be listening on")
flags.IntVar(&config.MetricsPort, "metrics-port", 9090, "The port the metric endpoint binds to")
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/configuration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Controller struct {
cache *pcache.Cache
// recorder is the kubernetes event recorder
recorder record.EventRecorder
// ConfigurationThreshold is the max fraction of configurations (e.g. 0.1 for 10%) we are willing to run at the same time; zero disables the check
ConfigurationThreshold float64
// ControllerNamespace is the namespace where the runner is running
ControllerNamespace string
// BackendTemplate is the name of the secret in the controller namespace which holds a
Expand Down
50 changes: 50 additions & 0 deletions pkg/controller/configuration/ensure.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (c *Controller) ensureCapturedState(configuration *terraformv1alpha1.Config
cond := controller.ConditionMgr(configuration, corev1alpha1.ConditionReady, c.recorder)

return func(ctx context.Context) (reconcile.Result, error) {

// @step: retrieve a list of policies in the cluster
policies := &terraformv1alpha1.PolicyList{}
if err := c.cc.List(ctx, policies); err != nil {
Expand All @@ -77,6 +78,15 @@ func (c *Controller) ensureCapturedState(configuration *terraformv1alpha1.Config
return reconcile.Result{}, err
}

// @step: retrieve a list of all the configurations in the cluster - shouldn't have much impact
// as it's a cached client and we defer to the cache
configurations := &terraformv1alpha1.ConfigurationList{}
if err := c.cc.List(ctx, configurations); err != nil {
cond.Failed(err, "Failed to list the configurations in cluster")

return reconcile.Result{}, err
}

// @step: retrieve a list of jobs
jobs := &batchv1.JobList{}
if err := c.cc.List(ctx, jobs, client.InNamespace(c.ControllerNamespace)); err != nil {
Expand All @@ -90,13 +100,53 @@ func (c *Controller) ensureCapturedState(configuration *terraformv1alpha1.Config
configuration.Status.ResourceStatus = terraformv1alpha1.ResourcesOutOfSync
}

state.configurations = configurations
state.jobs = jobs
state.policies = policies

return reconcile.Result{}, nil
}
}

// ensureConfigurationThreshold is responsible for ensuring we do not exceed the configuration threshold
// for the number of jobs we can run at the same time. The threshold is a fraction of the total number
// of configurations captured in state (e.g. 0.1 means 10%). When the share of running jobs is above
// the threshold, a warning is raised on the Ready condition and the request is requeued after
// 30 seconds; in every other case an empty result is returned.
func (c *Controller) ensureConfigurationThreshold(configuration *terraformv1alpha1.Configuration, state *state) controller.EnsureFunc {
	cond := controller.ConditionMgr(configuration, corev1alpha1.ConditionReady, c.recorder)

	return func(ctx context.Context) (reconcile.Result, error) {
		switch {
		// @step: we can skip if the threshold is not set. A negative value is treated as unset
		// as well, otherwise every configuration would compare as over the threshold and be
		// deferred indefinitely
		case c.ConfigurationThreshold <= 0:
			return reconcile.Result{}, nil

		// @step: we can skip if the configurations were never captured, rather than panic below
		case state.configurations == nil:
			return reconcile.Result{}, nil

		// @step: we can skip if we have at most one configuration - an empty list must not
		// reach the division below, as running/0 would produce +Inf and requeue forever
		case len(state.configurations.Items) <= 1:
			return reconcile.Result{}, nil
		}

		// @step: we need to calculate the number of configurations we are running
		total := len(state.configurations.Items)
		running := filters.Jobs(state.jobs).IsRunning()
		percent := (float64(running) / float64(total)) * 100

		log.WithFields(log.Fields{
			"running":         running,
			"running_percent": percent,
			"threshold":       c.ConfigurationThreshold,
			"total":           total,
		}).Debug("checking configuration is within the threshold")

		// @step: if we are over the threshold, we need to wait
		if percent > (c.ConfigurationThreshold * 100) {
			cond.Warning("Configuration is over the threshold for running configurations, waiting in queue")

			return reconcile.Result{RequeueAfter: 30 * time.Second}, nil
		}

		return reconcile.Result{}, nil
	}
}

// ensureNoActivity is responsible for ensuring there are no active jobs running for this configuration; if there
// are, we act safely and wait for the job to finish
func (c *Controller) ensureNoActivity(configuration *terraformv1alpha1.Configuration, state *state) controller.EnsureFunc {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/configuration/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
type state struct {
// auth is an optional secret which is used for authentication
auth *v1.Secret
// configurations is a list of configurations in the cluster
configurations *terraformv1alpha1.ConfigurationList
// checkovConstraint is the policy constraint for this configuration
checkovConstraint *terraformv1alpha1.PolicyConstraint
// hasDrift is a flag to indicate if the configuration has drift
Expand Down Expand Up @@ -123,6 +125,7 @@ func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) (
finalizer.EnsurePresent(configuration),
c.ensureReconcileAnnotation(configuration),
c.ensureCapturedState(configuration, state),
c.ensureConfigurationThreshold(configuration, state),
c.ensureNoActivity(configuration, state),
c.ensureCostSecret(configuration),
c.ensureValueFromSecret(configuration, state),
Expand Down
147 changes: 147 additions & 0 deletions pkg/controller/configuration/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,153 @@ var _ = Describe("Configuration Controller Default Injection", func() {
Expect(cc.Create(context.Background(), secret)).To(Succeed())
})

// Exercises the rate limit: every configuration created here has a running job, which is
// above the 20% threshold, so the reconcile is expected to be deferred.
Context("and the configuration and the number of running jobs has exceeded the threshold", func() {
	running := 3

	BeforeEach(func() {
		ctrl.ConfigurationThreshold = 0.2

		// create the configurations, each with a running apply job in the controller namespace
		for i := 1; i <= running; i++ {
			cfg := fixtures.NewValidBucketConfiguration(namespace, fmt.Sprintf("test-%d", i))
			Expect(cc.Create(context.Background(), cfg)).To(Succeed())

			job := fixtures.NewRunningTerraformJob(cfg, terraformv1alpha1.StageTerraformApply)
			job.Namespace = ctrl.ControllerNamespace
			Expect(cc.Create(context.Background(), job)).To(Succeed())
		}

		result, _, rerr = controllertests.Roll(context.TODO(), ctrl, configuration, 0)
	})

	It("should not error", func() {
		Expect(rerr).To(BeNil())
	})

	It("should not create a plan job", func() {
		// only the jobs created in the setup should exist
		list := &batchv1.JobList{}
		Expect(cc.List(context.TODO(), list, client.InNamespace(ctrl.ControllerNamespace))).ToNot(HaveOccurred())
		Expect(len(list.Items)).To(Equal(running))
	})

	// the 30 second requeue is the deferral issued when over the threshold
	It("should have deferred the configuration", func() {
		Expect(result.Requeue).To(BeFalse())
		Expect(result.RequeueAfter).To(Equal(30 * time.Second))
	})

	It("Should have updated the configuration status", func() {
		Expect(cc.Get(context.TODO(), configuration.GetNamespacedName(), configuration)).ToNot(HaveOccurred())
		cond := configuration.Status.GetCondition(corev1alpha1.ConditionReady)
		Expect(cond.Status).To(Equal(metav1.ConditionFalse))
		Expect(cond.Reason).To(Equal(string(corev1alpha1.ReasonWarning)))
		Expect(cond.Message).To(Equal("Configuration is over the threshold for running configurations, waiting in queue"))
	})
})

// Exercises the pass-through path: 5 running jobs against the configurations created here
// is below the 80% threshold, so the reconcile is expected to proceed to a plan.
Context("and the configuration and the number of running jobs does not exceeded the threshold", func() {
	running := 5
	total := 10

	BeforeEach(func() {
		ctrl.ConfigurationThreshold = 0.8

		// create 10 configurations, of which 5 are running
		for i := 1; i <= total; i++ {
			cfg := fixtures.NewValidBucketConfiguration(namespace, fmt.Sprintf("test-%d", i))
			Expect(cc.Create(context.Background(), cfg)).To(Succeed())
		}

		for i := 1; i <= running; i++ {
			cfg := fixtures.NewValidBucketConfiguration(namespace, fmt.Sprintf("test-%d", i))
			job := fixtures.NewRunningTerraformJob(cfg, terraformv1alpha1.StageTerraformApply)
			job.Namespace = ctrl.ControllerNamespace

			Expect(cc.Create(context.Background(), job)).To(Succeed())
		}

		result, _, rerr = controllertests.Roll(context.TODO(), ctrl, configuration, 0)
	})

	It("should not error", func() {
		Expect(rerr).To(BeNil())
	})

	// one job more than the setup created is expected
	It("should create a plan job", func() {
		list := &batchv1.JobList{}
		Expect(cc.List(context.TODO(), list, client.InNamespace(ctrl.ControllerNamespace))).ToNot(HaveOccurred())
		Expect(len(list.Items)).To(Equal(running + 1))
	})

	// a 10 second requeue, rather than the 30 second threshold deferral, is expected
	It("should not have deferred the configuration", func() {
		Expect(result.Requeue).To(BeFalse())
		Expect(result.RequeueAfter).To(Equal(10 * time.Second))
	})

	It("Should have updated the configuration status", func() {
		Expect(cc.Get(context.TODO(), configuration.GetNamespacedName(), configuration)).ToNot(HaveOccurred())
		cond := configuration.Status.GetCondition(terraformv1alpha1.ConditionTerraformPlan)
		Expect(cond.Status).To(Equal(metav1.ConditionFalse))
		Expect(cond.Reason).To(Equal(string(corev1alpha1.ReasonInProgress)))
		Expect(cond.Message).To(Equal("Terraform plan is running"))
	})
})

// Exercises the pass-through path when completed jobs are also present: only jobs that are
// still running should count towards the 80% threshold.
Context("and the configuration and the number of jobs does not exceeded the threshold", func() {
	running := 6
	stopped := 4
	total := 10

	BeforeEach(func() {
		ctrl.ConfigurationThreshold = 0.8

		// create 10 configurations; 6 get a running apply job and 4 get a completed plan job
		// (both loops start at test-1, so the first 4 configurations get one of each)
		for i := 1; i <= total; i++ {
			cfg := fixtures.NewValidBucketConfiguration(namespace, fmt.Sprintf("test-%d", i))
			Expect(cc.Create(context.Background(), cfg)).To(Succeed())
		}

		for i := 1; i <= stopped; i++ {
			cfg := fixtures.NewValidBucketConfiguration(namespace, fmt.Sprintf("test-%d", i))
			job := fixtures.NewCompletedTerraformJob(cfg, terraformv1alpha1.StageTerraformPlan)
			job.Namespace = ctrl.ControllerNamespace

			Expect(cc.Create(context.Background(), job)).To(Succeed())
		}

		for i := 1; i <= running; i++ {
			cfg := fixtures.NewValidBucketConfiguration(namespace, fmt.Sprintf("test-%d", i))
			job := fixtures.NewRunningTerraformJob(cfg, terraformv1alpha1.StageTerraformApply)
			job.Namespace = ctrl.ControllerNamespace

			Expect(cc.Create(context.Background(), job)).To(Succeed())
		}

		result, _, rerr = controllertests.Roll(context.TODO(), ctrl, configuration, 0)
	})

	It("should not error", func() {
		Expect(rerr).To(BeNil())
	})

	// one job more than the setup created is expected
	It("should create a plan job", func() {
		list := &batchv1.JobList{}
		Expect(cc.List(context.TODO(), list, client.InNamespace(ctrl.ControllerNamespace))).ToNot(HaveOccurred())
		Expect(len(list.Items)).To(Equal(running + stopped + 1))
	})

	// a 10 second requeue, rather than the 30 second threshold deferral, is expected
	It("should not have deferred the configuration", func() {
		Expect(result.Requeue).To(BeFalse())
		Expect(result.RequeueAfter).To(Equal(10 * time.Second))
	})

	It("Should have updated the configuration status", func() {
		Expect(cc.Get(context.TODO(), configuration.GetNamespacedName(), configuration)).ToNot(HaveOccurred())
		cond := configuration.Status.GetCondition(terraformv1alpha1.ConditionTerraformPlan)
		Expect(cond.Status).To(Equal(metav1.ConditionFalse))
		Expect(cond.Reason).To(Equal(string(corev1alpha1.ReasonInProgress)))
		Expect(cond.Message).To(Equal("Terraform plan is running"))
	})
})

Context("and the referenced secret does not exist", func() {
BeforeEach(func() {
secret := &v1.Secret{}
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Config struct {
BackendTemplate string
// BackoffLimit is the number of times we are willing to allow a job to fail
BackoffLimit int
// ConfigurationThreshold is the max fraction of configurations (e.g. 0.1 for 10%)
// we are willing to run at the same time; zero disables the check
ConfigurationThreshold float64
// DriftControllerInterval is the interval for the controller to check for drift
DriftControllerInterval time.Duration
// DriftInterval is the minimum interval between drift checks
Expand Down
17 changes: 17 additions & 0 deletions pkg/utils/filters/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ func Jobs(list *batchv1.JobList) *Filter {
return &Filter{list: list}
}

// IsRunning returns the number of jobs in the filter's list that report at least one
// active pod (Status.Active > 0). It returns zero when List reports nothing was found.
func (j *Filter) IsRunning() int {
	jobs, ok := j.List()
	if !ok {
		return 0
	}

	var active int
	for idx := range jobs.Items {
		// skip anything without active pods
		if jobs.Items[idx].Status.Active <= 0 {
			continue
		}
		active++
	}

	return active
}

// WithName filters on the configuration name
func (j *Filter) WithName(name string) *Filter {
j.name = name
Expand Down

0 comments on commit 8444cae

Please sign in to comment.