Skip to content

Commit

Permalink
Separate backend health checks for other sub resource clients
Browse files Browse the repository at this point in the history
  • Loading branch information
nolancon authored and cnolan committed Nov 1, 2024
1 parent 7d1a0f8 commit 053f989
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 64 deletions.
23 changes: 15 additions & 8 deletions internal/controller/bucket/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (l *ACLClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backen
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}
observationChan <- l.observeBackend(bucket, beName)
}()
}
Expand All @@ -57,14 +66,6 @@ func (l *ACLClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backen
func (l *ACLClient) observeBackend(bucket *v1alpha1.Bucket, backendName string) ResourceStatus {
l.log.Debug("Observing subresource acl on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning Updated.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return Updated
}

// If your bucket uses the bucket owner enforced setting for S3 Object
// Ownership, ACLs are disabled and no longer affect permissions.
if s3types.ObjectOwnership(aws.ToString(bucket.Spec.ForProvider.ObjectOwnership)) == s3types.ObjectOwnershipBucketOwnerEnforced {
Expand Down Expand Up @@ -92,6 +93,12 @@ func (l *ACLClient) Handle(ctx context.Context, b *v1alpha1.Bucket, backendName
ctx, span := otel.Tracer("").Start(ctx, "bucket.ACLClient.Handle")
defer span.End()

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

switch l.observeBackend(b, backendName) {
case NoAction, Updated:
return nil
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/bucket/objectlockconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func (l *ObjectLockConfigurationClient) Observe(ctx context.Context, bucket *v1a
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := l.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -88,14 +98,6 @@ func (l *ObjectLockConfigurationClient) Observe(ctx context.Context, bucket *v1a
func (l *ObjectLockConfigurationClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
l.log.Debug("Observing subresource object lock configuration on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return NoAction, nil
}

s3Client, err := l.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -129,6 +131,12 @@ func (l *ObjectLockConfigurationClient) Handle(ctx context.Context, b *v1alpha1.
return nil
}

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := l.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandleVersioningConfig)
Expand Down
66 changes: 35 additions & 31 deletions internal/controller/bucket/objectlockconfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,6 @@ func TestObjectLockConfigObserveBackend(t *testing.T) {
args args
want want
}{
"Attempt to observe object lock config on unhealthy backend (consider it NoAction to unblock)": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}

bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: "bucket",
},
Spec: v1alpha1.BucketSpec{
ForProvider: v1alpha1.BucketParameters{
ObjectLockConfiguration: &v1alpha1.ObjectLockConfiguration{
ObjectLockEnabled: &objLockEnabled,
},
},
},
},
backendName: "s3-backend-1",
},
want: want{
status: NoAction,
err: nil,
},
},
"External error getting object lock": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down Expand Up @@ -265,6 +234,41 @@ func TestObjectLockConfigurationHandle(t *testing.T) {
args args
want want
}{
"Unhealthy backend": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend(beName, &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: bucketName,
},
Spec: v1alpha1.BucketSpec{
ForProvider: v1alpha1.BucketParameters{
ObjectLockEnabledForBucket: &enabledTrue,
ObjectLockConfiguration: &v1alpha1.ObjectLockConfiguration{
ObjectLockEnabled: &objLockEnabled,
Rule: &v1alpha1.ObjectLockRule{
DefaultRetention: &v1alpha1.DefaultRetention{
Mode: v1alpha1.ModeGovernance,
},
},
},
},
},
},
backendName: beName,
},
want: want{
err: errUnhealthyBackend,
},
},
"Object lock is not enabled for Bucket CR - nil value": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/bucket/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ func (p *PolicyClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, bac
for _, backendName := range backendNames {
beName := backendName
go func() {
if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := p.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -77,14 +87,6 @@ func (p *PolicyClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, bac
func (p *PolicyClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
p.log.Debug("Observing subresource policy on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning Updated.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return Updated, nil
}

s3Client, err := p.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -131,6 +133,12 @@ func (p *PolicyClient) Handle(ctx context.Context, b *v1alpha1.Bucket, backendNa
ctx, span := otel.Tracer("").Start(ctx, "bucket.PolicyClient.Handle")
defer span.End()

if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := p.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandlePolicy)
Expand Down
1 change: 0 additions & 1 deletion internal/controller/bucket/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestPolicyObserveBackend(t *testing.T) {
status: Updated,
},
},

"s3 error": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down
25 changes: 17 additions & 8 deletions internal/controller/bucket/versioningconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func (l *VersioningConfigurationClient) Observe(ctx context.Context, bucket *v1a
for _, backendName := range backendNames {
beName := backendName
go func() {
if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
observationChan <- NoAction

return
}

observation, err := l.observeBackend(ctx, bucket, beName)
if err != nil {
errChan <- err
Expand Down Expand Up @@ -83,18 +93,11 @@ func (l *VersioningConfigurationClient) Observe(ctx context.Context, bucket *v1a
func (l *VersioningConfigurationClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) {
l.log.Debug("Observing subresource versioning configuration on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
// If a backend is marked as unhealthy, we can ignore it for now by returning NoAction.
// The backend may be down for some time and we do not want to block Create/Update/Delete
// calls on other backends. By returning NeedsUpdate here, we would never pass the Observe
// phase until the backend becomes Healthy or Disabled.
return NoAction, nil
}

s3Client, err := l.s3ClientHandler.GetS3Client(ctx, bucket, backendName)
if err != nil {
return NeedsUpdate, err
}

response, err := rgw.GetBucketVersioning(ctx, s3Client, aws.String(bucket.Name))
if err != nil {
return NeedsUpdate, err
Expand Down Expand Up @@ -144,6 +147,12 @@ func (l *VersioningConfigurationClient) Handle(ctx context.Context, b *v1alpha1.
ctx, span := otel.Tracer("").Start(ctx, "bucket.VersioningConfigurationClient.Handle")
defer span.End()

if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy {
traces.SetAndRecordError(span, errUnhealthyBackend)

return errUnhealthyBackend
}

observation, err := l.observeBackend(ctx, b, backendName)
if err != nil {
err = errors.Wrap(err, errHandleVersioningConfig)
Expand Down
27 changes: 27 additions & 0 deletions internal/controller/bucket/versioningconfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,33 @@ func TestVersioningConfigurationHandle(t *testing.T) {
args args
want want
}{
"Unhealthy backend": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
fake := backendstorefakes.FakeS3Client{}
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy)

return bs
}(),
},
args: args{
bucket: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: bucketName,
},
Spec: v1alpha1.BucketSpec{
ForProvider: v1alpha1.BucketParameters{
ObjectLockEnabledForBucket: &enabledTrue,
},
},
},
backendName: beName,
},
want: want{
err: errUnhealthyBackend,
},
},
"Object lock enabled for bucket but no versioning config so set default enabled versioning": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
Expand Down

0 comments on commit 053f989

Please sign in to comment.