Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
d-kuro committed Jan 17, 2024
1 parent 1338c63 commit 08421b3
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 0 deletions.
6 changes: 6 additions & 0 deletions charts/moco/templates/generated/generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/status
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/status
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
213 changes: 213 additions & 0 deletions controllers/partition_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package controllers

import (
"context"
"errors"
"fmt"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
"github.com/cybozu-go/moco/pkg/constants"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Compile-time assertion that *StatefulSetPartitionReconciler satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = &StatefulSetPartitionReconciler{}

// StatefulSetPartitionReconciler reconciles a StatefulSet object.
// Its methods in this file inspect a StatefulSet's rolling-update partition
// and, when the rollout may proceed, lower that partition by one.
type StatefulSetPartitionReconciler struct {
// Client is the embedded controller-runtime client; the methods in this file
// use it to Get StatefulSets and MySQLClusters and to List Pods.
client.Client
// Recorder is an event recorder.
// NOTE(review): it is not referenced by the code visible in this file — confirm it is wired up by the caller.
Recorder record.EventRecorder
}

//+kubebuilder:rbac:groups=moco.cybozu.com,resources=mysqlclusters,verbs=get;list;watch
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get
//+kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch

// Reconcile implements Reconciler interface.
// See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler
//
// It fetches the StatefulSet named by req and lowers its rolling-update
// partition by one when all of the following hold:
//   - partitioning is enabled and the current partition is greater than zero,
//   - the StatefulSet rollout has not completed yet,
//   - isRolloutReady reports that the rollout may proceed.
//
// A StatefulSet that no longer exists is not an error. When the rollout is not
// ready, the partition is left untouched and the request is requeued.
func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	log := crlog.FromContext(ctx)

	sts := &appsv1.StatefulSet{}
	err := r.Get(ctx, req.NamespacedName, sts)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// The StatefulSet was deleted; there is nothing left to do.
			return reconcile.Result{}, nil
		}
		log.Error(err, "unable to fetch StatefulSet", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace)
		return reconcile.Result{}, err
	}

	if !r.needPartitionUpdate(sts) {
		return reconcile.Result{}, nil
	}

	if r.isStatefulSetRolloutComplete(sts) {
		return reconcile.Result{}, nil
	}

	ready, err := r.isRolloutReady(ctx, sts)
	if err != nil {
		log.Error(err, "failed to check if rollout is ready", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace)
		return reconcile.Result{}, err
	}
	if !ready {
		log.Info("rollout is not ready", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace)
		// Do NOT advance the partition while Pods or the MySQLCluster are
		// unhealthy. Requeue explicitly because this controller only watches
		// StatefulSets and Pods (see SetupWithManager); a MySQLCluster
		// becoming healthy would not trigger a new reconciliation by itself.
		return reconcile.Result{Requeue: true}, nil
	}

	if err := r.applyNewPartition(ctx, sts); err != nil {
		log.Error(err, "failed to apply new partition", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace)
		return reconcile.Result{}, err
	}

	return reconcile.Result{}, nil
}

// SetupWithManager registers this reconciler with mgr as a controller whose
// primary resource is StatefulSet and whose owned resource is Pod.
func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
	b := ctrl.NewControllerManagedBy(mgr).For(&appsv1.StatefulSet{})
	b = b.Owns(&corev1.Pod{})
	return b.Complete(r)
}

// isRolloutReady returns true if the StatefulSet is ready for rolling update.
//
// The rollout is considered ready only when every child Pod of sts is Ready
// and the MySQLCluster that owns sts is healthy. It returns an error when the
// Pods cannot be inspected or the owning MySQLCluster cannot be retrieved.
func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
	isPodsReady, err := r.areAllChildPodsReady(ctx, sts)
	if err != nil {
		return false, fmt.Errorf("failed to check if all child pods are ready: %w", err)
	}

	if !isPodsReady {
		return false, nil
	}

	cluster, err := r.getMySQLCluster(ctx, sts)
	if err != nil {
		return false, fmt.Errorf("failed to get MySQLCluster: %w", err)
	}

	// All Pods are Ready at this point, so readiness is decided by the
	// cluster health alone. (Previously this path always returned false.)
	return r.isMySQLClusterHealthy(cluster), nil
}

// areAllChildPodsReady checks if all child Pods of a given StatefulSet are in Ready state.
// It lists all Pods that match the StatefulSet's selector in the same namespace,
// and checks their Ready condition status.
//
// The function returns true only if every listed Pod has a Ready condition
// whose status is True; a Pod that does not report a Ready condition at all is
// treated as not ready. It returns an error when the selector is invalid, the
// list call fails, or the number of listed Pods differs from
// sts.Status.Replicas.
func (r *StatefulSetPartitionReconciler) areAllChildPodsReady(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
	// Convert the full label selector (matchLabels and matchExpressions).
	// A nil selector converts to a selector that matches nothing instead of
	// causing a nil-pointer dereference.
	selector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector)
	if err != nil {
		return false, fmt.Errorf("invalid selector of StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err)
	}

	podList := &corev1.PodList{}
	listOpts := []client.ListOption{
		client.InNamespace(sts.Namespace),
		client.MatchingLabelsSelector{Selector: selector},
	}

	if err := r.List(ctx, podList, listOpts...); err != nil {
		return false, err
	}

	if sts.Status.Replicas != int32(len(podList.Items)) {
		return false, fmt.Errorf("number of child pods %d is not equal to replicas %d", len(podList.Items), sts.Status.Replicas)
	}

	// Index instead of ranging by value to avoid copying each Pod.
	for i := range podList.Items {
		ready := false
		for _, condition := range podList.Items[i].Status.Conditions {
			if condition.Type == corev1.PodReady {
				ready = condition.Status == corev1.ConditionTrue
				break
			}
		}
		// A missing Ready condition leaves ready == false, so a Pod that has
		// not reported readiness yet blocks the rollout.
		if !ready {
			return false, nil
		}
	}

	return true, nil
}

// isMySQLClusterHealthy checks the health status of a given MySQLCluster.
// It verifies that the 'Available' and 'Healthy' conditions in the cluster's status are 'True',
// and that the value of '.status.syncedReplicas' matches that of '.spec.replicas'.
// Both conditions must be present: a cluster whose status does not report
// them is treated as unhealthy.
// The function returns true if all conditions are met, and false otherwise.
func (r *StatefulSetPartitionReconciler) isMySQLClusterHealthy(cluster *mocov1beta2.MySQLCluster) bool {
	var available, healthy bool
	for _, condition := range cluster.Status.Conditions {
		switch condition.Type {
		case mocov1beta2.ConditionAvailable:
			if condition.Status != metav1.ConditionTrue {
				return false
			}
			available = true
		case mocov1beta2.ConditionHealthy:
			if condition.Status != metav1.ConditionTrue {
				return false
			}
			healthy = true
		}
	}

	// Absence of either condition means health has not been established.
	if !available || !healthy {
		return false
	}

	return int32(cluster.Status.SyncedReplicas) == cluster.Spec.Replicas
}

// getMySQLCluster fetches the MySQLCluster referenced by the first owner
// reference of sts whose Kind is "MySQLCluster". The cluster is looked up in
// the StatefulSet's namespace. An error is returned when the lookup fails or
// when sts has no such owner reference.
func (r *StatefulSetPartitionReconciler) getMySQLCluster(ctx context.Context, sts *appsv1.StatefulSet) (*mocov1beta2.MySQLCluster, error) {
	owners := sts.GetOwnerReferences()
	for i := range owners {
		if owners[i].Kind == "MySQLCluster" {
			key := types.NamespacedName{Namespace: sts.Namespace, Name: owners[i].Name}
			found := new(mocov1beta2.MySQLCluster)
			if err := r.Get(ctx, key, found); err != nil {
				return nil, err
			}
			return found, nil
		}
	}

	return nil, fmt.Errorf("StatefulSet %s/%s has no owner reference to MySQLCluster", sts.Namespace, sts.Name)
}

// isStatefulSetRolloutComplete reports whether the StatefulSet's rollout has
// finished, i.e. its status shows the same current and update revision.
func (r *StatefulSetPartitionReconciler) isStatefulSetRolloutComplete(sts *appsv1.StatefulSet) bool {
	status := &sts.Status
	return status.UpdateRevision == status.CurrentRevision
}

// needPartitionUpdate reports whether the StatefulSet's partition should be
// lowered. It is false when the disable-partitioning annotation is "true",
// when no rolling-update partition is set, or when the partition is not
// greater than zero.
func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.StatefulSet) bool {
	if sts.Annotations[constants.AnnDisablePartitioning] == "true" {
		return false
	}

	rolling := sts.Spec.UpdateStrategy.RollingUpdate
	return rolling != nil && rolling.Partition != nil && *rolling.Partition > 0
}

// applyNewPartition lowers the StatefulSet's rolling-update partition by one
// and submits the result as an apply configuration through the apply helper.
// An ErrApplyConfigurationNotChanged result from the helper is treated as
// success. The caller must ensure that the rolling-update partition is set.
func (r *StatefulSetPartitionReconciler) applyNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error {
	nextPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition - 1

	rollingUpdate := appsv1ac.RollingUpdateStatefulSetStrategy().WithPartition(nextPartition)
	strategy := appsv1ac.StatefulSetUpdateStrategy().
		WithType(appsv1.RollingUpdateStatefulSetStrategyType).
		WithRollingUpdate(rollingUpdate)
	applyCfg := appsv1ac.StatefulSet(sts.Name, sts.Namespace).
		WithSpec(appsv1ac.StatefulSetSpec().WithUpdateStrategy(strategy))

	objKey := client.ObjectKey{Name: sts.Name, Namespace: sts.Namespace}

	_, err := apply(ctx, r.Client, objKey, applyCfg, appsv1ac.ExtractStatefulSet)
	switch {
	case err == nil, errors.Is(err, ErrApplyConfigurationNotChanged):
		return nil
	default:
		return fmt.Errorf("failed to apply new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err)
	}
}
1 change: 1 addition & 0 deletions pkg/constants/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
AnnSecretVersion = "moco.cybozu.com/secret-version"
AnnClusteringStopped = "moco.cybozu.com/clustering-stopped"
AnnReconciliationStopped = "moco.cybozu.com/reconciliation-stopped"
AnnDisablePartitioning = "moco.cybozu.com/disable-partitioning"
)

// MySQLClusterFinalizer is the finalizer specifier for MySQLCluster.
Expand Down

0 comments on commit 08421b3

Please sign in to comment.