General tidy ups (#260)
* Add comments and some var renames in create

* Add comments and some var renames in update

* Add comments and tidy ups with test case to Delete

* Fix TODOs - remove unnecessary checks

* Check that bucket is Ready before making Synced

* Remove validation from test case causing flakes
nolancon authored May 31, 2024
1 parent 48aae79 commit 6ffe331
Showing 6 changed files with 234 additions and 67 deletions.
2 changes: 0 additions & 2 deletions e2e/tests/stable/chainsaw-test.yaml
@@ -943,8 +943,6 @@ spec:
kind: Bucket
metadata:
name: edge-case-bucket
labels:
provider-ceph.crossplane.io/validation-required: "true"
spec:
autoPause: true
forProvider: {}
96 changes: 65 additions & 31 deletions internal/controller/bucket/create.go
@@ -6,7 +6,6 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/types"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/errors"
@@ -15,7 +14,6 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/resource"

"github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1"
apisv1alpha1 "github.com/linode/provider-ceph/apis/v1alpha1"
"github.com/linode/provider-ceph/internal/consts"
"github.com/linode/provider-ceph/internal/otel/traces"
"github.com/linode/provider-ceph/internal/rgw"
@@ -82,66 +80,83 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
ctx, cancel := context.WithTimeout(ctx, c.operationTimeout)
defer cancel()

// A disabled Bucket CR means we log and return no error as we do not wish to requeue.
if bucket.Spec.Disabled {
c.log.Info("Bucket is disabled - no buckets to be created on backends", consts.KeyBucketName, bucket.Name)

return managed.ExternalCreation{}, nil
}

// If no backends are stored then we return an error in order to requeue until
// backends appear in the store. The backend store is updated by the backend-monitor
// which reconciles ProviderConfig objects representing backends.
if !c.backendStore.BackendsAreStored() {
err := errors.New(errNoS3BackendsStored)
traces.SetAndRecordError(span, err)

return managed.ExternalCreation{}, err
}

// allBackendNames is a list of the names of all backends from backend store which
// are Healthy. These backends can be active or inactive. A backend is marked
// as inactive in the backend store when its ProviderConfig object has been deleted.
// Inactive backends are included in this list so that we can attempt to recreate
// this bucket on those backends should they become active again.
allBackendNames := c.backendStore.GetAllBackendNames(false)
providerNames := getBucketProvidersFilterDisabledLabel(bucket, allBackendNames)

// Create the bucket on each backend in a separate go routine
activeBackends := c.backendStore.GetActiveBackends(providerNames)
if len(activeBackends) == 0 {
// allBackendsToCreateOn is a list of names of all backends on which this S3 bucket
// is to be created. This will either be:
// 1. The list of bucket.Spec.Providers, if specified.
// 2. Otherwise, the allBackendNames list.
// In either case, the list will exclude any backends which have been specified as
// disabled on the Bucket CR. A backend is specified as disabled for a given bucket
// if it has been given the backend label (eg 'provider-ceph.backends.<backend-name>: "false"').
// This means that Provider Ceph will NOT create the bucket on this backend.
allBackendsToCreateOn := getBucketProvidersFilterDisabledLabel(bucket, allBackendNames)

// If none of the backends on which we wish to create the bucket are active then we
// return an error in order to requeue until backends become active.
// Otherwise we do a quick sanity check to see if there are backends
// that the bucket will not be created on and log these backends.
activeBackendsToCreateOn := c.backendStore.GetActiveBackends(allBackendsToCreateOn)
if len(activeBackendsToCreateOn) == 0 {
err := errors.New(errNoActiveS3Backends)
traces.SetAndRecordError(span, err)

return managed.ExternalCreation{}, err
} else if len(activeBackends) != len(providerNames) {
c.log.Info("Missing S3 backends", consts.KeyBucketName, bucket.Name, "missing", utils.MissingStrings(providerNames, allBackendNames))
} else if len(activeBackendsToCreateOn) != len(allBackendsToCreateOn) {
c.log.Info("Bucket will not be created on the following S3 backends", consts.KeyBucketName, bucket.Name, "backends", utils.MissingStrings(allBackendsToCreateOn, allBackendNames))
traces.SetAndRecordError(span, errors.New(errMissingS3Backend))
}

// This value shows a bucket on one backend is already created.
// It is used to prevent goroutines from sending duplicated messages to `readyChan`.
// This value shows a bucket on the given backend is already created.
// It is used to prevent go routines from sending duplicated messages to `readyChan`.
bucketAlreadyCreated := atomic.Bool{}
backendCount := 0
errChan := make(chan error, len(activeBackends))
errChan := make(chan error, len(activeBackendsToCreateOn))
readyChan := make(chan string)

for beName := range activeBackends {
// Now we're ready to start creating S3 buckets on our desired active backends.
for beName := range activeBackendsToCreateOn {
originalBucket := bucket.DeepCopy()

// Attempt to get an S3 client for the backend. This will either be the default
// S3 client created for each backend by the backend monitor or it will be a new
// temporary S3 client created via the STS AssumeRole endpoint. The latter will
// be used if the user has specified an "assume-role-arn" at start-up. If an error
// occurs, continue to the next backend.
cl, err := c.s3ClientHandler.GetS3Client(ctx, bucket, beName)
if err != nil {
traces.SetAndRecordError(span, err)
c.log.Info("Failed to get client for backend - bucket cannot be created on backend", consts.KeyBucketName, originalBucket.Name, consts.KeyBackendName, beName, "error", err.Error())

continue
}

c.log.Info("Creating bucket on backend", consts.KeyBucketName, originalBucket.Name, consts.KeyBackendName, beName)

pc := &apisv1alpha1.ProviderConfig{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: beName}, pc); err != nil {
c.log.Info("Failed to fetch provider config", consts.KeyBucketName, originalBucket.Name, consts.KeyBackendName, beName, "err", err.Error())
err := errors.Wrap(err, errGetPC)
traces.SetAndRecordError(span, err)

return managed.ExternalCreation{}, err
}

// Increment the backend counter. We need this later to know when the operation should finish.
backendCount++

c.log.Info("Creating bucket on backend", consts.KeyBucketName, originalBucket.Name, consts.KeyBackendName, beName)
// Launch a go routine for each backend, creating buckets concurrently.
beName := beName
go func() {
_, err := rgw.CreateBucket(ctx, cl, rgw.BucketToCreateBucketInput(originalBucket))
@@ -176,16 +191,21 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
}()
}

// We couldn't attempt to create a bucket on any backend. We update the bucket CR
// with the relevant labels and return no error as we do not wish to requeue this
// Bucket CR while there are no backends for us to create on.
if backendCount == 0 {
c.log.Info("Failed to find any backend for bucket", consts.KeyBucketName, bucket.Name)

if err := c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
// Although no backends were found for the bucket, we still apply the backend
// label to the Bucket CR for each backend that the bucket was intended to be
// created on. This is to ensure the bucket will eventually be created on these
// backends whenever they become active again.
setAllBackendLabels(bucketLatest, providerNames)

setAllBackendLabels(bucketLatest, allBackendsToCreateOn)
// Pause the Bucket CR because there is no backend for it to be created on.
// If a backend for which it was intended becomes active, the health-check
// controller will un-pause the Bucket CR (identifying it by its backend label)
// and it will be re-reconciled.
bucketLatest.Labels[meta.AnnotationKeyReconciliationPaused] = True

return NeedsObjectUpdate
Expand All @@ -200,10 +220,10 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
return managed.ExternalCreation{}, nil
}

return c.waitForCreationAndUpdateBucketCR(ctx, bucket, providerNames, readyChan, errChan, backendCount)
return c.waitForCreationAndUpdateBucketCR(ctx, bucket, allBackendsToCreateOn, readyChan, errChan, backendCount)
}

func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket *v1alpha1.Bucket, providerNames []string, readyChan <-chan string, errChan <-chan error, backendCount int) (managed.ExternalCreation, error) {
func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket *v1alpha1.Bucket, allBackendsToCreateOn []string, readyChan <-chan string, errChan <-chan error, backendCount int) (managed.ExternalCreation, error) {
ctx, span := otel.Tracer("").Start(ctx, "waitForCreationAndUpdateBucketCR")
defer span.End()

@@ -212,11 +232,20 @@ func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket
for i := 0; i < backendCount; i++ {
select {
case <-ctx.Done():
// The create bucket request timed out. Update the createErr value; if this is the last error to
// occur across all our backends, it will be the error seen in the Bucket CR Status.
c.log.Info("Context timeout waiting for bucket creation", consts.KeyBucketName, bucket.Name)
createErr = ctx.Err()
case beName := <-readyChan:
// This channel receives the value of the backend name that the bucket is first created on.
// We only need the bucket to be created on a single backend for the Bucket CR to be
// considered in the Ready condition. Therefore we update the Bucket CR with:
// 1. The backend labels.
// 2. The Bucket CR Status with the Ready condition.
// 3. The Bucket CR Status Backends with a Ready condition for the backend the bucket
// was created on.
err := c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
setAllBackendLabels(bucketLatest, providerNames)
setAllBackendLabels(bucketLatest, allBackendsToCreateOn)

return NeedsObjectUpdate
}, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
Expand All @@ -238,6 +267,9 @@ func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket

return managed.ExternalCreation{}, err
case createErr = <-errChan:
// If this channel receives an error it means bucket creation failed on a backend.
// Therefore, we simply log and continue to the next backend as we only need one
// successful creation.
if createErr != nil {
traces.SetAndRecordError(span, createErr)
}
Expand All @@ -247,7 +279,9 @@ func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket
}

c.log.Info("Failed to create bucket on any backend", consts.KeyBucketName, bucket.Name)

// Update the Bucket CR Status condition to Unavailable. This means the Bucket CR will
// not be seen as Ready. If that update is successful, we return the createErr which will
// be the most recent error received from a backend's failed creation.
if err := c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
bucketLatest.Status.SetConditions(xpv1.Unavailable())

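
For reference, a minimal standalone sketch of the fan-out pattern the new comments in create.go describe: the bucket is created on every active backend concurrently, the first success makes the Bucket CR Ready, and an atomic flag prevents duplicate sends to readyChan. The createOnBackend helper and the backend names are hypothetical stand-ins, not provider-ceph API:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// createOnBackend is a hypothetical stand-in for the per-backend S3 client
// lookup and rgw.CreateBucket call.
func createOnBackend(ctx context.Context, backendName, bucketName string) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulate the backend call
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// createOnFirstReady creates the bucket on every backend concurrently and
// returns as soon as one backend reports success.
func createOnFirstReady(ctx context.Context, bucketName string, backends []string) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	bucketAlreadyCreated := atomic.Bool{} // prevents duplicate sends to readyChan
	errChan := make(chan error, len(backends))
	readyChan := make(chan string)

	for _, beName := range backends {
		beName := beName
		go func() {
			if err := createOnBackend(ctx, beName, bucketName); err != nil {
				errChan <- err

				return
			}
			// Only the first successful backend reports readiness.
			if bucketAlreadyCreated.CompareAndSwap(false, true) {
				readyChan <- beName

				return
			}
			errChan <- nil
		}()
	}

	var createErr error
	for i := 0; i < len(backends); i++ {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case beName := <-readyChan:
			// One successful creation is enough for the bucket to be Ready.
			return beName, nil
		case createErr = <-errChan:
			// A single backend failing is not fatal; keep waiting on the rest.
		}
	}

	return "", fmt.Errorf("bucket not created on any backend, last error: %w", createErr)
}

func main() {
	beName, err := createOnFirstReady(context.Background(), "my-bucket", []string{"s3-backend-1", "s3-backend-2"})
	fmt.Println(beName, err)
}
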
21 changes: 14 additions & 7 deletions internal/controller/bucket/delete.go
@@ -120,29 +120,36 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
return err
}

// If an error occurred during deletion, we must return for requeue.
if deleteErr != nil {
if deleteErr != nil { //nolint:nestif // Multiple checks required.
c.log.Info("Failed to delete bucket on one or more backends", "error", deleteErr.Error())
traces.SetAndRecordError(span, deleteErr)

// If the error is a BucketNotEmpty error, the DeleteBucket operation should fail
// and the client should still be able to use the bucket on the non-empty backends.
if errors.Is(deleteErr, rgw.ErrBucketNotEmpty) {
c.log.Info("Cannot delete non-empty bucket - this error will not be requeued", consts.KeyBucketName, bucket.Name)
// An error occurred attempting to delete the bucket because it is not empty.
// If this Delete operation was triggered because the Bucket CR was "Disabled",
// we need to unset this value so as not to continue attempting Delete.
// Otherwise we can return no error as we do not wish to requeue the Delete.
if !bucket.Spec.Disabled {
return nil
}
if err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired {
c.log.Info("Change 'disabled' flag to false", consts.KeyBucketName, bucket.Name)
c.log.Info("Bucket CRs with non-empty buckets should not be disabled - setting 'disabled' flag to false", consts.KeyBucketName, bucket.Name)

bucketLatest.Spec.Disabled = false

return NeedsObjectUpdate
}); err != nil {
err = errors.Wrap(err, errUpdateBucketCR)
c.log.Info("Failed to change 'disabled' flag to false", consts.KeyBackendName, bucket.Name, "error", err.Error())
c.log.Info("Failed to set 'disabled' flag to false", consts.KeyBucketName, bucket.Name, "error", err.Error())
traces.SetAndRecordError(span, err)

return err
}
}

return nil
}
// In all other cases we should return the deletion error for requeue.
return deleteErr
}

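
To make the branching above easier to follow, a small self-contained sketch of the same decision: a BucketNotEmpty failure is terminal (no requeue) and clears the 'disabled' flag if it was set, while any other failure is returned so the Delete is requeued. The errBucketNotEmpty sentinel and handleDeleteError helper are illustrative only, assumed to mirror rgw.ErrBucketNotEmpty and the Bucket CR update:

package main

import (
	"errors"
	"fmt"
)

// errBucketNotEmpty is an illustrative stand-in for rgw.ErrBucketNotEmpty.
var errBucketNotEmpty = errors.New("bucket not empty")

// handleDeleteError mirrors the error handling in Delete: a non-empty bucket is
// never requeued, and a disabled Bucket CR with non-empty buckets has its
// 'disabled' flag cleared so the controller stops attempting the delete.
func handleDeleteError(deleteErr error, disabled *bool) error {
	if deleteErr == nil {
		return nil
	}
	if errors.Is(deleteErr, errBucketNotEmpty) {
		if *disabled {
			// Bucket CRs with non-empty buckets should not stay disabled.
			*disabled = false
		}

		return nil
	}
	// In all other cases return the deletion error for requeue.
	return deleteErr
}

func main() {
	disabled := true
	err := handleDeleteError(fmt.Errorf("delete failed: %w", errBucketNotEmpty), &disabled)
	fmt.Println(err, disabled) // <nil> false
}
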
106 changes: 105 additions & 1 deletion internal/controller/bucket/delete_test.go
@@ -682,7 +682,111 @@ func TestDelete(t *testing.T) {
},
},
want: want{
err: rgw.ErrBucketNotEmpty,
err: nil,
statusDiff: func(t *testing.T, mg resource.Managed) {
t.Helper()
bucket, _ := mg.(*v1alpha1.Bucket)

// s3-backend-1 failed so is stuck in Deleting status.
assert.True(t,
bucket.Status.AtProvider.Backends[s3Backend1].BucketCondition.Equal(xpv1.Deleting().WithMessage(fmt.Errorf("%w: %w", rgw.ErrBucketNotEmpty, rgw.BucketNotEmptyError{}).Error())),
"unexpected bucket condition on s3-backend-1")

// s3-backend-2 was successfully deleted so was removed from status.
assert.False(t,
func(b v1alpha1.Backends) bool {
if _, ok := b[s3Backend2]; ok {
return true
}

return false
}(bucket.Status.AtProvider.Backends),
"s3-backend-2 should not exist in backends")

// If backend deletion fails due to BucketNotEmpty error, Disabled flag should be false.
assert.False(t,
bucket.Spec.Disabled,
"Disabled flag should be false",
)
},
finalizerDiff: func(t *testing.T, mg resource.Managed) {
t.Helper()
bucket, _ := mg.(*v1alpha1.Bucket)

assert.Equal(t,
[]string{v1alpha1.InUseFinalizer},
bucket.Finalizers,
"unexpeceted finalizers",
)
},
},
},
"Error deleting disabled bucket because one specified bucket is not empty": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
// DeleteBucket first calls HeadBucket to establish
// if a bucket exists, so allow HeadBucket to succeed
// and have DeleteBucket return BucketNotEmptyError
// to mimic a failed delete.
fakeClient := &backendstorefakes.FakeS3Client{}
fakeClient.HeadBucketReturns(
&s3.HeadBucketOutput{},
nil,
)
fakeClient.DeleteBucketReturns(
nil,
rgw.BucketNotEmptyError{},
)

// DeleteBucket first calls HeadBucket to establish
// if a bucket exists, so return not found
// error to short circuit a successful delete.
var notFoundError *s3types.NotFound
fakeClientOK := &backendstorefakes.FakeS3Client{}
fakeClientOK.HeadBucketReturns(
&s3.HeadBucketOutput{},
notFoundError,
)

bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend(s3Backend1, fakeClient, nil, true, apisv1alpha1.HealthStatusHealthy)
bs.AddOrUpdateBackend(s3Backend2, fakeClientOK, nil, true, apisv1alpha1.HealthStatusHealthy)

return bs
}(),
},
args: args{
mg: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: "bucket",
Finalizers: []string{v1alpha1.InUseFinalizer},
Labels: map[string]string{
v1alpha1.BackendLabelPrefix + s3Backend1: "true",
v1alpha1.BackendLabelPrefix + s3Backend2: "true",
},
},
Spec: v1alpha1.BucketSpec{
Providers: []string{
s3Backend1,
s3Backend2,
},
Disabled: true,
},
Status: v1alpha1.BucketStatus{
AtProvider: v1alpha1.BucketObservation{
Backends: v1alpha1.Backends{
s3Backend1: &v1alpha1.BackendInfo{
BucketCondition: xpv1.Available(),
},
s3Backend2: &v1alpha1.BackendInfo{
BucketCondition: xpv1.Available(),
},
},
},
},
},
},
want: want{
err: nil,
statusDiff: func(t *testing.T, mg resource.Managed) {
t.Helper()
bucket, _ := mg.(*v1alpha1.Bucket)
6 changes: 4 additions & 2 deletions internal/controller/bucket/helpers.go
@@ -160,8 +160,10 @@ func setBucketStatus(bucket *v1alpha1.Bucket, bucketBackends *bucketBackends, pr
bucket.Status.SetConditions(xpv1.Available())
}
// The Bucket CR is considered Synced (ReconcileSuccess) once the bucket is available
// on the lesser of all backends or minimum replicas.
if float64(ok) >= math.Min(float64(len(providerNames)), float64(minReplicas)) {
// on the lesser of all backends or minimum replicas. We also ensure that the overall
// Bucket CR is available (in a Ready state) - this should already be the case.
if float64(ok) >= math.Min(float64(len(providerNames)), float64(minReplicas)) &&
bucket.Status.GetCondition(xpv1.TypeReady).Equal(xpv1.Available()) {
bucket.Status.SetConditions(xpv1.ReconcileSuccess())

return
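
A quick illustrative reduction of the new Synced rule above, with the condition plumbing stripped away — isSynced and its arguments are hypothetical names, not provider-ceph API:

package main

import (
	"fmt"
	"math"
)

// isSynced sketches the ReconcileSuccess check from setBucketStatus: the bucket
// must be available on at least the lesser of all requested backends or the
// minimum replica count, and the overall Bucket CR must already be Ready.
func isSynced(okBackends, requestedBackends, minReplicas int, bucketReady bool) bool {
	required := math.Min(float64(requestedBackends), float64(minReplicas))

	return float64(okBackends) >= required && bucketReady
}

func main() {
	fmt.Println(isSynced(2, 3, 2, true))  // true: minimum replicas met and Ready
	fmt.Println(isSynced(3, 3, 2, false)) // false: not yet Ready overall
}
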