Skip to content

Commit

Permalink
controller flags for ignoreDrainFailures and drainTimeout (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyas-badiger authored Oct 1, 2021
1 parent 4d1afe8 commit 995b81b
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 41 deletions.
21 changes: 11 additions & 10 deletions api/v1alpha1/rollingupgrade_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type RollingUpgradeSpec struct {
PostDrain PostDrainSpec `json:"postDrain,omitempty"`
PostTerminate PostTerminateSpec `json:"postTerminate,omitempty"`
Strategy UpdateStrategy `json:"strategy,omitempty"`
IgnoreDrainFailures bool `json:"ignoreDrainFailures,omitempty"`
IgnoreDrainFailures *bool `json:"ignoreDrainFailures,omitempty"`
ForceRefresh bool `json:"forceRefresh,omitempty"`
ReadinessGates []NodeReadinessGate `json:"readinessGates,omitempty"`
}
Expand Down Expand Up @@ -216,7 +216,7 @@ type UpdateStrategy struct {
Type UpdateStrategyType `json:"type,omitempty"`
Mode UpdateStrategyMode `json:"mode,omitempty"`
MaxUnavailable intstr.IntOrString `json:"maxUnavailable,omitempty"`
DrainTimeout int `json:"drainTimeout"`
DrainTimeout *int `json:"drainTimeout,omitempty"`
}

func (c UpdateStrategyMode) String() string {
Expand All @@ -232,7 +232,7 @@ func (r *RollingUpgrade) ScalingGroupName() string {
return r.Spec.AsgName
}

func (r *RollingUpgrade) DrainTimeout() int {
func (r *RollingUpgrade) DrainTimeout() *int {
return r.Spec.Strategy.DrainTimeout
}

Expand Down Expand Up @@ -331,7 +331,7 @@ func (r *RollingUpgrade) IsForceRefresh() bool {
return r.Spec.ForceRefresh
}

func (r *RollingUpgrade) IsIgnoreDrainFailures() bool {
func (r *RollingUpgrade) IsIgnoreDrainFailures() *bool {
return r.Spec.IgnoreDrainFailures
}
func (r *RollingUpgrade) StrategyMode() UpdateStrategyMode {
Expand Down Expand Up @@ -372,12 +372,13 @@ func (r *RollingUpgrade) Validate() (bool, error) {
}

// validating the DrainTimeout value
if strategy.DrainTimeout == 0 {
r.Spec.Strategy.DrainTimeout = -1
} else if strategy.DrainTimeout < -1 {
err := fmt.Errorf("%s: Invalid value for startegy DrainTimeout - %d", r.Name, strategy.MaxUnavailable.IntVal)
return false, err
if strategy.DrainTimeout != nil {
if *strategy.DrainTimeout == 0 {
*r.Spec.Strategy.DrainTimeout = -1
} else if *strategy.DrainTimeout < -1 {
err := fmt.Errorf("%s: Invalid value for startegy DrainTimeout - %d", r.Name, strategy.MaxUnavailable.IntVal)
return false, err
}
}

return true, nil
}
12 changes: 11 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ spec:
type: string
type:
type: string
required:
- drainTimeout
type: object
type: object
status:
Expand Down
3 changes: 1 addition & 2 deletions controllers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ func createRollingUpgrade() *v1alpha1.RollingUpgrade {
AsgName: "mock-asg-1",
PostDrainDelaySeconds: 30,
Strategy: v1alpha1.UpdateStrategy{
Type: v1alpha1.RandomUpdateStrategy,
DrainTimeout: 30,
Type: v1alpha1.RandomUpdateStrategy,
},
},
}
Expand Down
27 changes: 16 additions & 11 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ import (
type RollingUpgradeReconciler struct {
client.Client
logr.Logger
Scheme *runtime.Scheme
AdmissionMap sync.Map
CacheConfig *cache.Config
EventWriter *kubeprovider.EventWriter
maxParallel int
ScriptRunner ScriptRunner
Auth *RollingUpgradeAuthenticator
DrainGroupMapper *sync.Map
DrainErrorMapper *sync.Map
ClusterNodesMap *sync.Map
ReconcileMap *sync.Map
Scheme *runtime.Scheme
AdmissionMap sync.Map
CacheConfig *cache.Config
EventWriter *kubeprovider.EventWriter
maxParallel int
ScriptRunner ScriptRunner
Auth *RollingUpgradeAuthenticator
DrainGroupMapper *sync.Map
DrainErrorMapper *sync.Map
ClusterNodesMap *sync.Map
ReconcileMap *sync.Map
DrainTimeout int
IgnoreDrainFailures bool
}

// RollingUpgradeAuthenticator has the clients for providers
Expand Down Expand Up @@ -166,6 +168,9 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
c.ClusterNodes = r.getClusterNodes()
return c
}(),

DrainTimeout: r.DrainTimeout,
IgnoreDrainFailures: r.IgnoreDrainFailures,
}

// process node rotation
Expand Down
36 changes: 28 additions & 8 deletions controllers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ type DrainManager struct {

type RollingUpgradeContext struct {
logr.Logger
ScriptRunner ScriptRunner
Auth *RollingUpgradeAuthenticator
Cloud *DiscoveredState
RollingUpgrade *v1alpha1.RollingUpgrade
DrainManager *DrainManager
metricsMutex *sync.Mutex
ScriptRunner ScriptRunner
Auth *RollingUpgradeAuthenticator
Cloud *DiscoveredState
RollingUpgrade *v1alpha1.RollingUpgrade
DrainManager *DrainManager
metricsMutex *sync.Mutex
DrainTimeout int
IgnoreDrainFailures bool
}

func (r *RollingUpgradeContext) RotateNodes() error {
Expand Down Expand Up @@ -249,6 +251,24 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
)
r.DrainManager.DrainGroup.Add(1)

// Determine IgnoreDrainFailure and DrainTimeout values. CR spec takes the precedence.
var (
drainTimeout int
ignoreDrainFailures bool
)
if r.RollingUpgrade.DrainTimeout() == nil {
drainTimeout = r.DrainTimeout
} else {
drainTimeout = *r.RollingUpgrade.DrainTimeout()
}

if r.RollingUpgrade.IsIgnoreDrainFailures() == nil {
ignoreDrainFailures = r.IgnoreDrainFailures
} else {
ignoreDrainFailures = *r.RollingUpgrade.IsIgnoreDrainFailures()
}

// Drain the nodes in parallel
go func() {
defer r.DrainManager.DrainGroup.Done()

Expand All @@ -267,9 +287,9 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
// Turns onto NodeRotationDrain
r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain)

if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil {
if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), drainTimeout, r.Auth.Kubernetes); err != nil {
// ignore drain failures if either of spec or controller args have set ignoreDrainFailures to true.
if !r.RollingUpgrade.IsIgnoreDrainFailures() {
if !ignoreDrainFailures {
r.DrainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error())
return
}
Expand Down
75 changes: 72 additions & 3 deletions controllers/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDrainNode(t *testing.T) {
err := rollupCtx.Auth.DrainNode(
test.Node,
time.Duration(rollupCtx.RollingUpgrade.PostDrainDelaySeconds()),
rollupCtx.RollingUpgrade.DrainTimeout(),
900,
rollupCtx.Auth.Kubernetes,
)
if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) {
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestRunCordonOrUncordon(t *testing.T) {
Out: os.Stdout,
ErrOut: os.Stdout,
DeleteEmptyDirData: true,
Timeout: time.Duration(rollupCtx.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second,
Timeout: 900,
}
err := drain.RunCordonOrUncordon(helper, test.Node, test.Cordon)
if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) {
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestRunDrainNode(t *testing.T) {
Out: os.Stdout,
ErrOut: os.Stdout,
DeleteEmptyDirData: true,
Timeout: time.Duration(rollupCtx.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second,
Timeout: 900,
}
err := drain.RunNodeDrain(helper, test.Node.Name)
if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) {
Expand Down Expand Up @@ -448,3 +448,72 @@ func TestSetBatchStandBy(t *testing.T) {
}
}
}

func TestIgnoreDrainFailuresAndDrainTimeout(t *testing.T) {
var tests = []struct {
TestDescription string
Reconciler *RollingUpgradeReconciler
RollingUpgrade *v1alpha1.RollingUpgrade
AsgClient *MockAutoscalingGroup
ClusterNodes []*corev1.Node
ExpectedStatusValue string
}{
{
"CR spec has IgnoreDrainFailures as nil, so default false should be considered",
createRollingUpgradeReconciler(t),
createRollingUpgrade(),
createASGClient(),
createNodeSlice(),
v1alpha1.StatusComplete,
},
{
"CR spec has IgnoreDrainFailures as true, so default false should not be considered",
createRollingUpgradeReconciler(t),
func() *v1alpha1.RollingUpgrade {
rollingUpgrade := createRollingUpgrade()
ignoreDrainFailuresValue := true
rollingUpgrade.Spec.IgnoreDrainFailures = &ignoreDrainFailuresValue
return rollingUpgrade
}(),
createASGClient(),
createNodeSlice(),
v1alpha1.StatusComplete,
},
{
"CR spec has DrainTimeout as nil, so default value of 900 should be considered",
createRollingUpgradeReconciler(t),
createRollingUpgrade(),
createASGClient(),
createNodeSlice(),
v1alpha1.StatusComplete,
},
{
"CR spec has DrainTimeout as 1800, so default value of 900 should not be considered",
createRollingUpgradeReconciler(t),
func() *v1alpha1.RollingUpgrade {
rollingUpgrade := createRollingUpgrade()
drainTimeoutValue := 1800
rollingUpgrade.Spec.Strategy.DrainTimeout = &drainTimeoutValue
return rollingUpgrade
}(),
createASGClient(),
createNodeSlice(),
v1alpha1.StatusComplete,
},
}
for _, test := range tests {
rollupCtx := createRollingUpgradeContext(test.Reconciler)
rollupCtx.RollingUpgrade = test.RollingUpgrade
rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups
rollupCtx.Cloud.ClusterNodes = test.ClusterNodes
rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient

err := rollupCtx.RotateNodes()
if err != nil {
t.Errorf("Test Description: %s \n error: %v", test.TestDescription, err)
}
if rollupCtx.RollingUpgrade.CurrentStatus() != test.ExpectedStatusValue {
t.Errorf("Test Description: %s \n expected value: %s, actual value: %s", test.TestDescription, test.ExpectedStatusValue, rollupCtx.RollingUpgrade.CurrentStatus())
}
}
}
14 changes: 10 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func main() {
maxAPIRetries int
debugMode bool
logMode string
drainTimeout int
ignoreDrainFailures bool
)

flag.BoolVar(&debugMode, "debug", false, "enable debug logging")
Expand All @@ -96,6 +98,8 @@ func main() {
flag.StringVar(&namespace, "namespace", "", "The namespace in which to watch objects")
flag.IntVar(&maxParallel, "max-parallel", 10, "The max number of parallel rolling upgrades")
flag.IntVar(&maxAPIRetries, "max-api-retries", 12, "The number of maximum retries for failed/rate limited AWS API calls")
flag.IntVar(&drainTimeout, "drain-timeout", 900, "when the drain command should timeout")
flag.BoolVar(&ignoreDrainFailures, "ignore-drain-failures", false, "proceed with instance termination despite drain failures.")

opts := zap.Options{
Development: true,
Expand Down Expand Up @@ -197,10 +201,12 @@ func main() {
ScriptRunner: controllers.ScriptRunner{
Logger: logger,
},
DrainGroupMapper: &sync.Map{},
DrainErrorMapper: &sync.Map{},
ClusterNodesMap: &sync.Map{},
ReconcileMap: &sync.Map{},
DrainGroupMapper: &sync.Map{},
DrainErrorMapper: &sync.Map{},
ClusterNodesMap: &sync.Map{},
ReconcileMap: &sync.Map{},
DrainTimeout: drainTimeout,
IgnoreDrainFailures: ignoreDrainFailures,
}

reconciler.SetMaxParallel(maxParallel)
Expand Down

0 comments on commit 995b81b

Please sign in to comment.