Merge pull request #2519 from leelavg/5489-default-pool
controllers: do not skip cephfs creation in provider mode
openshift-merge-bot[bot] authored Mar 21, 2024
2 parents 45e1370 + 20649de commit 6db2c5d
Showing 2 changed files with 29 additions and 130 deletions.
98 changes: 27 additions & 71 deletions controllers/storagecluster/cephfilesystem.go
@@ -12,7 +12,6 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
@@ -46,80 +45,37 @@ func (r *StorageClusterReconciler) newCephFilesystemInstances(initStorageCluster
 		},
 	}

-	// not in provider mode
-	if !initStorageCluster.Spec.AllowRemoteStorageConsumers {
-		// standalone deployment that isn't in provider cluster will not
-		// have storageProfile, we need to define default dataPool, if
-		// storageProfile is set this will be overridden.
-		// Use the specified poolSpec, if it is unset then the default poolSpec will be used
-		ret.Spec.DataPools = []cephv1.NamedPoolSpec{
-			{
-				PoolSpec: initStorageCluster.Spec.ManagedResources.CephFilesystems.DataPoolSpec,
-			},
-		}
-
-		// Append additional pools from specified additional data pools
-		ret.Spec.DataPools = append(ret.Spec.DataPools, initStorageCluster.Spec.ManagedResources.CephFilesystems.AdditionalDataPools...)
-
-		// Iterate over each pool and set default values if necessary
-		defaultPoolSpec := generateDefaultPoolSpec(initStorageCluster)
-		for i := range ret.Spec.DataPools {
-			pool := &ret.Spec.DataPools[i]
-			// Set default device class if not specified
-			if pool.PoolSpec.DeviceClass == "" {
-				pool.PoolSpec.DeviceClass = defaultPoolSpec.DeviceClass
-			}
-			// Set default replication settings if not specified
-			if pool.PoolSpec.Replicated.Size == 0 {
-				pool.PoolSpec.Replicated.Size = defaultPoolSpec.Replicated.Size
-			}
-			if pool.PoolSpec.Replicated.ReplicasPerFailureDomain == 0 {
-				pool.PoolSpec.Replicated.ReplicasPerFailureDomain = defaultPoolSpec.Replicated.ReplicasPerFailureDomain
-			}
-			if pool.PoolSpec.Replicated.TargetSizeRatio == 0 {
-				pool.PoolSpec.Replicated.TargetSizeRatio = defaultPoolSpec.Replicated.TargetSizeRatio
-			}
-			// Set default failure domain if not specified
-			if pool.PoolSpec.FailureDomain == "" {
-				pool.PoolSpec.FailureDomain = defaultPoolSpec.FailureDomain
-			}
-		}
-	} else {
-		// Load all StorageProfile objects in the StorageCluster's namespace
-		storageProfiles := &ocsv1.StorageProfileList{}
-		err := r.Client.List(r.ctx, storageProfiles, client.InNamespace(initStorageCluster.GetNamespace()))
-		if err != nil {
-			r.Log.Error(err, "unable to list StorageProfile objects")
-		}
-		// set deviceClass and parameters from storageProfile
-		for i := range storageProfiles.Items {
-			storageProfile := storageProfiles.Items[i]
-			spSpec := &storageProfile.Spec
-			deviceClass := spSpec.DeviceClass
-			if len(deviceClass) == 0 {
-				r.Log.Error(nil, "Storage profile has an empty device class. Skipping.", "StorageProfile", klog.KRef(storageProfile.Namespace, storageProfile.Name))
-				storageProfile.Status.Phase = ocsv1.StorageProfilePhaseRejected
-				if updateErr := r.Client.Status().Update(r.ctx, &storageProfile); updateErr != nil {
-					r.Log.Error(updateErr, "Could not update StorageProfile.", "StorageProfile", klog.KRef(storageProfile.Namespace, storageProfile.Name))
-					return nil, updateErr
-				}
-				continue
-			}
-			storageProfile.Status.Phase = ""
-			if updateErr := r.Client.Status().Update(r.ctx, &storageProfile); updateErr != nil {
-				r.Log.Error(updateErr, "Could not update StorageProfile.", "StorageProfile", klog.KRef(storageProfile.Namespace, storageProfile.Name))
-				return nil, updateErr
-			}
-			parameters := spSpec.SharedFilesystemConfiguration.Parameters
-			ret.Spec.DataPools = append(ret.Spec.DataPools, cephv1.NamedPoolSpec{
-				Name: deviceClass,
-				PoolSpec: cephv1.PoolSpec{
-					Replicated:    generateCephReplicatedSpec(initStorageCluster, "data"),
-					DeviceClass:   deviceClass,
-					Parameters:    parameters,
-					FailureDomain: initStorageCluster.Status.FailureDomain,
-				},
-			})
-		}
-	}
+	// Use the specified poolSpec, if it is unset then the default poolSpec will be used
+	ret.Spec.DataPools = []cephv1.NamedPoolSpec{
+		{
+			PoolSpec: initStorageCluster.Spec.ManagedResources.CephFilesystems.DataPoolSpec,
+		},
+	}
+
+	// Append additional pools from specified additional data pools
+	ret.Spec.DataPools = append(ret.Spec.DataPools, initStorageCluster.Spec.ManagedResources.CephFilesystems.AdditionalDataPools...)
+
+	// Iterate over each pool and set default values if necessary
+	defaultPoolSpec := generateDefaultPoolSpec(initStorageCluster)
+	for i := range ret.Spec.DataPools {
+		pool := &ret.Spec.DataPools[i]
+		// Set default device class if not specified
+		if pool.PoolSpec.DeviceClass == "" {
+			pool.PoolSpec.DeviceClass = defaultPoolSpec.DeviceClass
+		}
+		// Set default replication settings if not specified
+		if pool.PoolSpec.Replicated.Size == 0 {
+			pool.PoolSpec.Replicated.Size = defaultPoolSpec.Replicated.Size
+		}
+		if pool.PoolSpec.Replicated.ReplicasPerFailureDomain == 0 {
+			pool.PoolSpec.Replicated.ReplicasPerFailureDomain = defaultPoolSpec.Replicated.ReplicasPerFailureDomain
+		}
+		if pool.PoolSpec.Replicated.TargetSizeRatio == 0 {
+			pool.PoolSpec.Replicated.TargetSizeRatio = defaultPoolSpec.Replicated.TargetSizeRatio
+		}
+		// Set default failure domain if not specified
+		if pool.PoolSpec.FailureDomain == "" {
+			pool.PoolSpec.FailureDomain = defaultPoolSpec.FailureDomain
+		}
+	}
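The new hunk applies one rule everywhere, provider mode or not: build the data-pool list from the user-supplied DataPoolSpec plus AdditionalDataPools, then backfill any zero-valued field from the cluster-wide default pool spec. Below is a minimal, self-contained sketch of that backfill pattern; the trimmed-down PoolSpec, NamedPoolSpec, and applyPoolDefaults names are illustrative stand-ins, not the rook cephv1 types or any function from this repository.

```go
package main

import "fmt"

// Simplified stand-ins for cephv1.PoolSpec / cephv1.NamedPoolSpec (illustration only).
type PoolSpec struct {
	DeviceClass   string
	ReplicaSize   uint
	FailureDomain string
}

type NamedPoolSpec struct {
	Name string
	PoolSpec
}

// applyPoolDefaults backfills every zero-valued field from the default spec,
// mirroring the per-field checks in the new newCephFilesystemInstances loop.
func applyPoolDefaults(pools []NamedPoolSpec, def PoolSpec) {
	for i := range pools {
		p := &pools[i]
		if p.DeviceClass == "" {
			p.DeviceClass = def.DeviceClass
		}
		if p.ReplicaSize == 0 {
			p.ReplicaSize = def.ReplicaSize
		}
		if p.FailureDomain == "" {
			p.FailureDomain = def.FailureDomain
		}
	}
}

func main() {
	def := PoolSpec{DeviceClass: "ssd", ReplicaSize: 3, FailureDomain: "host"}
	pools := []NamedPoolSpec{
		{}, // the default data pool: everything unset, so it takes all defaults
		{Name: "archive", PoolSpec: PoolSpec{DeviceClass: "hdd"}}, // an additional pool keeps its explicit device class
	}
	applyPoolDefaults(pools, def)
	fmt.Printf("%+v\n", pools)
}
```

Explicitly requested values always win; defaults only fill gaps, which is why a provider-mode cluster no longer needs StorageProfiles to get a usable CephFilesystem data pool.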

61 changes: 2 additions & 59 deletions controllers/storagecluster/cephfilesystem_test.go
@@ -2,7 +2,6 @@ package storagecluster

 import (
 	"context"
-	"strings"
 	"testing"

 	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
@@ -43,69 +42,13 @@ func TestCephFileSystem(t *testing.T) {
 			ProviderAPIServerServiceType: "",
 		}

-		t, reconcilerOCSInit, cr, requestOCSInit, requestsStorageProfiles := initStorageClusterResourceCreateUpdateTestProviderMode(
+		t, reconcilerOCSInit, cr, requestOCSInit, _ := initStorageClusterResourceCreateUpdateTestProviderMode(
 			t, objects, providerModeSpec, spList, c.remoteStorageConsumers)
 		if c.createRuntimeObjects {
 			objects = createUpdateRuntimeObjects(t) //nolint:staticcheck //no need to use objects as they update in runtime
 		}
-		if c.remoteStorageConsumers {
-			assertCephFileSystemProviderMode(t, reconcilerOCSInit, cr, requestOCSInit, requestsStorageProfiles)
-		} else {
-			assertCephFileSystem(t, reconcilerOCSInit, cr, requestOCSInit)
-		}
+		assertCephFileSystem(t, reconcilerOCSInit, cr, requestOCSInit)

 	}
 }

-func assertCephFileSystemProviderMode(t *testing.T, reconciler StorageClusterReconciler, cr *api.StorageCluster, requestOCSInit reconcile.Request, requestsStorageProfiles []reconcile.Request) {
-	actualFs := &cephv1.CephFilesystem{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "ocsinit-cephfilesystem",
-		},
-		Spec: cephv1.FilesystemSpec{
-			DataPools: []cephv1.NamedPoolSpec{
-				{Name: "fast", PoolSpec: cephv1.PoolSpec{DeviceClass: "fast"}},
-				{Name: "med", PoolSpec: cephv1.PoolSpec{DeviceClass: "med"}},
-				{Name: "slow", PoolSpec: cephv1.PoolSpec{DeviceClass: "slow"}},
-			},
-		},
-	}
-	requestOCSInit.Name = "ocsinit-cephfilesystem"
-	err := reconciler.Client.Get(context.TODO(), requestOCSInit.NamespacedName, actualFs)
-	assert.NoError(t, err)
-
-	storageProfiles := &api.StorageProfileList{}
-	err = reconciler.Client.List(context.TODO(), storageProfiles)
-	assert.NoError(t, err)
-	assert.Equal(t, len(storageProfiles.Items), len(requestsStorageProfiles))
-	assert.Equal(t, len(storageProfiles.Items)-1, len(actualFs.Spec.DataPools))
-
-	expectedCephFS, err := reconciler.newCephFilesystemInstances(cr)
-	assert.NoError(t, err)
-
-	assert.Equal(t, len(expectedCephFS[0].OwnerReferences), 1)
-
-	assert.Equal(t, expectedCephFS[0].ObjectMeta.Name, actualFs.ObjectMeta.Name)
-	assert.Equal(t, expectedCephFS[0].Spec, actualFs.Spec)
-	assert.Equal(t, expectedCephFS[0].Spec.DataPools[0].Name, actualFs.Spec.DataPools[0].Name)
-	assert.Equal(t, expectedCephFS[0].Spec.DataPools[1].Name, actualFs.Spec.DataPools[1].Name)
-	assert.Equal(t, expectedCephFS[0].Spec.DataPools[2].Name, actualFs.Spec.DataPools[2].Name)
-	assert.Equal(t, expectedCephFS[0].Spec.DataPools[0].PoolSpec.DeviceClass, actualFs.Spec.DataPools[0].PoolSpec.DeviceClass)
-	assert.Equal(t, expectedCephFS[0].Spec.DataPools[1].PoolSpec.DeviceClass, actualFs.Spec.DataPools[1].PoolSpec.DeviceClass)
-	assert.Equal(t, expectedCephFS[0].Spec.DataPools[2].PoolSpec.DeviceClass, actualFs.Spec.DataPools[2].PoolSpec.DeviceClass)
-
-	for i := range requestsStorageProfiles {
-		actualStorageProfile := &api.StorageProfile{}
-		requestStorageProfile := requestsStorageProfiles[i]
-		err = reconciler.Client.Get(context.TODO(), requestStorageProfile.NamespacedName, actualStorageProfile)
-		assert.NoError(t, err)
-		assert.Equal(t, requestStorageProfile.Name, actualStorageProfile.Name)
-
-		phaseStorageProfile := api.StorageProfilePhase("")
-		if strings.Contains(requestStorageProfile.Name, "blank") {
-			phaseStorageProfile = api.StorageProfilePhaseRejected
-		}
-		assert.Equal(t, phaseStorageProfile, actualStorageProfile.Status.Phase)
-	}
-}
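With assertCephFileSystemProviderMode removed, provider and non-provider test cases both go through the shared assertCephFileSystem helper, which checks the reconciled CephFilesystem against what newCephFilesystemInstances produces. For context, here is a minimal sketch of the fallback rule those assertions now exercise in both modes; defaultDeviceClass and TestDefaultDeviceClassFallback are hypothetical illustrations, not code from this repository.

```go
package storagecluster

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// defaultDeviceClass is a hypothetical helper capturing the fallback rule:
// an empty device class falls back to the cluster-wide default.
func defaultDeviceClass(requested, fallback string) string {
	if requested == "" {
		return fallback
	}
	return requested
}

func TestDefaultDeviceClassFallback(t *testing.T) {
	// An unset device class picks up the default.
	assert.Equal(t, "ssd", defaultDeviceClass("", "ssd"))
	// An explicitly requested device class is left untouched.
	assert.Equal(t, "hdd", defaultDeviceClass("hdd", "ssd"))
}
```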

