Skip to content

Commit

Permalink
Fix check partition logic.
Browse files Browse the repository at this point in the history
Signed-off-by: d-kuro <[email protected]>
  • Loading branch information
d-kuro committed Jul 31, 2024
1 parent 69a62a0 commit abecf35
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 54 deletions.
81 changes: 34 additions & 47 deletions controllers/partition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reco
metrics.CurrentReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.CurrentReplicas))
metrics.UpdatedReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.UpdatedReplicas))

// A generation mismatch on the MySQLCluster or on the StatefulSet means that the
// reconciliation of MySQLClusterReconciler has not been completed yet.
// Requeue and wait until it completes.
if cluster.Generation != cluster.Status.ReconcileInfo.Generation || sts.Generation != sts.Status.ObservedGeneration {
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}

if !r.needPartitionUpdate(sts) {
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -134,6 +140,12 @@ func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) erro
func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet) (bool, error) {
log := crlog.FromContext(ctx)

// sts.Spec.Replicas is a pointer while sts.Status.UpdatedReplicas is a plain value,
// so the pointed-to value must be compared. Comparing the pointer with the address
// of the status field is never true and would make this branch unreachable.
if sts.Spec.Replicas != nil && *sts.Spec.Replicas == sts.Status.UpdatedReplicas {
	// In this case, a rolling update has been completed.
	// Update the partition to the initial value (`sts.spec.replicas`) and finish the reconciliation.
	return true, nil
}

podList, err := r.getSortedPodList(ctx, sts)
if err != nil {
return false, fmt.Errorf("failed to get pod list: %w", err)
Expand All @@ -156,6 +168,10 @@ func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, clu
return false, nil
}

if podList.Items[nextRolloutTarget].Labels[appsv1.ControllerRevisionHashLabelKey] == sts.Status.UpdateRevision {
return true, nil
}

// If not all Pods are ready, the MySQLCluster becomes Unhealthy.
// Even if the MySQLCluster is not healthy, the rollout continues if the rollout target Pod is not ready.
// This is because there is an expectation that restarting the Not Ready Pod might improve its state.
Expand All @@ -164,10 +180,7 @@ func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, clu
return false, nil
}

ready, err := r.areAllChildPodsRolloutReady(ctx, sts, podList)
if err != nil {
return false, fmt.Errorf("failed to check if all child pods are ready: %w", err)
}
ready := r.areAllChildPodsRolloutReady(ctx, sts, podList)

return ready, nil
}
Expand All @@ -193,19 +206,9 @@ func (r *StatefulSetPartitionReconciler) getSortedPodList(ctx context.Context, s
return podList, nil
}

func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) (bool, error) {
func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) bool {
log := crlog.FromContext(ctx)

firstReivision := sortedPodList.Items[0].Labels[appsv1.ControllerRevisionHashLabelKey]
lastIndex := len(sortedPodList.Items) - 1
lastRevision := sortedPodList.Items[lastIndex].Labels[appsv1.ControllerRevisionHashLabelKey]
revisionCounts := make(map[string]int)

for _, pod := range sortedPodList.Items {
revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
revisionCounts[revision]++
}

nextRolloutTarget := r.nextRolloutTargetIndex(sts)

// Proceed with the rollout for the next Pod to be rolled out, even if it is not Ready.
Expand All @@ -217,46 +220,27 @@ func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context
for _, pod := range excludeNextRolloutTagetPodList {
if pod.DeletionTimestamp != nil {
log.Info("Pod is in the process of being terminated", "name", pod.Name, "namespace", pod.Namespace)
return false, nil
return false
}
if !podutils.IsPodAvailable(&pod, 5, metav1.Now()) {
log.Info("Pod is not ready", "name", pod.Name, "namespace", pod.Namespace)
return false, nil
return false
}
}

partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
expectedPodsCount := int(*sts.Spec.Replicas) - int(partition)

// If the number of partitions is equal to the number of replicas, the rollout has not started.
// Decrease the partition to start the rollout.
// If there is only one revision type in all other cases, the rollout is complete.
if int(*sts.Spec.Replicas) == int(partition) {
return true, nil
} else if len(revisionCounts) == 1 {
return true, nil
}

// If the first and last revisions are the same, it is a rollback.
// To consider rollbacks, search from the beginning of the Pod list,
// and if a Pod with the same revision as the last one is found, add it to the expected number of Pods.
if firstReivision == lastRevision {
for _, pod := range sortedPodList.Items {
revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
if revision == lastRevision {
expectedPodsCount++
} else {
break
// Every init container of the Pod must report Ready.
for _, c := range pod.Status.InitContainerStatuses {
	if !c.Ready {
		// Log only for the container that actually blocks the rollout;
		// logging before the check would report ready containers as not ready.
		log.Info("Container is not ready", "pod", pod.Name, "namespace", pod.Namespace, "container", c.Name)
		return false
	}
}
// Every regular container of the Pod must report Ready as well.
for _, c := range pod.Status.ContainerStatuses {
	if !c.Ready {
		log.Info("Container is not ready", "pod", pod.Name, "namespace", pod.Namespace, "container", c.Name)
		return false
	}
}
}

if revisionCounts[lastRevision] != expectedPodsCount {
log.Info("Pod count is different from the expected number", "revision", lastRevision, "expected", expectedPodsCount, "actual", revisionCounts[lastRevision])
return false, nil
}

return true, nil
return true
}

// isMySQLClusterHealthy checks the health status of a given MySQLCluster.
Expand Down Expand Up @@ -307,6 +291,8 @@ func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.Statefu

// patchNewPartition patches the new partition of a StatefulSet.
func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error {
log := crlog.FromContext(ctx)

oldPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
newPartition := oldPartition - 1

Expand All @@ -321,6 +307,7 @@ func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context,
return fmt.Errorf("failed to patch new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err)
}

log.Info("Updated partition", "newPartition", newPartition, "oldPartition", oldPartition)
r.Recorder.Eventf(sts, corev1.EventTypeNormal, "PartitionUpdate", "Updated partition from %d to %d", oldPartition, newPartition)

return nil
Expand Down
48 changes: 42 additions & 6 deletions controllers/partition_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
Expand All @@ -29,6 +30,7 @@ func testNewStatefulSet(cluster *mocov1beta2.MySQLCluster) *appsv1.StatefulSet {
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cluster, mocov1beta2.GroupVersion.WithKind("MySQLCluster")),
},
Generation: 1,
},
Spec: appsv1.StatefulSetSpec{
Replicas: ptr.To[int32](cluster.Spec.Replicas),
Expand Down Expand Up @@ -82,7 +84,7 @@ func testNewPods(sts *appsv1.StatefulSet) []*corev1.Pod {
return pods
}

func rolloutPods(ctx context.Context, rev1 int, rev2 int) {
func rolloutPods(ctx context.Context, sts *appsv1.StatefulSet, rev1 int, rev2 int) {
pods := &corev1.PodList{}
err := k8sClient.List(ctx, pods, client.InNamespace("partition"), client.MatchingLabels(map[string]string{"foo": "bar"}))
Expect(err).NotTo(HaveOccurred())
Expand All @@ -105,6 +107,39 @@ func rolloutPods(ctx context.Context, rev1 int, rev2 int) {

err = k8sClient.Update(ctx, &pod)
Expect(err).NotTo(HaveOccurred())

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
newSts := &appsv1.StatefulSet{}
err = k8sClient.Get(ctx, client.ObjectKey{Namespace: sts.Namespace, Name: sts.Name}, newSts)
if err != nil {
return err
}
if rev1 == 0 {
newSts.Status = appsv1.StatefulSetStatus{
CurrentRevision: "rev2",
UpdateRevision: "rev2",
Replicas: int32(rev1) + int32(rev2),
UpdatedReplicas: int32(rev2),
}
} else if rev2 == 0 {
newSts.Status = appsv1.StatefulSetStatus{
CurrentRevision: "rev1",
UpdateRevision: "rev1",
Replicas: int32(rev1) + int32(rev2),
UpdatedReplicas: int32(rev1),
}
} else {
newSts.Status = appsv1.StatefulSetStatus{
CurrentRevision: "rev1",
UpdateRevision: "rev2",
Replicas: int32(rev1) + int32(rev2),
UpdatedReplicas: int32(rev2),
}
}
newSts.Status.ObservedGeneration = newSts.Generation
return k8sClient.Status().Update(ctx, newSts)
})
Expect(err).NotTo(HaveOccurred())
}
}

Expand Down Expand Up @@ -163,14 +198,15 @@ var _ = Describe("StatefulSet reconciler", func() {
Reason: "healthy",
},
)
cluster.Status.ReconcileInfo.Generation = 1
err = k8sClient.Status().Update(ctx, cluster)
Expect(err).NotTo(HaveOccurred())

sts := testNewStatefulSet(cluster)
err = k8sClient.Create(ctx, sts)
Expect(err).NotTo(HaveOccurred())
sts.Status = appsv1.StatefulSetStatus{
ObservedGeneration: 2,
ObservedGeneration: 1,
CurrentRevision: "rev1",
UpdateRevision: "rev1",
Replicas: 3,
Expand Down Expand Up @@ -211,16 +247,16 @@ var _ = Describe("StatefulSet reconciler", func() {

switch *sts.Spec.UpdateStrategy.RollingUpdate.Partition {
case 3:
rolloutPods(ctx, 2, 1)
rolloutPods(ctx, sts, 2, 1)
case 2:
rolloutPods(ctx, 1, 2)
rolloutPods(ctx, sts, 1, 2)
case 1:
rolloutPods(ctx, 0, 3)
rolloutPods(ctx, sts, 0, 3)
case 0:
return nil
}

return errors.New("unexpected partition")
return fmt.Errorf("unexpected partition: %d", *sts.Spec.UpdateStrategy.RollingUpdate.Partition)
}).Should(Succeed())

events := &corev1.EventList{}
Expand Down
2 changes: 1 addition & 1 deletion e2e/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ var _ = Context("partition_test", func() {
Expect(err).NotTo(HaveOccurred())
Expect(sts.Spec.UpdateStrategy.RollingUpdate).NotTo(BeNil())
Expect(sts.Spec.UpdateStrategy.RollingUpdate.Partition).NotTo(BeNil())
Expect(*sts.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
Expect(*sts.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2)))
})

It("should rollback succeed", func() {
Expand Down

0 comments on commit abecf35

Please sign in to comment.