diff --git a/pkg/comp-functions/functions/spksmariadb/pvcresize.go b/pkg/comp-functions/functions/spksmariadb/pvcresize.go
index b7ff0f263..6f1716bdb 100644
--- a/pkg/comp-functions/functions/spksmariadb/pvcresize.go
+++ b/pkg/comp-functions/functions/spksmariadb/pvcresize.go
@@ -45,6 +45,67 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeMariaDBInst
return runtime.NewFatalResult(fmt.Errorf("cannot parse release values from desired release: %w", err))
}

+ if val, ok := release.GetAnnotations()["crossplane.io/paused"]; ok && val == "true" {
+ // The release has just been updated and paused, and is waiting for the deletion job to finish.
+ // The deletion job should remove the annotation once it's done.
+
+ xJob := &xkubev1.Object{}
+ err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
+ if err != nil && err != runtime.ErrNotFound {
+ return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
+ }
+ // A paused release without an observed deletion job is an inconsistent state, so we bail out.
+ if err == runtime.ErrNotFound {
+ return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job, but release is paused: %w", err))
+ }
+
+ sts := &appsv1.StatefulSet{}
+ err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
+ if err != nil && err != runtime.ErrNotFound {
+ return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
+ }
+
+ // Even if the xkube object has been created, it's still possible that the actual job hasn't been observed yet.
+ observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0
+
+ // Check whether the sts has already been updated to the desired size
+ stsSize := int64(0)
+ if len(sts.Spec.VolumeClaimTemplates) > 0 {
+ stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
+ }
+ newSize, found, err := unstructured.NestedString(values, "persistence", "size")
+ if !found {
+ return runtime.NewFatalResult(fmt.Errorf("disk size not found in observed release"))
+ }
+
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("failed to read size from observed release: %w", err))
+ }
+ desiredSize, err := getSizeAsInt(newSize)
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
+ }
+ stsUpdated := stsSize == desiredSize
+
+ deletionJob := &batchv1.Job{}
+ if observedJob {
+ err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
+ }
+ }
+
+ // As long as the job hasn't been observed, we need to keep it in desired, or we will end up in a recreate loop.
+ // And as long as it hasn't finished, we need to make sure it keeps existing.
+ if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
+ err := addDeletionJob(svc, comp, newSize, release.GetName())
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
+ }
+ }
+ return nil
+ }
+
err = addStsObserver(svc, comp)
if err != nil {
return runtime.NewWarningResult(fmt.Errorf("cannot observe sts: %w", err).Error())
@@ -74,53 +135,17 @@
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}
-
- return nil
- }
-
- xJob := &xkubev1.Object{}
- err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
- if err != nil && err != runtime.ErrNotFound {
- return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
- }
- // If there's no job observed, we're done here.
- if err == runtime.ErrNotFound {
- return nil
- }
-
- err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
- if err != nil && err != runtime.ErrNotFound {
- return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
- }
-
- // If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
- observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0
-
- // Check the sts if it has been updated
- if len(sts.Spec.VolumeClaimTemplates) > 0 {
- stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
- }
- desiredSize, err := getSizeAsInt(newSize)
- if err != nil {
- return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
- }
- stsUpdated := stsSize == desiredSize
-
- deletionJob := &batchv1.Job{}
- if observedJob {
- err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
+ // We pause the release at this point to make sure that provider-helm doesn't update the
+ // release until the deletion job has removed the sts.
+ release.SetAnnotations(map[string]string{
+ "crossplane.io/paused": "true",
+ })
+ err = svc.SetDesiredKubeObject(release, mariadbRelease)
if err != nil {
- return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
+ return runtime.NewFatalResult(fmt.Errorf("cannot pause the release: %w", err))
}
- }
- // The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
- // Also as long as it hasn't finished we need to make sure it exists.
- if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
- err := addDeletionJob(svc, comp, newSize, release.GetName())
- if err != nil {
- return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
- }
+ return nil
}

return nil
diff --git a/pkg/comp-functions/functions/spksmariadb/script/recreate.sh b/pkg/comp-functions/functions/spksmariadb/script/recreate.sh
index 01a215263..971f4884e 100644
--- a/pkg/comp-functions/functions/spksmariadb/script/recreate.sh
+++ b/pkg/comp-functions/functions/spksmariadb/script/recreate.sh
@@ -20,12 +20,11 @@ if [[ $foundsize != "$size" ]]; then
# So if the delete hasn't returned after 5s we forcefully patch away the finalizer.
kubectl -n "$namespace" delete sts "$name" --cascade=orphan --ignore-not-found --wait=true --timeout 5s || true kubectl -n "$namespace" patch sts "$name" -p '{"metadata":{"finalizers":null}}' || true - # Poke the release so it tries again to create the sts - # We first set it to garbage to ensure that the release is in an invalid state, we use an invalid state so it doesn't - # actually deploy anything. - # Then we patch the right size to enforce an upgrade - # This is necessary as provider-helm doesn't actually retry failed helm deployments unless the values change. + # Upause the release so that the sts is recreated. We pause the release to avoid provider-helm updating the release + # before the sts is deleted. + # Then we first patch the siye to garbage and afterwards to the right size to enforce an upgrade echo "Triggering sts re-creation" + kubectl annotate release "$release" "crossplane.io/paused-" kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"foo\"}}}}}" kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"$size\"}}}}}" count=0 diff --git a/pkg/comp-functions/functions/spksredis/pvcresize.go b/pkg/comp-functions/functions/spksredis/pvcresize.go index 3f349b522..f9e38679c 100644 --- a/pkg/comp-functions/functions/spksredis/pvcresize.go +++ b/pkg/comp-functions/functions/spksredis/pvcresize.go @@ -49,6 +49,67 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeRedisInstan return runtime.NewFatalResult(fmt.Errorf("cannot parse release values from desired release: %w", err)) } + if val, ok := release.GetAnnotations()["crossplane.io/paused"]; ok && val == "true" { + // The release has just been updated and paused and is waiting for the deletion job to finish + // The deletion job should remove the annotation once it's done. + + xJob := &xkubev1.Object{} + err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter") + if err != nil && err != runtime.ErrNotFound { + return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err)) + } + // If there's no job observed, we're done here. + if err == runtime.ErrNotFound { + return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job, but release is paused: %w", err)) + } + + sts := &appsv1.StatefulSet{} + err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer") + if err != nil && err != runtime.ErrNotFound { + return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset job: %w", err)) + } + + // If the xkube object has been created it's still possible that the actual job hasn't been observedJob. 
+ observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0
+
+ // Check whether the sts has already been updated to the desired size
+ stsSize := int64(0)
+ if len(sts.Spec.VolumeClaimTemplates) > 0 {
+ stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
+ }
+ newSize, found, err := unstructured.NestedString(values, replicaKey, "persistence", "size")
+ if !found {
+ return runtime.NewFatalResult(fmt.Errorf("disk size not found in observed release"))
+ }
+
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("failed to read size from observed release: %w", err))
+ }
+ desiredSize, err := getSizeAsInt(newSize)
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
+ }
+ stsUpdated := stsSize == desiredSize
+
+ deletionJob := &batchv1.Job{}
+ if observedJob {
+ err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
+ }
+ }
+
+ // As long as the job hasn't been observed, we need to keep it in desired, or we will end up in a recreate loop.
+ // And as long as it hasn't finished, we need to make sure it keeps existing.
+ if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
+ err := addDeletionJob(svc, comp, newSize, release.GetName(), replicaKey)
+ if err != nil {
+ return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
+ }
+ }
+ return nil
+ }
+
err = addStsObserver(svc, comp)
if err != nil {
return runtime.NewWarningResult(fmt.Errorf("cannot observe sts: %w", err).Error())
@@ -62,6 +123,7 @@
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}
+ stsSize := int64(0)

// Check the current size in the sts
if len(sts.Spec.VolumeClaimTemplates) > 0 {
@@ -78,53 +140,16 @@
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}
-
- return nil
- }
-
- xJob := &xkubev1.Object{}
- err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
- if err != nil && err != runtime.ErrNotFound {
- return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
- }
- // If there's no job observed, we're done here.
- if err == runtime.ErrNotFound {
- return nil
- }
-
- err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
- if err != nil && err != runtime.ErrNotFound {
- return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
- }
-
- // If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
- observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0
-
- // Check the sts if it has been updated
- if len(sts.Spec.VolumeClaimTemplates) > 0 {
- stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
- }
- desiredSize, err := getSizeAsInt(newSize)
- if err != nil {
- return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
- }
- stsUpdated := stsSize == desiredSize
-
- deletionJob := &batchv1.Job{}
- if observedJob {
- err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
- if err != nil {
- return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
- }
- }
-
- // The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
- // Also as long as it hasn't finished we need to make sure it exists.
- if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
- err := addDeletionJob(svc, comp, newSize, release.GetName(), replicaKey)
+ // We pause the release at this point to make sure that provider-helm doesn't update the
+ // release until the deletion job has removed the sts.
+ release.SetAnnotations(map[string]string{
+ "crossplane.io/paused": "true",
+ })
+ err = svc.SetDesiredKubeObject(release, redisRelease)
if err != nil {
- return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
+ return runtime.NewFatalResult(fmt.Errorf("cannot pause the release: %w", err))
}
+ return nil
}

return nil
diff --git a/pkg/comp-functions/functions/spksredis/script/recreate.sh b/pkg/comp-functions/functions/spksredis/script/recreate.sh
index eef787fcb..4739bc465 100644
--- a/pkg/comp-functions/functions/spksredis/script/recreate.sh
+++ b/pkg/comp-functions/functions/spksredis/script/recreate.sh
@@ -21,12 +21,10 @@
# So if the delete hasn't returned after 5s we forcefully patch away the finalizer.
kubectl -n "$namespace" delete sts "$name" --cascade=orphan --ignore-not-found --wait=true --timeout 5s || true
kubectl -n "$namespace" patch sts "$name" -p '{"metadata":{"finalizers":null}}' || true
- # Poke the release so it tries again to create the sts
- # We first set it to garbage to ensure that the release is in an invalid state, we use an invalid state so it doesn't
- # actually deploy anything.
- # Then we patch the right size to enforce an upgrade
- # This is necessary as provider-helm doesn't actually retry failed helm deployments unless the values change.
+ # Unpause the release so that the sts is recreated. The release was paused to prevent provider-helm from updating it
+ # before the sts was deleted. We then patch the size to garbage and back to the right size to force an upgrade.
echo "Triggering sts re-creation"
+ kubectl annotate release "$release" "crossplane.io/paused-"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"$replica_key\":{\"persistence\":{\"size\":\"foo\"}}}}}}"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"$replica_key\":{\"persistence\":{\"size\":\"$size\"}}}}}}"
count=0
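Review note: the following is a minimal, standalone sketch (not part of this patch) of the gating rule both pvcresize.go files apply while the release is paused; the helper name keepDeletionJob and the package layout are hypothetical and exist only to make the condition easier to read and test.

// Hypothetical sketch: the paused-release gating condition from pvcresize.go,
// extracted into a pure function.
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
)

// keepDeletionJob reports whether the sts-deleter job must stay in the desired
// state: either it hasn't been observed or hasn't succeeded yet, or the
// StatefulSet has not come back with the desired size (0 ready replicas and
// still the old volume size).
func keepDeletionJob(observedJob bool, job *batchv1.Job, sts *appsv1.StatefulSet, stsUpdated bool) bool {
	jobDone := observedJob && job.Status.Succeeded >= 1
	stsRecreated := sts.Status.ReadyReplicas > 0 || stsUpdated
	return !jobDone || !stsRecreated
}

func main() {
	job := &batchv1.Job{}
	job.Status.Succeeded = 1
	sts := &appsv1.StatefulSet{}
	sts.Status.ReadyReplicas = 1

	// Job succeeded and the sts is back with the desired size: safe to drop the job.
	fmt.Println(keepDeletionJob(true, job, sts, true)) // false
}

This is logically equivalent to the inline expression (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated): the job is dropped from the desired state only once it has succeeded and the StatefulSet has been recreated.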