diff --git a/controllers/partition_controller.go b/controllers/partition_controller.go
index 79008da2..338f58bb 100644
--- a/controllers/partition_controller.go
+++ b/controllers/partition_controller.go
@@ -68,6 +68,12 @@ func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reco
 	metrics.CurrentReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.CurrentReplicas))
 	metrics.UpdatedReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.UpdatedReplicas))
 
+	// In this case, the reconciliation of MySQLClusterReconciler has not been completed.
+	// Wait until completion.
+	if cluster.Generation != cluster.Status.ReconcileInfo.Generation || sts.Generation != sts.Status.ObservedGeneration {
+		return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
+	}
+
 	if !r.needPartitionUpdate(sts) {
 		return reconcile.Result{}, nil
 	}
@@ -134,6 +140,12 @@ func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) erro
 func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet) (bool, error) {
 	log := crlog.FromContext(ctx)
 
+	if sts.Spec.Replicas != nil && *sts.Spec.Replicas == sts.Status.UpdatedReplicas {
+		// In this case, a rolling update has been completed.
+		// Update the partition to the initial value (`sts.spec.replicas`) and finish the reconciliation.
+		return true, nil
+	}
+
 	podList, err := r.getSortedPodList(ctx, sts)
 	if err != nil {
 		return false, fmt.Errorf("failed to get pod list: %w", err)
@@ -156,6 +168,10 @@ func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, clu
 		return false, nil
 	}
 
+	if podList.Items[nextRolloutTarget].Labels[appsv1.ControllerRevisionHashLabelKey] == sts.Status.UpdateRevision {
+		return true, nil
+	}
+
 	// If not all Pods are ready, the MySQLCluster becomes Unhealthy.
 	// Even if the MySQLCluster is not healthy, the rollout continues if the rollout target Pod is not ready.
 	// This is because there is an expectation that restarting the Not Ready Pod might improve its state.
@@ -164,10 +180,7 @@ func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, clu
 		return false, nil
 	}
 
-	ready, err := r.areAllChildPodsRolloutReady(ctx, sts, podList)
-	if err != nil {
-		return false, fmt.Errorf("failed to check if all child pods are ready: %w", err)
-	}
+	ready := r.areAllChildPodsRolloutReady(ctx, sts, podList)
 
 	return ready, nil
 }
@@ -193,19 +206,9 @@ func (r *StatefulSetPartitionReconciler) getSortedPodList(ctx context.Context, s
 	return podList, nil
 }
 
-func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) (bool, error) {
+func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) bool {
 	log := crlog.FromContext(ctx)
 
-	firstReivision := sortedPodList.Items[0].Labels[appsv1.ControllerRevisionHashLabelKey]
-	lastIndex := len(sortedPodList.Items) - 1
-	lastRevision := sortedPodList.Items[lastIndex].Labels[appsv1.ControllerRevisionHashLabelKey]
-	revisionCounts := make(map[string]int)
-
-	for _, pod := range sortedPodList.Items {
-		revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
-		revisionCounts[revision]++
-	}
-
 	nextRolloutTarget := r.nextRolloutTargetIndex(sts)
 
 	// Proceed with the rollout for the next Pod to be rolled out, even if it is not Ready.
@@ -217,46 +220,27 @@ func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context
 	for _, pod := range excludeNextRolloutTagetPodList {
 		if pod.DeletionTimestamp != nil {
 			log.Info("Pod is in the process of being terminated", "name", pod.Name, "namespace", pod.Namespace)
-			return false, nil
+			return false
 		}
 		if !podutils.IsPodAvailable(&pod, 5, metav1.Now()) {
 			log.Info("Pod is not ready", "name", pod.Name, "namespace", pod.Namespace)
-			return false, nil
+			return false
 		}
-	}
-
-	partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
-	expectedPodsCount := int(*sts.Spec.Replicas) - int(partition)
-
-	// If the number of partitions is equal to the number of replicas, the rollout has not started.
-	// Decrease the partition to start the rollout.
-	// If there is only one revision type in all other cases, the rollout is complete.
-	if int(*sts.Spec.Replicas) == int(partition) {
-		return true, nil
-	} else if len(revisionCounts) == 1 {
-		return true, nil
-	}
-
-	// If the first and last revisions are the same, it is a rollback.
-	// To consider rollbacks, search from the beginning of the Pod list,
-	// and if a Pod with the same revision as the last one is found, add it to the expected number of Pods.
-	if firstReivision == lastRevision {
-		for _, pod := range sortedPodList.Items {
-			revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
-			if revision == lastRevision {
-				expectedPodsCount++
-			} else {
-				break
+		for _, c := range pod.Status.InitContainerStatuses {
+			if !c.Ready {
+				log.Info("Pod is not ready", "name", pod.Name, "namespace", pod.Namespace, "container", c.Name)
+				return false
+			}
+		}
+		for _, c := range pod.Status.ContainerStatuses {
+			if !c.Ready {
+				log.Info("Pod is not ready", "name", pod.Name, "namespace", pod.Namespace, "container", c.Name)
+				return false
 			}
 		}
 	}
 
-	if revisionCounts[lastRevision] != expectedPodsCount {
-		log.Info("Pod count is different from the expected number", "revision", lastRevision, "expected", expectedPodsCount, "actual", revisionCounts[lastRevision])
-		return false, nil
-	}
-
-	return true, nil
+	return true
 }
 
 // isMySQLClusterHealthy checks the health status of a given MySQLCluster.
@@ -307,6 +291,8 @@ func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.Statefu
 
 // patchNewPartition patches the new partition of a StatefulSet.
 func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error {
+	log := crlog.FromContext(ctx)
+
 	oldPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
 	newPartition := oldPartition - 1
 
@@ -321,6 +307,7 @@ func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context,
 		return fmt.Errorf("failed to patch new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err)
 	}
 
+	log.Info("updated partition", "newPartition", newPartition, "oldPartition", oldPartition)
 	r.Recorder.Eventf(sts, corev1.EventTypeNormal, "PartitionUpdate", "Updated partition from %d to %d", oldPartition, newPartition)
 
 	return nil