Skip to content

Commit

Permalink
Merge pull request kubernetes#17153 from justinsb/add_context_to_roll…
Browse files Browse the repository at this point in the history
…ing_update

chore: add context to rolling update functions
  • Loading branch information
k8s-ci-robot authored Dec 28, 2024
2 parents df6232e + ebcfebe commit cba6360
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 76 deletions.
3 changes: 1 addition & 2 deletions cmd/kops/delete_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
d := &instancegroups.RollingUpdateCluster{
Clientset: clientSet,
Cluster: cluster,
Ctx: ctx,
MasterInterval: 0,
NodeInterval: 0,
BastionInterval: 0,
Expand Down Expand Up @@ -248,7 +247,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
}
d.ClusterValidator = clusterValidator

return d.UpdateSingleInstance(cloudMember, options.Surge)
return d.UpdateSingleInstance(ctx, cloudMember, options.Surge)
}

func getNodes(ctx context.Context, cluster *kopsapi.Cluster, verbose bool) (kubernetes.Interface, string, []v1.Node, error) {
Expand Down
3 changes: 1 addition & 2 deletions cmd/kops/rolling-update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer

d := &instancegroups.RollingUpdateCluster{
Clientset: clientset,
Ctx: ctx,
Cluster: cluster,
MasterInterval: options.ControlPlaneInterval,
NodeInterval: options.NodeInterval,
Expand Down Expand Up @@ -456,7 +455,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
}
d.ClusterValidator = clusterValidator

return d.RollingUpdate(groups, list)
return d.RollingUpdate(ctx, groups, list)
}

func completeInstanceGroup(f commandutils.Factory, selectedInstanceGroups *[]string, selectedInstanceGroupRoles *[]string) func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
Expand Down
44 changes: 22 additions & 22 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func promptInteractive(upgradedHostID, upgradedHostName string) (stopPrompting b
}

// RollingUpdate performs a rolling update on a list of instances.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
isBastion := group.InstanceGroup.IsBastion()
// Do not need a k8s client if you are doing cloudonly.
if c.K8sClient == nil && !c.CloudOnly {
Expand All @@ -123,7 +123,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.
}

if !c.CloudOnly {
err = c.taintAllNeedUpdate(group, update)
err = c.taintAllNeedUpdate(ctx, group, update)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.

for uIdx, u := range update {
go func(m *cloudinstances.CloudInstance) {
terminateChan <- c.drainTerminateAndWait(m, sleepAfterTerminate)
terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
}(u)
runningDrains++

Expand Down Expand Up @@ -319,7 +319,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
return err
}

func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
var toTaint []*corev1.Node
for _, u := range update {
if u.Node != nil && !u.Node.Spec.Unschedulable {
Expand All @@ -341,7 +341,7 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudIns
}
klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
for _, n := range toTaint {
if err := c.patchTaint(n); err != nil {
if err := c.patchTaint(ctx, n); err != nil {
if c.FailOnDrainError {
return fmt.Errorf("failed to taint node %q: %v", n, err)
}
Expand All @@ -352,7 +352,7 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudIns
return nil
}

func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
Expand All @@ -373,14 +373,14 @@ func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
return err
}

_, err = c.K8sClient.CoreV1().Nodes().Patch(c.Ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}

func (c *RollingUpdateCluster) patchExcludeFromLB(node *corev1.Node) error {
func (c *RollingUpdateCluster) patchExcludeFromLB(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
Expand All @@ -405,14 +405,14 @@ func (c *RollingUpdateCluster) patchExcludeFromLB(node *corev1.Node) error {
return err
}

_, err = c.K8sClient.CoreV1().Nodes().Patch(c.Ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}

func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
instanceID := u.ID

nodeName := ""
Expand All @@ -430,7 +430,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
if u.Node != nil {
klog.Infof("Draining the node: %q.", nodeName)

if err := c.drainNode(u); err != nil {
if err := c.drainNode(ctx, u); err != nil {
if c.FailOnDrainError {
return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
}
Expand All @@ -449,7 +449,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
} else {
klog.Infof("deleting node %q from kubernetes", nodeName)
if err := c.deleteNode(u.Node); err != nil {
if err := c.deleteNode(ctx, u.Node); err != nil {
return fmt.Errorf("error deleting node %q: %v", nodeName, err)
}
}
Expand All @@ -460,7 +460,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
return err
}

if err := c.reconcileInstanceGroup(); err != nil {
if err := c.reconcileInstanceGroup(ctx); err != nil {
klog.Errorf("error reconciling instance group %q: %v", u.CloudInstanceGroup.HumanName, err)
return err
}
Expand All @@ -472,7 +472,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
return nil
}

func (c *RollingUpdateCluster) reconcileInstanceGroup() error {
func (c *RollingUpdateCluster) reconcileInstanceGroup(ctx context.Context) error {
if c.Cluster.GetCloudProvider() != api.CloudProviderOpenstack &&
c.Cluster.GetCloudProvider() != api.CloudProviderHetzner &&
c.Cluster.GetCloudProvider() != api.CloudProviderScaleway &&
Expand All @@ -497,7 +497,7 @@ func (c *RollingUpdateCluster) reconcileInstanceGroup() error {
DeletionProcessing: fi.DeletionProcessingModeDeleteIfNotDeferrred,
}

_, err := applyCmd.Run(c.Ctx)
_, err := applyCmd.Run(ctx)
return err
}

Expand Down Expand Up @@ -645,7 +645,7 @@ func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) e
}

// drainNode drains a K8s node.
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error {
func (c *RollingUpdateCluster) drainNode(ctx context.Context, u *cloudinstances.CloudInstance) error {
if c.K8sClient == nil {
return fmt.Errorf("K8sClient not set")
}
Expand All @@ -659,7 +659,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
}

helper := &drain.Helper{
Ctx: c.Ctx,
Ctx: ctx,
Client: c.K8sClient,
Force: true,
GracePeriodSeconds: -1,
Expand All @@ -679,7 +679,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
return fmt.Errorf("error cordoning node: %v", err)
}

if err := c.patchExcludeFromLB(u.Node); err != nil {
if err := c.patchExcludeFromLB(ctx, u.Node); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
Expand Down Expand Up @@ -720,9 +720,9 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
}

// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
var options metav1.DeleteOptions
err := c.K8sClient.CoreV1().Nodes().Delete(c.Ctx, node.Name, options)
err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand All @@ -735,7 +735,7 @@ func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
}

// UpdateSingleInstance performs a rolling update on a single instance
func (c *RollingUpdateCluster) UpdateSingleInstance(cloudMember *cloudinstances.CloudInstance, detach bool) error {
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
if detach {
if cloudMember.CloudInstanceGroup.InstanceGroup.IsControlPlane() {
klog.Warning("cannot detach control-plane instances. Assuming --surge=false")
Expand All @@ -750,5 +750,5 @@ func (c *RollingUpdateCluster) UpdateSingleInstance(cloudMember *cloudinstances.
}
}

return c.drainTerminateAndWait(cloudMember, 0)
return c.drainTerminateAndWait(ctx, cloudMember, 0)
}
2 changes: 1 addition & 1 deletion pkg/instancegroups/instancegroups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestWarmPoolOnlyRoll(t *testing.T) {
instance.State = cloudinstances.WarmPool

{
err := c.rollingUpdateInstanceGroup(group, 0*time.Second)
err := c.rollingUpdateInstanceGroup(ctx, group, 0*time.Second)
if err != nil {
t.Fatalf("could not roll instance group: %v", err)
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/instancegroups/rollingupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
// RollingUpdateCluster is a struct containing cluster information for a rolling update.
type RollingUpdateCluster struct {
Clientset simple.Clientset
Ctx context.Context
Cluster *api.Cluster
Cloud fi.Cloud

Expand Down Expand Up @@ -106,7 +105,7 @@ func (*RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstances.
}

// RollingUpdate performs a rolling update on a K8s Cluster.
func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[string]*cloudinstances.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
if len(groups) == 0 {
klog.Info("Cloud Instance Group length is zero. Not doing a rolling-update.")
return nil
Expand Down Expand Up @@ -147,7 +146,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C

defer wg.Done()

err := c.rollingUpdateInstanceGroup(bastionGroups[k], c.BastionInterval)
err := c.rollingUpdateInstanceGroup(ctx, bastionGroups[k], c.BastionInterval)

resultsMutex.Lock()
results[k] = err
Expand All @@ -172,7 +171,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
// and we don't want to roll all the control-plane nodes at the same time. See issue #284

for _, k := range sortGroups(masterGroups) {
err := c.rollingUpdateInstanceGroup(masterGroups[k], c.MasterInterval)
err := c.rollingUpdateInstanceGroup(ctx, masterGroups[k], c.MasterInterval)
// Do not continue update if control-plane node(s) failed; cluster is potentially in an unhealthy state.
if err != nil {
return fmt.Errorf("control-plane node not healthy after update, stopping rolling-update: %q", err)
Expand All @@ -187,7 +186,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
}

for _, k := range sortGroups(apiServerGroups) {
err := c.rollingUpdateInstanceGroup(apiServerGroups[k], c.NodeInterval)
err := c.rollingUpdateInstanceGroup(ctx, apiServerGroups[k], c.NodeInterval)
results[k] = err
if err != nil {
klog.Errorf("failed to roll InstanceGroup %q: %v", k, err)
Expand All @@ -212,7 +211,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
}

for _, k := range sortGroups(nodeGroups) {
err := c.rollingUpdateInstanceGroup(nodeGroups[k], c.NodeInterval)
err := c.rollingUpdateInstanceGroup(ctx, nodeGroups[k], c.NodeInterval)
results[k] = err
if err != nil {
klog.Errorf("failed to roll InstanceGroup %q: %v", k, err)
Expand Down
7 changes: 4 additions & 3 deletions pkg/instancegroups/rollingupdate_os_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func getTestSetupOS(t *testing.T, ctx context.Context) (*RollingUpdateCluster, *
ValidateTickDuration: 1 * time.Millisecond,
ValidateSuccessDuration: 5 * time.Millisecond,
ValidateCount: 2,
Ctx: ctx,
Cluster: cluster,
Clientset: clientset,
}
Expand All @@ -106,7 +105,7 @@ func TestRollingUpdateDisabledSurgeOS(t *testing.T) {
c, cloud := getTestSetupOS(t, ctx)

groups, igList := getGroupsAllNeedUpdateOS(t, c)
err := c.RollingUpdate(groups, igList)
err := c.RollingUpdate(ctx, groups, igList)
assert.NoError(t, err, "rolling update")

assertGroupInstanceCountOS(t, cloud, "node-1", 3)
Expand All @@ -117,6 +116,8 @@ func TestRollingUpdateDisabledSurgeOS(t *testing.T) {

func makeGroupOS(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGroup, igList *kopsapi.InstanceGroupList,
c *RollingUpdateCluster, subnet string, role kopsapi.InstanceGroupRole, count int, needUpdate int) {
ctx := context.TODO()

cloud := c.Cloud.(*openstack.MockCloud)
igif := c.Clientset.InstanceGroupsFor(c.Cluster)
fakeClient := c.K8sClient.(*fake.Clientset)
Expand All @@ -136,7 +137,7 @@ func makeGroupOS(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGr

igList.Items = append(igList.Items, newIg)

ig, err := igif.Create(c.Ctx, &newIg, v1meta.CreateOptions{})
ig, err := igif.Create(ctx, &newIg, v1meta.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create ig %v: %v", subnet, err)
}
Expand Down
Loading

0 comments on commit cba6360

Please sign in to comment.