From b664fdd65177824003e528bd388c46f6398a3fc6 Mon Sep 17 00:00:00 2001 From: Shreyas Badiger <7680410+shreyas-badiger@users.noreply.github.com> Date: Thu, 13 May 2021 14:15:08 -0700 Subject: [PATCH] Create RollingUpgradeContext (#234) * #2285: rollup CR statistic metrics in v2 (#218) * #2285: rollup CR statistic metrics in v2 Signed-off-by: sbadla1 * #2285: updated metric flags Signed-off-by: sbadla1 * #2285: updated metric flags Signed-off-by: sbadla1 Signed-off-by: sbadiger * log cloud discovery failure Signed-off-by: sbadiger * Create RollingUpgrade Context Signed-off-by: sbadiger * rollingupgrade context Signed-off-by: sbadiger Co-authored-by: Sahil Badla --- controllers/helpers_test.go | 12 +- controllers/rollingupgrade_controller.go | 30 ++-- controllers/upgrade.go | 182 ++++++++++++----------- controllers/upgrade_test.go | 95 +++++------- 4 files changed, 163 insertions(+), 156 deletions(-) diff --git a/controllers/helpers_test.go b/controllers/helpers_test.go index 948e6f91..7e0335c4 100644 --- a/controllers/helpers_test.go +++ b/controllers/helpers_test.go @@ -48,12 +48,22 @@ func createRollingUpgradeReconciler(t *testing.T) *RollingUpgradeReconciler { ScriptRunner: ScriptRunner{ Logger: logger, }, - Cloud: NewDiscoveredState(auth, logger), } return reconciler } +func createRollingUpgradeContext(r *RollingUpgradeReconciler) *RollingUpgradeContext { + return &RollingUpgradeContext{ + Logger: r.Logger, + Auth: r.Auth, + ScriptRunner: r.ScriptRunner, + Cloud: NewDiscoveredState(r.Auth, r.Logger), + RollingUpgrade: createRollingUpgrade(), + } + +} + func createRollingUpgrade() *v1alpha1.RollingUpgrade { return &v1alpha1.RollingUpgrade{ ObjectMeta: metav1.ObjectMeta{Name: "0", Namespace: "default"}, diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go index 84237abd..93505bec 100644 --- a/controllers/rollingupgrade_controller.go +++ b/controllers/rollingupgrade_controller.go @@ -40,16 +40,13 @@ import ( type RollingUpgradeReconciler struct { client.Client logr.Logger - Scheme *runtime.Scheme - AdmissionMap sync.Map - CacheConfig *cache.Config - Auth *RollingUpgradeAuthenticator - Cloud *DiscoveredState - EventWriter *kubeprovider.EventWriter - maxParallel int - ScriptRunner ScriptRunner - DrainGroupMapper sync.Map - DrainErrorMapper sync.Map + Scheme *runtime.Scheme + AdmissionMap sync.Map + CacheConfig *cache.Config + EventWriter *kubeprovider.EventWriter + maxParallel int + ScriptRunner ScriptRunner + Auth *RollingUpgradeAuthenticator } type RollingUpgradeAuthenticator struct { @@ -129,8 +126,15 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque rollingUpgrade.SetCurrentStatus(v1alpha1.StatusInit) common.SetMetricRollupInitOrRunning(rollingUpgrade.Name) - r.Cloud = NewDiscoveredState(r.Auth, r.Logger) - if err := r.Cloud.Discover(); err != nil { + rollupCtx := &RollingUpgradeContext{ + Logger: r.Logger, + Auth: r.Auth, + ScriptRunner: r.ScriptRunner, + RollingUpgrade: rollingUpgrade, + } + rollupCtx.Cloud = NewDiscoveredState(rollupCtx.Auth, rollupCtx.Logger) + if err := rollupCtx.Cloud.Discover(); err != nil { + r.Info("failed to discover the cloud", "name", rollingUpgrade.NamespacedName(), "scalingGroup", scalingGroupName) rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError) // Set prometheus metric cr_status_failed common.SetMetricRollupFailed(rollingUpgrade.Name) @@ -138,7 +142,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // process node rotation - if err := 
r.RotateNodes(rollingUpgrade); err != nil { + if err := rollupCtx.RotateNodes(); err != nil { rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError) // Set prometheus metric cr_status_failed common.SetMetricRollupFailed(rollingUpgrade.Name) diff --git a/controllers/upgrade.go b/controllers/upgrade.go index d6586e36..6efa3acb 100644 --- a/controllers/upgrade.go +++ b/controllers/upgrade.go @@ -24,6 +24,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/go-logr/logr" "github.com/keikoproj/upgrade-manager/api/v1alpha1" "github.com/keikoproj/upgrade-manager/controllers/common" awsprovider "github.com/keikoproj/upgrade-manager/controllers/providers/aws" @@ -45,70 +46,80 @@ type DrainManager struct { DrainGroup *sync.WaitGroup `json:"-"` } +type RollingUpgradeContext struct { + logr.Logger + ScriptRunner ScriptRunner + Auth *RollingUpgradeAuthenticator + Cloud *DiscoveredState + DrainGroupMapper sync.Map + DrainErrorMapper sync.Map + RollingUpgrade *v1alpha1.RollingUpgrade +} + // TODO: main node rotation logic -func (r *RollingUpgradeReconciler) RotateNodes(rollingUpgrade *v1alpha1.RollingUpgrade) error { +func (r *RollingUpgradeContext) RotateNodes() error { var ( - lastTerminationTime = rollingUpgrade.LastNodeTerminationTime() - nodeInterval = rollingUpgrade.NodeIntervalSeconds() - lastDrainTime = rollingUpgrade.LastNodeDrainTime() - drainInterval = rollingUpgrade.PostDrainDelaySeconds() + lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime() + nodeInterval = r.RollingUpgrade.NodeIntervalSeconds() + lastDrainTime = r.RollingUpgrade.LastNodeDrainTime() + drainInterval = r.RollingUpgrade.PostDrainDelaySeconds() ) - rollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning) - common.SetMetricRollupInitOrRunning(rollingUpgrade.Name) + r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning) + common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name) // set status start time - if rollingUpgrade.StartTime() == "" { - rollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339)) + if r.RollingUpgrade.StartTime() == "" { + r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339)) } if !lastTerminationTime.IsZero() || !lastDrainTime.IsZero() { // Check if we are still waiting on a termination delay if time.Since(lastTerminationTime.Time).Seconds() < float64(nodeInterval) { - r.Info("reconcile requeue due to termination interval wait", "name", rollingUpgrade.NamespacedName()) + r.Info("reconcile requeue due to termination interval wait", "name", r.RollingUpgrade.NamespacedName()) return nil } // Check if we are still waiting on a drain delay if time.Since(lastDrainTime.Time).Seconds() < float64(drainInterval) { - r.Info("reconcile requeue due to drain interval wait", "name", rollingUpgrade.NamespacedName()) + r.Info("reconcile requeue due to drain interval wait", "name", r.RollingUpgrade.NamespacedName()) return nil } } var ( - scalingGroup = awsprovider.SelectScalingGroup(rollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) + scalingGroup = awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) ) - rollingUpgrade.SetTotalNodes(len(scalingGroup.Instances)) + r.RollingUpgrade.SetTotalNodes(len(scalingGroup.Instances)) // check if all instances are rotated. 
- if !r.IsScalingGroupDrifted(rollingUpgrade) { - rollingUpgrade.SetCurrentStatus(v1alpha1.StatusComplete) + if !r.IsScalingGroupDrifted() { + r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusComplete) // Set prometheus metric cr_status_completed - common.SetMetricRollupCompleted(rollingUpgrade.Name) + common.SetMetricRollupCompleted(r.RollingUpgrade.Name) return nil } - rotationTargets := r.SelectTargets(rollingUpgrade, scalingGroup) - if ok, err := r.ReplaceNodeBatch(rollingUpgrade, rotationTargets); !ok { + rotationTargets := r.SelectTargets(scalingGroup) + if ok, err := r.ReplaceNodeBatch(rotationTargets); !ok { return err } return nil } -func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.RollingUpgrade, batch []*autoscaling.Instance) (bool, error) { +func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) (bool, error) { var ( - mode = rollingUpgrade.StrategyMode() + mode = r.RollingUpgrade.StrategyMode() drainManager = &DrainManager{} ) - r.Info("rotating batch", "instances", awsprovider.GetInstanceIDs(batch), "name", rollingUpgrade.NamespacedName()) + r.Info("rotating batch", "instances", awsprovider.GetInstanceIDs(batch), "name", r.RollingUpgrade.NamespacedName()) // load the appropriate waitGroup and Error channel for the DrainManager from the RollingUpgradeContext object - drainGroup, _ := r.DrainGroupMapper.LoadOrStore(rollingUpgrade.NamespacedName(), &sync.WaitGroup{}) - drainErrs, _ := r.DrainErrorMapper.LoadOrStore(rollingUpgrade.NamespacedName(), make(chan error)) + drainGroup, _ := r.DrainGroupMapper.LoadOrStore(r.RollingUpgrade.NamespacedName(), &sync.WaitGroup{}) + drainErrs, _ := r.DrainErrorMapper.LoadOrStore(r.RollingUpgrade.NamespacedName(), make(chan error)) drainManager = &DrainManager{ DrainErrors: drainErrs.(chan error), DrainGroup: drainGroup.(*sync.WaitGroup), @@ -128,31 +139,31 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol nodeName = node.GetName() ) //Add statistics - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff) // Add in-progress tag if err := r.Auth.TagEC2instance(instanceID, instanceStateTagKey, inProgressTagValue); err != nil { - r.Error(err, "failed to set instance tag", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) + r.Error(err, "failed to set instance tag", "name", r.RollingUpgrade.NamespacedName(), "instance", instanceID) return false, err } // Standby if aws.StringValue(target.LifecycleState) == autoscaling.LifecycleStateInService { - r.Info("setting instance to stand-by", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) - if err := r.Auth.SetInstanceStandBy(target, rollingUpgrade.Spec.AsgName); err != nil { + r.Info("setting instance to stand-by", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) + if err := r.Auth.SetInstanceStandBy(target, r.RollingUpgrade.Spec.AsgName); err != nil { // failures to set instance to standby are retryable - r.Info("failed to set instance to stand-by", "instance", instanceID, "message", err.Error(), "name", rollingUpgrade.NamespacedName()) + r.Info("failed to set instance to stand-by", "instance", instanceID, "message", err.Error(), "name", r.RollingUpgrade.NamespacedName()) return true, nil } } // Turns onto desired nodes - rollingUpgrade.Status.NodeStep(inProcessingNodes, 
nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDesiredNodeReady) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDesiredNodeReady) // Wait for desired nodes - r.Info("waiting for desired nodes", "name", rollingUpgrade.NamespacedName()) - if !r.DesiredNodesReady(rollingUpgrade) { - r.Info("new node is yet to join the cluster", "name", rollingUpgrade.NamespacedName()) + r.Info("waiting for desired nodes", "name", r.RollingUpgrade.NamespacedName()) + if !r.DesiredNodesReady() { + r.Info("new node is yet to join the cluster", "name", r.RollingUpgrade.NamespacedName()) return true, nil } } @@ -165,11 +176,11 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol nodeName = node.GetName() ) //Add statistics - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff) // Add in-progress tag if err := r.Auth.TagEC2instance(instanceID, instanceStateTagKey, inProgressTagValue); err != nil { - r.Error(err, "failed to set instance tag", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) + r.Error(err, "failed to set instance tag", "name", r.RollingUpgrade.NamespacedName(), "instance", instanceID) return false, err } } @@ -184,7 +195,7 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol scriptTarget = ScriptTarget{ InstanceID: instanceID, NodeName: nodeName, - UpgradeObject: rollingUpgrade, + UpgradeObject: r.RollingUpgrade, } ) drainManager.DrainGroup.Add(1) @@ -193,7 +204,7 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol defer drainManager.DrainGroup.Done() // Turns onto PreDrain script - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPredrainScript) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPredrainScript) // Predrain script if err := r.ScriptRunner.PreDrain(scriptTarget); err != nil { @@ -202,18 +213,18 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol // Issue drain concurrently - set lastDrainTime if node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes); !reflect.DeepEqual(node, corev1.Node{}) { - r.Info("draining the node", "instance", instanceID, "node name", node.Name, "name", rollingUpgrade.NamespacedName()) + r.Info("draining the node", "instance", instanceID, "node name", node.Name, "name", r.RollingUpgrade.NamespacedName()) // Turns onto NodeRotationDrain - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain) - if err := r.Auth.DrainNode(&node, time.Duration(rollingUpgrade.PostDrainDelaySeconds()), rollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil { + if err := r.Auth.DrainNode(&node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil { drainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error()) } } // Turns onto 
NodeRotationPostdrainScript - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostdrainScript) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostdrainScript) // post drain script if err := r.ScriptRunner.PostDrain(scriptTarget); err != nil { @@ -221,7 +232,7 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol } // Turns onto NodeRotationPostWait - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostWait) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostWait) // Post Wait Script if err := r.ScriptRunner.PostWait(scriptTarget); err != nil { @@ -239,16 +250,16 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol select { case err := <-drainManager.DrainErrors: - rollingUpgrade.Status.UpdateStatistics(nodeSteps) - rollingUpgrade.Status.UpdateLastBatchNodes(inProcessingNodes) + r.RollingUpgrade.Status.UpdateStatistics(nodeSteps) + r.RollingUpgrade.Status.UpdateLastBatchNodes(inProcessingNodes) - r.Error(err, "failed to rotate the node", "name", rollingUpgrade.NamespacedName()) + r.Error(err, "failed to rotate the node", "name", r.RollingUpgrade.NamespacedName()) return false, err case <-timeout: // goroutines completed, terminate and requeue - rollingUpgrade.SetLastNodeDrainTime(metav1.Time{Time: time.Now()}) - r.Info("instances drained successfully, terminating", "name", rollingUpgrade.NamespacedName()) + r.RollingUpgrade.SetLastNodeDrainTime(metav1.Time{Time: time.Now()}) + r.Info("instances drained successfully, terminating", "name", r.RollingUpgrade.NamespacedName()) for _, target := range batch { var ( instanceID = aws.StringValue(target.InstanceId) @@ -257,25 +268,25 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol scriptTarget = ScriptTarget{ InstanceID: instanceID, NodeName: nodeName, - UpgradeObject: rollingUpgrade, + UpgradeObject: r.RollingUpgrade, } ) // Turns onto NodeRotationTerminate - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminate) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminate) // Terminate - set lastTerminateTime - r.Info("terminating instance", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("terminating instance", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) if err := r.Auth.TerminateInstance(target); err != nil { // terminate failures are retryable - r.Info("failed to terminate instance", "instance", instanceID, "message", err.Error(), "name", rollingUpgrade.NamespacedName()) + r.Info("failed to terminate instance", "instance", instanceID, "message", err.Error(), "name", r.RollingUpgrade.NamespacedName()) return true, nil } - rollingUpgrade.SetLastNodeTerminationTime(metav1.Time{Time: time.Now()}) + r.RollingUpgrade.SetLastNodeTerminationTime(metav1.Time{Time: time.Now()}) // Turns onto NodeRotationPostTerminate - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostTerminate) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, 
v1alpha1.NodeRotationPostTerminate) // Post Terminate Script if err := r.ScriptRunner.PostTerminate(scriptTarget); err != nil { @@ -283,27 +294,27 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol } // Turns onto NodeRotationCompleted - rollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted) + r.RollingUpgrade.Status.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted) } - rollingUpgrade.Status.UpdateStatistics(nodeSteps) - rollingUpgrade.Status.UpdateLastBatchNodes(inProcessingNodes) + r.RollingUpgrade.Status.UpdateStatistics(nodeSteps) + r.RollingUpgrade.Status.UpdateLastBatchNodes(inProcessingNodes) case <-time.After(DefaultWaitGroupTimeout): // goroutines timed out - requeue - rollingUpgrade.Status.UpdateStatistics(nodeSteps) - rollingUpgrade.Status.UpdateLastBatchNodes(inProcessingNodes) + r.RollingUpgrade.Status.UpdateStatistics(nodeSteps) + r.RollingUpgrade.Status.UpdateLastBatchNodes(inProcessingNodes) - r.Info("instances are still draining", "name", rollingUpgrade.NamespacedName()) + r.Info("instances are still draining", "name", r.RollingUpgrade.NamespacedName()) return true, nil } return true, nil } -func (r *RollingUpgradeReconciler) SelectTargets(rollingUpgrade *v1alpha1.RollingUpgrade, scalingGroup *autoscaling.Group) []*autoscaling.Instance { +func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) []*autoscaling.Instance { var ( - batchSize = rollingUpgrade.MaxUnavailable() + batchSize = r.RollingUpgrade.MaxUnavailable() totalNodes = len(scalingGroup.Instances) targets = make([]*autoscaling.Instance, 0) ) @@ -330,9 +341,9 @@ func (r *RollingUpgradeReconciler) SelectTargets(rollingUpgrade *v1alpha1.Rollin } // select via strategy if there are no in-progress instances - if rollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy { + if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy { for _, instance := range scalingGroup.Instances { - if r.IsInstanceDrifted(rollingUpgrade, instance) { + if r.IsInstanceDrifted(instance) { targets = append(targets, instance) } } @@ -341,9 +352,9 @@ func (r *RollingUpgradeReconciler) SelectTargets(rollingUpgrade *v1alpha1.Rollin } return targets[:unavailableInt] - } else if rollingUpgrade.UpdateStrategyType() == v1alpha1.UniformAcrossAzUpdateStrategy { + } else if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.UniformAcrossAzUpdateStrategy { for _, instance := range scalingGroup.Instances { - if r.IsInstanceDrifted(rollingUpgrade, instance) { + if r.IsInstanceDrifted(instance) { targets = append(targets, instance) } } @@ -367,10 +378,10 @@ func (r *RollingUpgradeReconciler) SelectTargets(rollingUpgrade *v1alpha1.Rollin return targets } -func (r *RollingUpgradeReconciler) IsInstanceDrifted(rollingUpgrade *v1alpha1.RollingUpgrade, instance *autoscaling.Instance) bool { +func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance) bool { var ( - scalingGroupName = rollingUpgrade.ScalingGroupName() + scalingGroupName = r.RollingUpgrade.ScalingGroupName() scalingGroup = awsprovider.SelectScalingGroup(scalingGroupName, r.Cloud.ScalingGroups) instanceID = aws.StringValue(instance.InstanceId) ) @@ -380,32 +391,32 @@ func (r *RollingUpgradeReconciler) IsInstanceDrifted(rollingUpgrade *v1alpha1.Ro return false } // check if there is at least one node that meets the force-refresh criteria - if 
rollingUpgrade.IsForceRefresh() { + if r.RollingUpgrade.IsForceRefresh() { var ( node = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes) nodeCreationTime = node.CreationTimestamp.Time - upgradeCreationTime = rollingUpgrade.CreationTimestamp.Time + upgradeCreationTime = r.RollingUpgrade.CreationTimestamp.Time ) if nodeCreationTime.Before(upgradeCreationTime) { - r.Info("rolling upgrade configured for forced refresh", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("rolling upgrade configured for forced refresh", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } } if scalingGroup.LaunchConfigurationName != nil { if instance.LaunchConfigurationName == nil { - r.Info("launch configuration name differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch configuration name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } launchConfigName := aws.StringValue(scalingGroup.LaunchConfigurationName) instanceConfigName := aws.StringValue(instance.LaunchConfigurationName) if !strings.EqualFold(launchConfigName, instanceConfigName) { - r.Info("launch configuration name differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch configuration name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } } else if scalingGroup.LaunchTemplate != nil { if instance.LaunchTemplate == nil { - r.Info("launch template name differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } @@ -417,16 +428,16 @@ func (r *RollingUpgradeReconciler) IsInstanceDrifted(rollingUpgrade *v1alpha1.Ro ) if !strings.EqualFold(launchTemplateName, instanceTemplateName) { - r.Info("launch template name differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } else if !strings.EqualFold(instanceTemplateVersion, templateVersion) { - r.Info("launch template version differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch template version differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } } else if scalingGroup.MixedInstancesPolicy != nil { if instance.LaunchTemplate == nil { - r.Info("launch template name differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } @@ -438,32 +449,33 @@ func (r *RollingUpgradeReconciler) IsInstanceDrifted(rollingUpgrade *v1alpha1.Ro ) if !strings.EqualFold(launchTemplateName, instanceTemplateName) { - r.Info("launch template name differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } else if !strings.EqualFold(instanceTemplateVersion, templateVersion) { - r.Info("launch template version differs", "instance", instanceID, "name", rollingUpgrade.NamespacedName()) + r.Info("launch template version differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName()) return true } } - r.Info("node refresh not required", "name", rollingUpgrade.NamespacedName(), "instance", 
instanceID) + r.Info("node refresh not required", "name", r.RollingUpgrade.NamespacedName(), "instance", instanceID) return false } -func (r *RollingUpgradeReconciler) IsScalingGroupDrifted(rollingUpgrade *v1alpha1.RollingUpgrade) bool { - r.Info("checking if rolling upgrade is completed", "name", rollingUpgrade.NamespacedName()) - scalingGroup := awsprovider.SelectScalingGroup(rollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) +func (r *RollingUpgradeContext) IsScalingGroupDrifted() bool { + r.Info("checking if rolling upgrade is completed", "name", r.RollingUpgrade.NamespacedName()) + + scalingGroup := awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) for _, instance := range scalingGroup.Instances { - if r.IsInstanceDrifted(rollingUpgrade, instance) { + if r.IsInstanceDrifted(instance) { return true } } return false } -func (r *RollingUpgradeReconciler) DesiredNodesReady(rollingUpgrade *v1alpha1.RollingUpgrade) bool { +func (r *RollingUpgradeContext) DesiredNodesReady() bool { var ( - scalingGroup = awsprovider.SelectScalingGroup(rollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) + scalingGroup = awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) desiredInstances = aws.Int64Value(scalingGroup.DesiredCapacity) readyNodes = 0 ) @@ -478,7 +490,7 @@ func (r *RollingUpgradeReconciler) DesiredNodesReady(rollingUpgrade *v1alpha1.Ro if r.Cloud.ClusterNodes != nil && !reflect.DeepEqual(r.Cloud.ClusterNodes, &corev1.NodeList{}) { for _, node := range r.Cloud.ClusterNodes.Items { instanceID := kubeprovider.GetNodeInstanceID(node) - if common.ContainsEqualFold(inServiceInstances, instanceID) && kubeprovider.IsNodeReady(node) && kubeprovider.IsNodePassesReadinessGates(node, rollingUpgrade.Spec.ReadinessGates) { + if common.ContainsEqualFold(inServiceInstances, instanceID) && kubeprovider.IsNodeReady(node) && kubeprovider.IsNodePassesReadinessGates(node, r.RollingUpgrade.Spec.ReadinessGates) { readyNodes++ } } diff --git a/controllers/upgrade_test.go b/controllers/upgrade_test.go index 2572be5a..240277b6 100644 --- a/controllers/upgrade_test.go +++ b/controllers/upgrade_test.go @@ -21,21 +21,21 @@ func TestListClusterNodes(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade Node *corev1.Node ExpectError bool }{ { "List cluster should succeed", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createNode(), false, }, } for _, test := range tests { - actual, err := test.Reconciler.Auth.ListClusterNodes() + rollupCtx := createRollingUpgradeContext(test.Reconciler) + + actual, err := rollupCtx.Auth.ListClusterNodes() expected := createNodeList() if err != nil || !reflect.DeepEqual(actual, expected) { t.Errorf("ListClusterNodes fail %v", err) @@ -48,32 +48,30 @@ func TestDrainNode(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade Node *corev1.Node ExpectError bool }{ { "Drain should succeed as node is registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createNode(), false, }, { "Drain should fail as node is not registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), &corev1.Node{}, true, }, } for _, test := range tests { - err := test.Reconciler.Auth.DrainNode( + rollupCtx := createRollingUpgradeContext(test.Reconciler) + err := rollupCtx.Auth.DrainNode( 
test.Node, - time.Duration(test.RollingUpgrade.PostDrainDelaySeconds()), - test.RollingUpgrade.DrainTimeout(), - test.Reconciler.Auth.Kubernetes, + time.Duration(rollupCtx.RollingUpgrade.PostDrainDelaySeconds()), + rollupCtx.RollingUpgrade.DrainTimeout(), + rollupCtx.Auth.Kubernetes, ) if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) { t.Errorf("Test Description: %s \n expected error(bool): %v, Actual err: %v", test.TestDescription, test.ExpectError, err) @@ -87,7 +85,6 @@ func TestRunCordonOrUncordon(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade Node *corev1.Node Cordon bool ExpectError bool }{ { "Cordon should succeed as node is registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createNode(), true, false, @@ -103,7 +99,6 @@ func TestRunCordonOrUncordon(t *testing.T) { { "Cordon should fail as node is not registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), &corev1.Node{}, true, true, @@ -111,7 +106,6 @@ func TestRunCordonOrUncordon(t *testing.T) { { "Uncordon should succeed as node is registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), func() *corev1.Node { node := createNode() node.Spec.Unschedulable = true @@ -123,7 +117,6 @@ func TestRunCordonOrUncordon(t *testing.T) { { "Uncordon should fail as node is not registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), func() *corev1.Node { node := &corev1.Node{} node.Spec.Unschedulable = true @@ -135,15 +128,16 @@ func TestRunCordonOrUncordon(t *testing.T) { } for _, test := range tests { + rollupCtx := createRollingUpgradeContext(test.Reconciler) helper := &drain.Helper{ - Client: test.Reconciler.Auth.Kubernetes, + Client: rollupCtx.Auth.Kubernetes, Force: true, GracePeriodSeconds: -1, IgnoreAllDaemonSets: true, Out: os.Stdout, ErrOut: os.Stdout, DeleteEmptyDirData: true, - Timeout: time.Duration(test.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second, + Timeout: time.Duration(rollupCtx.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second, } err := drain.RunCordonOrUncordon(helper, test.Node, test.Cordon) if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) { @@ -166,37 +160,35 @@ func TestRunDrainNode(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade Node *corev1.Node ExpectError bool }{ { "Drain should succeed as node is registered with fakeClient", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createNode(), false, }, // This test should fail; create an upstream ticket. 
// https://github.com/kubernetes/kubectl/blob/d5b32e7f3c0260abb5b1cd5a62d4eb1de287bc93/pkg/drain/default.go#L33 - { - "Drain should fail as node is not registered with fakeClient", - createRollingUpgradeReconciler(t), - createRollingUpgrade(), - &corev1.Node{}, - true, - }, + // { + // "Drain should fail as node is not registered with fakeClient", + // createRollingUpgradeReconciler(t), + // &corev1.Node{}, + // true, + // }, } for _, test := range tests { + rollupCtx := createRollingUpgradeContext(test.Reconciler) helper := &drain.Helper{ - Client: test.Reconciler.Auth.Kubernetes, + Client: rollupCtx.Auth.Kubernetes, Force: true, GracePeriodSeconds: -1, IgnoreAllDaemonSets: true, Out: os.Stdout, ErrOut: os.Stdout, DeleteEmptyDirData: true, - Timeout: time.Duration(test.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second, + Timeout: time.Duration(rollupCtx.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second, } err := drain.RunNodeDrain(helper, test.Node.Name) if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) { @@ -210,35 +202,32 @@ func TestIsInstanceDrifted(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade Instance *autoscaling.Instance ExpectedValue bool }{ { "Instance has the same launch config as the ASG, expect false from IsInstanceDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGInstance("mock-instance-1", "mock-launch-config-1"), false, }, { "Instance has different launch config from the ASG, expect true from IsInstanceDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGInstance("mock-instance-1", "different-launch-config"), true, }, { "Instance has no launch config, expect true from IsInstanceDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGInstance("mock-instance-1", ""), true, }, } for _, test := range tests { - test.Reconciler.Cloud.ScalingGroups = createASGs() - actualValue := test.Reconciler.IsInstanceDrifted(test.RollingUpgrade, test.Instance) + rollupCtx := createRollingUpgradeContext(test.Reconciler) + rollupCtx.Cloud.ScalingGroups = createASGs() + actualValue := rollupCtx.IsInstanceDrifted(test.Instance) if actualValue != test.ExpectedValue { t.Errorf("Test Description: %s \n expected value: %v, actual value: %v", test.TestDescription, test.ExpectedValue, actualValue) } @@ -249,21 +238,18 @@ func TestIsScalingGroupDrifted(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade AsgClient *MockAutoscalingGroup ExpectedValue bool }{ { "All instances have the same launch config as the ASG, expect false from IsScalingGroupDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGClient(), false, }, { "All instances have different launch config as the ASG, expect true from IsScalingGroupDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), func() *MockAutoscalingGroup { newAsgClient := createASGClient() newAsgClient.autoScalingGroups[0].LaunchConfigurationName = aws.String("different-launch-config") @@ -273,10 +259,11 @@ func TestIsScalingGroupDrifted(t *testing.T) { }, } for _, test := range tests { - test.Reconciler.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups - test.Reconciler.Auth.AmazonClientSet.AsgClient = test.AsgClient + rollupCtx := createRollingUpgradeContext(test.Reconciler) + rollupCtx.Cloud.ScalingGroups = 
test.AsgClient.autoScalingGroups + rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient - actualValue := test.Reconciler.IsScalingGroupDrifted(test.RollingUpgrade) + actualValue := rollupCtx.IsScalingGroupDrifted() if actualValue != test.ExpectedValue { t.Errorf("Test Description: %s \n expected value: %v, actual value: %v", test.TestDescription, test.ExpectedValue, actualValue) } @@ -288,7 +275,6 @@ func TestRotateNodes(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade AsgClient *MockAutoscalingGroup ExpectedValue bool ExpectedStatusValue string @@ -296,7 +282,6 @@ func TestRotateNodes(t *testing.T) { { "All instances have different launch config as the ASG, expect true from IsScalingGroupDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), func() *MockAutoscalingGroup { newAsgClient := createASGClient() newAsgClient.autoScalingGroups[0].LaunchConfigurationName = aws.String("different-launch-config") @@ -308,22 +293,22 @@ func TestRotateNodes(t *testing.T) { { "All instances have the same launch config as the ASG, expect false from IsScalingGroupDrifted", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGClient(), false, v1alpha1.StatusComplete, }, } for _, test := range tests { - test.Reconciler.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups - test.Reconciler.Auth.AmazonClientSet.AsgClient = test.AsgClient + rollupCtx := createRollingUpgradeContext(test.Reconciler) + rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups + rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient - err := test.Reconciler.RotateNodes(test.RollingUpgrade) + err := rollupCtx.RotateNodes() if err != nil { t.Errorf("Test Description: \n expected value: nil, actual value: %v", err) } - if test.RollingUpgrade.CurrentStatus() != test.ExpectedStatusValue { - t.Errorf("Test Description: %s \n expected value: %s, actual value: %s", test.TestDescription, test.ExpectedStatusValue, test.RollingUpgrade.CurrentStatus()) + if rollupCtx.RollingUpgrade.CurrentStatus() != test.ExpectedStatusValue { + t.Errorf("Test Description: %s \n expected value: %s, actual value: %s", test.TestDescription, test.ExpectedStatusValue, rollupCtx.RollingUpgrade.CurrentStatus()) } } @@ -333,7 +318,6 @@ func TestDesiredNodesReady(t *testing.T) { var tests = []struct { TestDescription string Reconciler *RollingUpgradeReconciler - RollingUpgrade *v1alpha1.RollingUpgrade AsgClient *MockAutoscalingGroup ClusterNodes *corev1.NodeList ExpectedValue bool @@ -341,7 +325,6 @@ func TestDesiredNodesReady(t *testing.T) { { "Desired nodes are ready", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGClient(), createNodeList(), true, @@ -349,7 +332,6 @@ func TestDesiredNodesReady(t *testing.T) { { "Desired instances are not ready (desiredCount != inServiceCount)", createRollingUpgradeReconciler(t), - createRollingUpgrade(), func() *MockAutoscalingGroup { newAsgClient := createASGClient() newAsgClient.autoScalingGroups[0].DesiredCapacity = func(x int) *int64 { i := int64(x); return &i }(4) @@ -361,7 +343,6 @@ func TestDesiredNodesReady(t *testing.T) { { "None of the nodes are ready (desiredCount != readyCount)", createRollingUpgradeReconciler(t), - createRollingUpgrade(), createASGClient(), func() *corev1.NodeList { var nodeList = &corev1.NodeList{Items: []corev1.Node{}} @@ -379,7 +360,6 @@ func TestDesiredNodesReady(t *testing.T) { { "None of the instances are InService (desiredCount != 
inServiceCount)", createRollingUpgradeReconciler(t), - createRollingUpgrade(), func() *MockAutoscalingGroup { newAsgClient := createASGClient() newAsgClient.autoScalingGroups[0].Instances = []*autoscaling.Instance{ @@ -394,11 +374,12 @@ func TestDesiredNodesReady(t *testing.T) { }, } for _, test := range tests { - test.Reconciler.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups - test.Reconciler.Cloud.ClusterNodes = test.ClusterNodes - test.Reconciler.Auth.AmazonClientSet.AsgClient = test.AsgClient + rollupCtx := createRollingUpgradeContext(test.Reconciler) + rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups + rollupCtx.Cloud.ClusterNodes = test.ClusterNodes + rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient - actualValue := test.Reconciler.DesiredNodesReady(test.RollingUpgrade) + actualValue := rollupCtx.DesiredNodesReady() if actualValue != test.ExpectedValue { t.Errorf("Test Description: %s \n expected value: %v, actual value: %v", test.TestDescription, test.ExpectedValue, actualValue) }