From 8582509c7dd7d2a8567e4cf0bc523607f54d37f5 Mon Sep 17 00:00:00 2001 From: d-kuro Date: Wed, 27 Mar 2024 13:41:42 +0900 Subject: [PATCH] wip --- api/v1beta2/statefulset_webhhok.go | 93 +++++++++ api/v1beta2/zz_generated.deepcopy.go | 15 ++ .../moco/templates/generated/generated.yaml | 26 +++ cmd/moco-controller/cmd/run.go | 14 ++ config/rbac/role.yaml | 6 + config/webhook/manifests.yaml | 20 ++ controllers/partition_controller.go | 183 ++++++++++++++++++ pkg/constants/meta.go | 1 + 8 files changed, 358 insertions(+) create mode 100644 api/v1beta2/statefulset_webhhok.go create mode 100644 controllers/partition_controller.go diff --git a/api/v1beta2/statefulset_webhhok.go b/api/v1beta2/statefulset_webhhok.go new file mode 100644 index 000000000..bafa9396d --- /dev/null +++ b/api/v1beta2/statefulset_webhhok.go @@ -0,0 +1,93 @@ +package v1beta2 + +import ( + "context" + "fmt" + + "github.com/cybozu-go/moco/pkg/constants" + admissionv1 "k8s.io/api/admission/v1" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +func SetupStatefulSetWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(&appsv1.StatefulSet{}). + WithDefaulter(&StatefulSetDefaulter{}). + Complete() +} + +//+kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps,resources=statefulsets,verbs=create;update,versions=v1,name=statefulset.kb.io,admissionReviewVersions=v1 + +type StatefulSetDefaulter struct{} + +var _ admission.CustomDefaulter = &StatefulSetDefaulter{} + +// Default implements webhook.Defaulter so a webhook will be registered for the type +func (*StatefulSetDefaulter) Default(ctx context.Context, obj runtime.Object) error { + sts, ok := obj.(*appsv1.StatefulSet) + if !ok { + return fmt.Errorf("unknown obj type %T", obj) + } + + req, err := admission.RequestFromContext(ctx) + if err != nil { + return fmt.Errorf("failed to get admission request from context: %w", err) + } + + if sts.Annotations[constants.AnnForceRollingUpdate] == "true" && + sts.Spec.UpdateStrategy.RollingUpdate != nil && + sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + sts.Spec.UpdateStrategy.RollingUpdate = nil + return nil + } + + if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil { + sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: ptr.To[int32](*sts.Spec.Replicas), + } + return nil + } + + if req.Operation != admissionv1.Update { + return nil + } + + oldSts, err := readStatefulSet(req.OldObject.Raw) + if err != nil { + return fmt.Errorf("failed to read old statefulset: %w", err) + } + + partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition + oldPartition := *oldSts.Spec.UpdateStrategy.RollingUpdate.Partition + + newSts := sts.DeepCopy() + newSts.Spec.UpdateStrategy = oldSts.Spec.UpdateStrategy + + if partition != oldPartition && equality.Semantic.DeepEqual(newSts, oldSts) { + return nil + } + + sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: ptr.To[int32](partition), + } + + return nil +} + +func readStatefulSet(raw []byte) (*appsv1.StatefulSet, error) { + var sts appsv1.StatefulSet + + if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, &sts); err != nil { + return nil, err + } + + sts.TypeMeta.APIVersion = appsv1.SchemeGroupVersion.Group + "/" + appsv1.SchemeGroupVersion.Version + + return &sts, nil +} diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index a1f5cfc55..b12bdec49 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -559,6 +559,21 @@ func (in *ServiceTemplate) DeepCopy() *ServiceTemplate { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StatefulSetDefaulter) DeepCopyInto(out *StatefulSetDefaulter) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatefulSetDefaulter. +func (in *StatefulSetDefaulter) DeepCopy() *StatefulSetDefaulter { + if in == nil { + return nil + } + out := new(StatefulSetDefaulter) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeApplyConfiguration) DeepCopyInto(out *VolumeApplyConfiguration) { clone := in.DeepCopy() diff --git a/charts/moco/templates/generated/generated.yaml b/charts/moco/templates/generated/generated.yaml index 1332da753..999d29635 100644 --- a/charts/moco/templates/generated/generated.yaml +++ b/charts/moco/templates/generated/generated.yaml @@ -155,6 +155,12 @@ rules: - patch - update - watch + - apiGroups: + - "" + resources: + - pods/status + verbs: + - get - apiGroups: - "" resources: @@ -460,6 +466,26 @@ webhooks: resources: - mysqlclusters sideEffects: None + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: moco-webhook-service + namespace: '{{ .Release.Namespace }}' + path: /mutate-apps-v1-statefulset + failurePolicy: Fail + name: statefulset.kb.io + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - statefulsets + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration diff --git a/cmd/moco-controller/cmd/run.go b/cmd/moco-controller/cmd/run.go index 7e645b8d4..fa01dcfb1 100644 --- a/cmd/moco-controller/cmd/run.go +++ b/cmd/moco-controller/cmd/run.go @@ -113,6 +113,15 @@ func subMain(ns, addr string, port int) error { return err } + if err = (&controllers.StatefulSetPartitionReconciler{ + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor("moco-controller"), + MaxConcurrentReconciles: config.maxConcurrentReconciles, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Partition") + return err + } + if err = (&controllers.PodWatcher{ Client: mgr.GetClient(), ClusterManager: clusterMgr, @@ -132,6 +141,11 @@ func subMain(ns, addr string, port int) error { return err } + if err := mocov1beta2.SetupStatefulSetWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup webhook", "webhook", "StatefulSet") + return err + } + if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") return err diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 753de4d32..52b5ee731 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -50,6 +50,12 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - pods/status + verbs: + - get - apiGroups: - "" resources: diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 3d9a7003b..dacf05106 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -24,6 +24,26 @@ webhooks: resources: - mysqlclusters sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-apps-v1-statefulset + failurePolicy: Fail + name: statefulset.kb.io + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - statefulsets + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration diff --git a/controllers/partition_controller.go b/controllers/partition_controller.go new file mode 100644 index 000000000..a4fee62c3 --- /dev/null +++ b/controllers/partition_controller.go @@ -0,0 +1,183 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" + "github.com/cybozu-go/moco/pkg/constants" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + crlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var _ reconcile.Reconciler = &StatefulSetPartitionReconciler{} + +// StatefulSetPartitionReconciler reconciles a StatefulSet object +type StatefulSetPartitionReconciler struct { + client.Client + Recorder record.EventRecorder + MaxConcurrentReconciles int +} + +//+kubebuilder:rbac:groups=moco.cybozu.com,resources=mysqlclusters,verbs=get;list;watch +//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;update;patch +//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch +//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get +//+kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch + +// Reconcile implements Reconciler interface. +// See https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile#Reconciler +func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := crlog.FromContext(ctx) + + sts := &appsv1.StatefulSet{} + err := r.Get(ctx, req.NamespacedName, sts) + if err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + log.Error(err, "unable to fetch StatefulSet", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + return reconcile.Result{}, err + } + + if !r.needPartitionUpdate(sts) { + return reconcile.Result{}, nil + } + + if r.isStatefulSetRolloutComplete(sts) { + return reconcile.Result{}, nil + } + + ready, err := r.isRolloutReady(ctx, sts) + if err != nil { + log.Error(err, "failed to check if rollout is ready", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + return reconcile.Result{}, err + } + if !ready { + log.Info("rollout is not ready", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + } + + if err := r.patchNewPartition(ctx, sts); err != nil { + log.Error(err, "failed to apply new partition", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&appsv1.StatefulSet{}). + Owns(&corev1.Pod{}). + WithOptions( + controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}, + ). + Complete(r) +} + +// isRolloutReady returns true if the StatefulSet is ready for rolling update. +func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { + cluster, err := r.getMySQLCluster(ctx, sts) + if err != nil { + return false, fmt.Errorf("failed to get MySQLCluster: %w", err) + } + + if !r.isMySQLClusterHealthy(cluster) { + return false, nil + } + + return false, nil +} + +// isMySQLClusterHealthy checks the health status of a given MySQLCluster. +func (r *StatefulSetPartitionReconciler) isMySQLClusterHealthy(cluster *mocov1beta2.MySQLCluster) bool { + return meta.IsStatusConditionTrue(cluster.Status.Conditions, mocov1beta2.ConditionHealthy) +} + +// getMySQLCluster retrieves the MySQLCluster release that owns a given StatefulSet. +func (r *StatefulSetPartitionReconciler) getMySQLCluster(ctx context.Context, sts *appsv1.StatefulSet) (*mocov1beta2.MySQLCluster, error) { + for _, ownerRef := range sts.GetOwnerReferences() { + if ownerRef.Kind != "MySQLCluster" { + continue + } + + cluster := &mocov1beta2.MySQLCluster{} + if err := r.Get(ctx, types.NamespacedName{Name: ownerRef.Name, Namespace: sts.Namespace}, cluster); err != nil { + return nil, err + } + + return cluster, nil + } + + return nil, fmt.Errorf("StatefulSet %s/%s has no owner reference to MySQLCluster", sts.Namespace, sts.Name) +} + +// isStatefulSetRolloutComplete returns true if the StatefulSet is update completed. +func (r *StatefulSetPartitionReconciler) isStatefulSetRolloutComplete(sts *appsv1.StatefulSet) bool { + return sts.Status.CurrentRevision == sts.Status.UpdateRevision +} + +// needPartitionUpdate returns true if the StatefulSet needs to update partition. +func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.StatefulSet) bool { + if sts.Annotations[constants.AnnForceRollingUpdate] == "true" { + return false + } + if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil { + return false + } + + return *sts.Spec.UpdateStrategy.RollingUpdate.Partition > 0 +} + +// applyNewPartition applies a new partition to the StatefulSet, +// subtracting 1 from the current partition. +func (r *StatefulSetPartitionReconciler) applyNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error { + newPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition - 1 + + key := client.ObjectKey{ + Namespace: sts.Namespace, + Name: sts.Name, + } + + stsApplyCfg := appsv1ac.StatefulSet(sts.Name, sts.Namespace). + WithSpec(appsv1ac.StatefulSetSpec(). + WithUpdateStrategy(appsv1ac.StatefulSetUpdateStrategy(). + WithType(appsv1.RollingUpdateStatefulSetStrategyType). + WithRollingUpdate(appsv1ac.RollingUpdateStatefulSetStrategy().WithPartition(newPartition)), + ), + ) + + if _, err := apply(ctx, r.Client, key, stsApplyCfg, appsv1ac.ExtractStatefulSet); err != nil { + if errors.Is(err, ErrApplyConfigurationNotChanged) { + return nil + } + return fmt.Errorf("failed to apply new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err) + } + + return nil +} + +func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error { + newPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition - 1 + + patch := client.MergeFrom(sts.DeepCopy()) + sts.Spec.UpdateStrategy.RollingUpdate.Partition = &newPartition + + if err := r.Client.Patch(ctx, sts, patch); err != nil { + return fmt.Errorf("failed to patch new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err) + } + + return nil +} diff --git a/pkg/constants/meta.go b/pkg/constants/meta.go index 061284ee7..f7cabf8e6 100644 --- a/pkg/constants/meta.go +++ b/pkg/constants/meta.go @@ -21,6 +21,7 @@ const ( AnnSecretVersion = "moco.cybozu.com/secret-version" AnnClusteringStopped = "moco.cybozu.com/clustering-stopped" AnnReconciliationStopped = "moco.cybozu.com/reconciliation-stopped" + AnnForceRollingUpdate = "moco.cybozu.com/force-rolling-update" ) // MySQLClusterFinalizer is the finalizer specifier for MySQLCluster.