chore: add context to rolling update functions
Move the context out of the struct and into the function parameters.

This is more idiomatic Go.
justinsb committed Dec 27, 2024
1 parent 5c80c56 commit ebcfebe
Showing 8 changed files with 105 additions and 76 deletions.
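
As a rough illustration of the pattern applied across these files — a minimal sketch, not actual kops code; the updater types and deleteNode helper below are simplified stand-ins for RollingUpdateCluster and its methods:

package main

import (
	"context"
	"fmt"
	"time"
)

// Before this commit: the context is stored on the struct, so every method
// implicitly shares one context and call sites cannot scope cancellation.
type updaterWithStructCtx struct {
	ctx context.Context
}

func (u *updaterWithStructCtx) deleteNode(name string) error {
	if err := u.ctx.Err(); err != nil {
		return err // the shared context was already cancelled or timed out
	}
	fmt.Printf("deleting node %q\n", name)
	return nil
}

// After this commit: the context is the first parameter, following the usual
// Go convention, so each call can carry its own deadline or cancellation.
type updater struct{}

func (u *updater) deleteNode(ctx context.Context, name string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Printf("deleting node %q\n", name)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	u := &updater{}
	if err := u.deleteNode(ctx, "node-1"); err != nil {
		fmt.Println("error:", err)
	}
}

Passing the context per call matches the client-go signatures used in the diff (Nodes().Patch and Nodes().Delete take ctx as their first argument) and the drain.Helper Ctx field, and lets each call site choose its own cancellation or deadline.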
3 changes: 1 addition & 2 deletions cmd/kops/delete_instance.go
@@ -219,7 +219,6 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
d := &instancegroups.RollingUpdateCluster{
Clientset: clientSet,
Cluster: cluster,
Ctx: ctx,
MasterInterval: 0,
NodeInterval: 0,
BastionInterval: 0,
@@ -248,7 +247,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
}
d.ClusterValidator = clusterValidator

return d.UpdateSingleInstance(cloudMember, options.Surge)
return d.UpdateSingleInstance(ctx, cloudMember, options.Surge)
}

func getNodes(ctx context.Context, cluster *kopsapi.Cluster, verbose bool) (kubernetes.Interface, string, []v1.Node, error) {
3 changes: 1 addition & 2 deletions cmd/kops/rolling-update_cluster.go
@@ -348,7 +348,6 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer

d := &instancegroups.RollingUpdateCluster{
Clientset: clientset,
Ctx: ctx,
Cluster: cluster,
MasterInterval: options.ControlPlaneInterval,
NodeInterval: options.NodeInterval,
@@ -456,7 +455,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
}
d.ClusterValidator = clusterValidator

return d.RollingUpdate(groups, list)
return d.RollingUpdate(ctx, groups, list)
}

func completeInstanceGroup(f commandutils.Factory, selectedInstanceGroups *[]string, selectedInstanceGroupRoles *[]string) func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
44 changes: 22 additions & 22 deletions pkg/instancegroups/instancegroups.go
@@ -98,7 +98,7 @@ func promptInteractive(upgradedHostID, upgradedHostName string) (stopPrompting b
}

// RollingUpdate performs a rolling update on a list of instances.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
isBastion := group.InstanceGroup.IsBastion()
// Do not need a k8s client if you are doing cloudonly.
if c.K8sClient == nil && !c.CloudOnly {
@@ -123,7 +123,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.
}

if !c.CloudOnly {
err = c.taintAllNeedUpdate(group, update)
err = c.taintAllNeedUpdate(ctx, group, update)
if err != nil {
return err
}
@@ -221,7 +221,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(group *cloudinstances.

for uIdx, u := range update {
go func(m *cloudinstances.CloudInstance) {
terminateChan <- c.drainTerminateAndWait(m, sleepAfterTerminate)
terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
}(u)
runningDrains++

@@ -319,7 +319,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
return err
}

func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
var toTaint []*corev1.Node
for _, u := range update {
if u.Node != nil && !u.Node.Spec.Unschedulable {
@@ -341,7 +341,7 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudIns
}
klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
for _, n := range toTaint {
if err := c.patchTaint(n); err != nil {
if err := c.patchTaint(ctx, n); err != nil {
if c.FailOnDrainError {
return fmt.Errorf("failed to taint node %q: %v", n, err)
}
@@ -352,7 +352,7 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudIns
return nil
}

func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
@@ -373,14 +373,14 @@ func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
return err
}

_, err = c.K8sClient.CoreV1().Nodes().Patch(c.Ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}

func (c *RollingUpdateCluster) patchExcludeFromLB(node *corev1.Node) error {
func (c *RollingUpdateCluster) patchExcludeFromLB(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
@@ -405,14 +405,14 @@ func (c *RollingUpdateCluster) patchExcludeFromLB(node *corev1.Node) error {
return err
}

_, err = c.K8sClient.CoreV1().Nodes().Patch(c.Ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}

func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
instanceID := u.ID

nodeName := ""
@@ -430,7 +430,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
if u.Node != nil {
klog.Infof("Draining the node: %q.", nodeName)

if err := c.drainNode(u); err != nil {
if err := c.drainNode(ctx, u); err != nil {
if c.FailOnDrainError {
return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
}
@@ -449,7 +449,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
} else {
klog.Infof("deleting node %q from kubernetes", nodeName)
if err := c.deleteNode(u.Node); err != nil {
if err := c.deleteNode(ctx, u.Node); err != nil {
return fmt.Errorf("error deleting node %q: %v", nodeName, err)
}
}
@@ -460,7 +460,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
return err
}

if err := c.reconcileInstanceGroup(); err != nil {
if err := c.reconcileInstanceGroup(ctx); err != nil {
klog.Errorf("error reconciling instance group %q: %v", u.CloudInstanceGroup.HumanName, err)
return err
}
@@ -472,7 +472,7 @@ func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInst
return nil
}

func (c *RollingUpdateCluster) reconcileInstanceGroup() error {
func (c *RollingUpdateCluster) reconcileInstanceGroup(ctx context.Context) error {
if c.Cluster.GetCloudProvider() != api.CloudProviderOpenstack &&
c.Cluster.GetCloudProvider() != api.CloudProviderHetzner &&
c.Cluster.GetCloudProvider() != api.CloudProviderScaleway &&
@@ -497,7 +497,7 @@ func (c *RollingUpdateCluster) reconcileInstanceGroup() error {
DeletionProcessing: fi.DeletionProcessingModeDeleteIfNotDeferrred,
}

_, err := applyCmd.Run(c.Ctx)
_, err := applyCmd.Run(ctx)
return err
}

@@ -645,7 +645,7 @@ func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) e
}

// drainNode drains a K8s node.
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error {
func (c *RollingUpdateCluster) drainNode(ctx context.Context, u *cloudinstances.CloudInstance) error {
if c.K8sClient == nil {
return fmt.Errorf("K8sClient not set")
}
@@ -659,7 +659,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
}

helper := &drain.Helper{
Ctx: c.Ctx,
Ctx: ctx,
Client: c.K8sClient,
Force: true,
GracePeriodSeconds: -1,
@@ -679,7 +679,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
return fmt.Errorf("error cordoning node: %v", err)
}

if err := c.patchExcludeFromLB(u.Node); err != nil {
if err := c.patchExcludeFromLB(ctx, u.Node); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
@@ -720,9 +720,9 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
}

// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
var options metav1.DeleteOptions
err := c.K8sClient.CoreV1().Nodes().Delete(c.Ctx, node.Name, options)
err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
@@ -735,7 +735,7 @@ func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
}

// UpdateSingleInstance performs a rolling update on a single instance
func (c *RollingUpdateCluster) UpdateSingleInstance(cloudMember *cloudinstances.CloudInstance, detach bool) error {
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
if detach {
if cloudMember.CloudInstanceGroup.InstanceGroup.IsControlPlane() {
klog.Warning("cannot detach control-plane instances. Assuming --surge=false")
@@ -750,5 +750,5 @@ func (c *RollingUpdateCluster) UpdateSingleInstance(cloudMember *cloudinstances.
}
}

return c.drainTerminateAndWait(cloudMember, 0)
return c.drainTerminateAndWait(ctx, cloudMember, 0)
}
2 changes: 1 addition & 1 deletion pkg/instancegroups/instancegroups_test.go
@@ -60,7 +60,7 @@ func TestWarmPoolOnlyRoll(t *testing.T) {
instance.State = cloudinstances.WarmPool

{
err := c.rollingUpdateInstanceGroup(group, 0*time.Second)
err := c.rollingUpdateInstanceGroup(ctx, group, 0*time.Second)
if err != nil {
t.Fatalf("could not roll instance group: %v", err)
}
11 changes: 5 additions & 6 deletions pkg/instancegroups/rollingupdate.go
@@ -38,7 +38,6 @@ import (
// RollingUpdateCluster is a struct containing cluster information for a rolling update.
type RollingUpdateCluster struct {
Clientset simple.Clientset
Ctx context.Context
Cluster *api.Cluster
Cloud fi.Cloud

@@ -106,7 +105,7 @@ func (*RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstances.
}

// RollingUpdate performs a rolling update on a K8s Cluster.
func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[string]*cloudinstances.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
if len(groups) == 0 {
klog.Info("Cloud Instance Group length is zero. Not doing a rolling-update.")
return nil
@@ -147,7 +146,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C

defer wg.Done()

err := c.rollingUpdateInstanceGroup(bastionGroups[k], c.BastionInterval)
err := c.rollingUpdateInstanceGroup(ctx, bastionGroups[k], c.BastionInterval)

resultsMutex.Lock()
results[k] = err
@@ -172,7 +171,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
// and we don't want to roll all the control-plane nodes at the same time. See issue #284

for _, k := range sortGroups(masterGroups) {
err := c.rollingUpdateInstanceGroup(masterGroups[k], c.MasterInterval)
err := c.rollingUpdateInstanceGroup(ctx, masterGroups[k], c.MasterInterval)
// Do not continue update if control-plane node(s) failed; cluster is potentially in an unhealthy state.
if err != nil {
return fmt.Errorf("control-plane node not healthy after update, stopping rolling-update: %q", err)
@@ -187,7 +186,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
}

for _, k := range sortGroups(apiServerGroups) {
err := c.rollingUpdateInstanceGroup(apiServerGroups[k], c.NodeInterval)
err := c.rollingUpdateInstanceGroup(ctx, apiServerGroups[k], c.NodeInterval)
results[k] = err
if err != nil {
klog.Errorf("failed to roll InstanceGroup %q: %v", k, err)
@@ -212,7 +211,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
}

for _, k := range sortGroups(nodeGroups) {
err := c.rollingUpdateInstanceGroup(nodeGroups[k], c.NodeInterval)
err := c.rollingUpdateInstanceGroup(ctx, nodeGroups[k], c.NodeInterval)
results[k] = err
if err != nil {
klog.Errorf("failed to roll InstanceGroup %q: %v", k, err)
7 changes: 4 additions & 3 deletions pkg/instancegroups/rollingupdate_os_test.go
@@ -83,7 +83,6 @@ func getTestSetupOS(t *testing.T, ctx context.Context) (*RollingUpdateCluster, *
ValidateTickDuration: 1 * time.Millisecond,
ValidateSuccessDuration: 5 * time.Millisecond,
ValidateCount: 2,
Ctx: ctx,
Cluster: cluster,
Clientset: clientset,
}
@@ -106,7 +105,7 @@ func TestRollingUpdateDisabledSurgeOS(t *testing.T) {
c, cloud := getTestSetupOS(t, ctx)

groups, igList := getGroupsAllNeedUpdateOS(t, c)
err := c.RollingUpdate(groups, igList)
err := c.RollingUpdate(ctx, groups, igList)
assert.NoError(t, err, "rolling update")

assertGroupInstanceCountOS(t, cloud, "node-1", 3)
@@ -117,6 +116,8 @@

func makeGroupOS(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGroup, igList *kopsapi.InstanceGroupList,
c *RollingUpdateCluster, subnet string, role kopsapi.InstanceGroupRole, count int, needUpdate int) {
ctx := context.TODO()

cloud := c.Cloud.(*openstack.MockCloud)
igif := c.Clientset.InstanceGroupsFor(c.Cluster)
fakeClient := c.K8sClient.(*fake.Clientset)
@@ -136,7 +137,7 @@ func makeGroupOS(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGr

igList.Items = append(igList.Items, newIg)

ig, err := igif.Create(c.Ctx, &newIg, v1meta.CreateOptions{})
ig, err := igif.Create(ctx, &newIg, v1meta.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create ig %v: %v", subnet, err)
}