remove creation of VRC for ODF internal mode
Signed-off-by: Umanga Chapagain <[email protected]>
umangachapagain committed Feb 26, 2025
1 parent 28b3288 commit 902a1c0
Showing 2 changed files with 0 additions and 213 deletions.
196 changes: 0 additions & 196 deletions controllers/drpolicy_controller.go
@@ -2,26 +2,21 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
ramenv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
workv1 "open-cluster-management.io/api/work/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -162,197 +157,6 @@ func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
logger.Info("Fetching Cluster StorageIDs")
clusterStorageIds, err := r.fetchClusterStorageIds(ctx, mirrorPeer)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Cluster StorageIds not found, requeuing")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
logger.Error("An unknown error occurred while fetching the cluster FSIDs, retrying", "error", err)
return ctrl.Result{}, fmt.Errorf("an unknown error occurred while fetching the cluster FSIDs, retrying: %v", err)
}

err = r.createOrUpdateManifestWorkForVRC(ctx, mirrorPeer, &drpolicy, clusterStorageIds)
if err != nil {
logger.Error("Failed to create VolumeReplicationClass via ManifestWork", "error", err)
return ctrl.Result{}, fmt.Errorf("failed to create VolumeReplicationClass via ManifestWork: %v", err)
}
}

logger.Info("Successfully reconciled DRPolicy")
return ctrl.Result{}, nil
}

func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Context, mp *multiclusterv1alpha1.MirrorPeer, dp *ramenv1alpha1.DRPolicy, storageIdsMap map[string]map[string]string) error {
logger := r.Logger.With("DRPolicy", dp.Name, "MirrorPeer", mp.Name)

storageId1 := storageIdsMap[mp.Spec.Items[0].ClusterName]["rbd"]
storageId2 := storageIdsMap[mp.Spec.Items[1].ClusterName]["rbd"]
replicationId, err := utils.CreateUniqueReplicationId(storageId1, storageId2)
if err != nil {
logger.Error("Failed to create unique replication ID", "error", err)
return err
}

manifestWorkName := fmt.Sprintf("vrc-%v", utils.FnvHash(dp.Name)) // Two ManifestWork per DRPolicy

for _, pr := range mp.Spec.Items {
found := &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: manifestWorkName,
Namespace: pr.ClusterName,
},
}

err := r.HubClient.Get(ctx, types.NamespacedName{Name: found.Name, Namespace: pr.ClusterName}, found)

switch {
case err == nil:
logger.Info("ManifestWork already exists, updating", "ManifestWorkName", manifestWorkName)
case !k8serrors.IsNotFound(err):
logger.Error("Failed to get ManifestWork", "ManifestWorkName", manifestWorkName, "error", err)
return err
}

interval := dp.Spec.SchedulingInterval
params := make(map[string]string)
params[MirroringModeKey] = DefaultMirroringMode
params[SchedulingIntervalKey] = interval
params[ReplicationSecretNameKey] = RBDReplicationSecretName
params[ReplicationSecretNamespaceKey] = pr.StorageClusterRef.Namespace
vrcName := fmt.Sprintf(RBDVolumeReplicationClassNameTemplate, utils.FnvHash(interval))
labels := make(map[string]string)
labels[fmt.Sprintf(RamenLabelTemplate, ReplicationIDKey)] = replicationId
labels[fmt.Sprintf(RamenLabelTemplate, "maintenancemodes")] = "Failover"
labels[fmt.Sprintf(RamenLabelTemplate, StorageIDKey)] = storageIdsMap[pr.ClusterName]["rbd"]
vrc := replicationv1alpha1.VolumeReplicationClass{
TypeMeta: metav1.TypeMeta{
Kind: "VolumeReplicationClass",
APIVersion: "replication.storage.openshift.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: vrcName,
Labels: labels,
Annotations: map[string]string{
RBDVolumeReplicationClassDefaultAnnotation: "true",
},
},
Spec: replicationv1alpha1.VolumeReplicationClassSpec{
Parameters: params,
Provisioner: fmt.Sprintf(RBDProvisionerTemplate, pr.StorageClusterRef.Namespace),
},
}

objJson, err := json.Marshal(vrc)
if err != nil {
logger.Error("Failed to marshal VolumeReplicationClass to JSON", "VolumeReplicationClass", vrcName, "error", err)
return fmt.Errorf("failed to marshal %v to JSON, error %w", vrc, err)
}

manifestList := []workv1.Manifest{
{
RawExtension: runtime.RawExtension{
Raw: objJson,
},
},
}

if dp.Spec.ReplicationClassSelector.MatchLabels[RBDFlattenVolumeReplicationClassLabelKey] == RBDFlattenVolumeReplicationClassLabelValue {
vrcFlatten := vrc.DeepCopy()
vrcFlatten.Name = fmt.Sprintf(RBDFlattenVolumeReplicationClassNameTemplate, utils.FnvHash(interval))
vrcFlatten.Labels[RBDFlattenVolumeReplicationClassLabelKey] = RBDFlattenVolumeReplicationClassLabelValue
vrcFlatten.Annotations = map[string]string{}
vrcFlatten.Spec.Parameters["flattenMode"] = "force"
vrcFlattenJson, err := json.Marshal(vrcFlatten)
if err != nil {
return fmt.Errorf("failed to marshal %v to JSON, error %w", vrcFlatten, err)
}
manifestList = append(manifestList, workv1.Manifest{
RawExtension: runtime.RawExtension{
Raw: vrcFlattenJson,
},
})
}

mw := workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: manifestWorkName,
Namespace: pr.ClusterName,
OwnerReferences: []metav1.OwnerReference{
{
Kind: dp.Kind,
Name: dp.Name,
UID: dp.UID,
APIVersion: dp.APIVersion,
},
},
},
}

_, err = controllerutil.CreateOrUpdate(ctx, r.HubClient, &mw, func() error {
mw.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Manifests: manifestList,
},
}
return nil
})

if err != nil {
logger.Error("Failed to create/update ManifestWork", "ManifestWorkName", manifestWorkName, "error", err)
return err
}

logger.Info("ManifestWork created/updated successfully", "ManifestWorkName", manifestWorkName, "VolumeReplicationClassName", vrcName)
}

return nil
}

func (r *DRPolicyReconciler) fetchClusterStorageIds(ctx context.Context, peer *multiclusterv1alpha1.MirrorPeer) (map[string]map[string]string, error) {
clusterStorageIds := make(map[string]map[string]string)

for _, pr := range peer.Spec.Items {
logger := r.Logger.With("MirrorPeer", peer.Name, "ClusterName", pr.ClusterName)
rookSecretName := utils.GetSecretNameByPeerRef(pr)
logger.Info("Fetching rook secret", "SecretName", rookSecretName)

hs, err := utils.FetchSecretWithName(ctx, r.HubClient, types.NamespacedName{
Name: rookSecretName,
Namespace: pr.ClusterName,
})
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Secret not found, will attempt to fetch again after a delay",
"SecretName", rookSecretName)
return nil, err
}
logger.Error("Failed to fetch rook secret",
"SecretName", rookSecretName,
"error", err)
return nil, err
}

logger.Info("Unmarshalling rook secret", "SecretName", rookSecretName)
storageIds, err := utils.GetStorageIdsFromHubSecret(hs)
if err != nil {
logger.Error("Failed to unmarshal rook secret",
"SecretName", rookSecretName,
"error", err)
return nil, err
}

// Store the storage IDs mapped to cluster name
clusterStorageIds[pr.ClusterName] = storageIds
logger.Info("Successfully fetched StorageIds for cluster",
"ClusterName", pr.ClusterName,
"StorageIDs", storageIds)
}

r.Logger.Info("Successfully fetched all cluster StorageIDs",
"MirrorPeer", peer.Name,
"ClusterCount", len(clusterStorageIds))
return clusterStorageIds, nil
}
17 changes: 0 additions & 17 deletions controllers/drpolicy_controller_test.go
@@ -2,7 +2,6 @@ package controllers

import (
"context"
"fmt"
"os"
"testing"

@@ -13,7 +12,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
workv1 "open-cluster-management.io/api/work/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
@@ -83,21 +81,6 @@ func TestDRPolicyReconcile(t *testing.T) {
t.Errorf("DRPolicyReconciler Reconcile() failed. Error: %s", err)
}

for _, clusterName := range drpolicy.Spec.DRClusters {
name := fmt.Sprintf("vrc-%v", utils.FnvHash(drpolicy.Name))
var found workv1.ManifestWork
err := r.HubClient.Get(ctx, types.NamespacedName{
Namespace: clusterName,
Name: name,
}, &found)

if err != nil {
t.Errorf("Failed to get ManifestWork. Error: %s", err)
}
if len(found.Spec.Workload.Manifests) < 2 {
t.Errorf("Expected at least 2 VRC")
}
}
}

func getFakeDRPolicyReconciler(drpolicy *ramenv1alpha1.DRPolicy, mp *multiclusterv1alpha1.MirrorPeer) DRPolicyReconciler {
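For reference, the deleted createOrUpdateManifestWorkForVRC helper relied on controller-runtime's CreateOrUpdate to apply the ManifestWork idempotently on the hub. Below is a minimal, self-contained sketch of that pattern; the applyManifestWork function and its signature are illustrative only and do not exist in this repository, and the manifests argument is assumed to already hold marshalled VolumeReplicationClass JSON.

package controllers

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	workv1 "open-cluster-management.io/api/work/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// applyManifestWork (illustrative, not part of this repository) applies a
// ManifestWork in the managed cluster's namespace on the hub. CreateOrUpdate
// fetches the object if it already exists and only the fields set inside the
// mutate callback are reconciled, so repeated calls are idempotent — the same
// pattern the removed code used for its per-cluster VRC ManifestWorks.
func applyManifestWork(ctx context.Context, hubClient client.Client, name, namespace string, manifests []workv1.Manifest) error {
	mw := &workv1.ManifestWork{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
	}
	_, err := controllerutil.CreateOrUpdate(ctx, hubClient, mw, func() error {
		mw.Spec.Workload.Manifests = manifests
		return nil
	})
	return err
}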
