Syncing latest changes from upstream main for ramen #386

Open: wants to merge 15 commits into base: main
4 changes: 4 additions & 0 deletions api/v1alpha1/drpolicy_types.go
@@ -97,6 +97,10 @@ type PeerClass struct {
// StorageClassName is the name of a StorageClass that is available across the peers
//+optional
StorageClassName string `json:"storageClassName,omitempty"`

// ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
// The IDs are based on the value of the metadata.uid of the kube-system namespace
ClusterIDs []string `json:"clusterIDs,omitempty"`
}

const (
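The ClusterIDs doc comment above says each ID is the metadata.uid of a peer cluster's kube-system namespace. A minimal sketch of how a cluster-side component could derive its own ID under that assumption (hypothetical helper, not part of this pull request; uses a controller-runtime client):

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getClusterID returns the UID of the kube-system namespace, which the new
// PeerClass field uses as a cluster identifier. Illustrative only.
func getClusterID(ctx context.Context, c client.Client) (string, error) {
	ns := &corev1.Namespace{}
	if err := c.Get(ctx, types.NamespacedName{Name: "kube-system"}, ns); err != nil {
		return "", fmt.Errorf("failed to get kube-system namespace: %w", err)
	}

	return string(ns.GetUID()), nil
}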
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.
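The generated change itself is hidden, but for a new []string field controller-gen typically emits the standard slice copy. A sketch of what the PeerClass deepcopy likely gained (an assumption, since the file is not rendered):

package example

// PeerClass mirrors only the fields relevant to this sketch.
type PeerClass struct {
	StorageClassName string
	ClusterIDs       []string
}

// DeepCopyInto shows the slice copy controller-gen usually generates for a
// []string field such as ClusterIDs.
func (in *PeerClass) DeepCopyInto(out *PeerClass) {
	*out = *in
	if in.ClusterIDs != nil {
		inIDs, outIDs := &in.ClusterIDs, &out.ClusterIDs
		*outIDs = make([]string, len(*inIDs))
		copy(*outIDs, *inIDs)
	}
}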

12 changes: 8 additions & 4 deletions cmd/main.go
@@ -206,10 +206,14 @@ func setupReconcilersCluster(mgr ctrl.Manager, ramenConfig *ramendrv1alpha1.Rame

func setupReconcilersHub(mgr ctrl.Manager) {
if err := (&controllers.DRPolicyReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Log: ctrl.Log.WithName("controllers").WithName("DRPolicy"),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Log: ctrl.Log.WithName("controllers").WithName("DRPolicy"),
Scheme: mgr.GetScheme(),
MCVGetter: rmnutil.ManagedClusterViewGetterImpl{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
},
ObjectStoreGetter: controllers.S3ObjectStoreGetter(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DRPolicy")
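The DRPolicy reconciler now receives an MCVGetter, wired here to the concrete rmnutil.ManagedClusterViewGetterImpl. Keeping that dependency as a struct field means unit tests can substitute a stub instead of going through real ManagedClusterView resources; a reduced, hypothetical sketch of the pattern (the real MCVGetter interface lives in the util package and is not shown in this diff):

package example

import "context"

// viewGetter stands in for the util MCVGetter interface; the method name and
// signature here are assumptions for illustration.
type viewGetter interface {
	GetStorageClassNames(ctx context.Context, cluster string) ([]string, error)
}

// stubGetter returns canned data, which is how a test could exercise the
// reconciler without a managed cluster.
type stubGetter struct{ names []string }

func (s stubGetter) GetStorageClassNames(context.Context, string) ([]string, error) {
	return s.names, nil
}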
14 changes: 14 additions & 0 deletions config/crd/bases/ramendr.openshift.io_drpolicies.yaml
@@ -233,6 +233,13 @@ spec:
that have related async relationships. (one per pair of peers in the policy)
items:
properties:
clusterIDs:
description: |-
ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
The IDs are based on the value of the metadata.uid of the kube-system namespace
items:
type: string
type: array
replicationID:
description: |-
ReplicationID is the common value for the label "ramendr.openshift.io/replicationID" on the corresponding
@@ -333,6 +340,13 @@ spec:
that have related sync relationships. (one per pair of peers in the policy)
items:
properties:
clusterIDs:
description: |-
ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
The IDs are based on the value of the metadata.uid of the kube-system namespace
items:
type: string
type: array
replicationID:
description: |-
ReplicationID is the common value for the label "ramendr.openshift.io/replicationID" on the corresponding
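Putting the schema above together, an async peerClasses entry pairs one shared StorageClassName with the two cluster IDs. A Go literal with made-up values (the import path and UIDs are illustrative assumptions):

package example

import rmn "github.com/ramendr/ramen/api/v1alpha1"

// examplePeer shows the shape of a PeerClass carrying the new ClusterIDs list.
var examplePeer = rmn.PeerClass{
	StorageClassName: "rook-ceph-block",
	ClusterIDs: []string{
		"11111111-2222-3333-4444-555555555555", // kube-system UID on cluster A (placeholder)
		"66666666-7777-8888-9999-000000000000", // kube-system UID on cluster B (placeholder)
	},
}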
@@ -127,6 +127,13 @@ spec:
creates a PVC using a newer StorageClass that is determined to be common across the peers.
items:
properties:
clusterIDs:
description: |-
ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
The IDs are based on the value of the metadata.uid of the kube-system namespace
items:
type: string
type: array
replicationID:
description: |-
ReplicationID is the common value for the label "ramendr.openshift.io/replicationID" on the corresponding
@@ -457,6 +464,13 @@ spec:
creates a PVC using a newer StorageClass that is determined to be common across the peers.
items:
properties:
clusterIDs:
description: |-
ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
The IDs are based on the value of the metadata.uid of the kube-system namespace
items:
type: string
type: array
replicationID:
description: |-
ReplicationID is the common value for the label "ramendr.openshift.io/replicationID" on the corresponding
14 changes: 14 additions & 0 deletions config/crd/bases/ramendr.openshift.io_volumereplicationgroups.yaml
@@ -77,6 +77,13 @@ spec:
creates a PVC using a newer StorageClass that is determined to be common across the peers.
items:
properties:
clusterIDs:
description: |-
ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
The IDs are based on the value of the metadata.uid of the kube-system namespace
items:
type: string
type: array
replicationID:
description: |-
ReplicationID is the common value for the label "ramendr.openshift.io/replicationID" on the corresponding
@@ -406,6 +413,13 @@ spec:
creates a PVC using a newer StorageClass that is determined to be common across the peers.
items:
properties:
clusterIDs:
description: |-
ClusterIDs is a list of two clusterIDs that represent this peer relationship for a common StorageClassName
The IDs are based on the value of the metadata.uid of the kube-system namespace
items:
type: string
type: array
replicationID:
description: |-
ReplicationID is the common value for the label "ramendr.openshift.io/replicationID" on the corresponding
17 changes: 6 additions & 11 deletions internal/controller/drclusterconfig_controller.go
@@ -38,11 +38,6 @@ const (
drCConfigOwnerName = "ramen"

maxReconcileBackoff = 5 * time.Minute

// Prefixes for various ClusterClaims
ccSCPrefix = "storage.class"
ccVSCPrefix = "snapshot.class"
ccVRCPrefix = "replication.class"
)

// DRClusterConfigReconciler reconciles a DRClusterConfig object
@@ -237,11 +232,11 @@ func (r *DRClusterConfigReconciler) createSCClusterClaims(
continue
}

if err := r.ensureClusterClaim(ctx, log, ccSCPrefix, sClasses.Items[i].GetName()); err != nil {
if err := r.ensureClusterClaim(ctx, log, util.CCSCPrefix, sClasses.Items[i].GetName()); err != nil {
return nil, err
}

claims = append(claims, claimName(ccSCPrefix, sClasses.Items[i].GetName()))
claims = append(claims, claimName(util.CCSCPrefix, sClasses.Items[i].GetName()))
}

return claims, nil
@@ -263,11 +258,11 @@ func (r *DRClusterConfigReconciler) createVSCClusterClaims(
continue
}

if err := r.ensureClusterClaim(ctx, log, ccVSCPrefix, vsClasses.Items[i].GetName()); err != nil {
if err := r.ensureClusterClaim(ctx, log, util.CCVSCPrefix, vsClasses.Items[i].GetName()); err != nil {
return nil, err
}

claims = append(claims, claimName(ccVSCPrefix, vsClasses.Items[i].GetName()))
claims = append(claims, claimName(util.CCVSCPrefix, vsClasses.Items[i].GetName()))
}

return claims, nil
@@ -289,11 +284,11 @@ func (r *DRClusterConfigReconciler) createVRCClusterClaims(
continue
}

if err := r.ensureClusterClaim(ctx, log, ccVRCPrefix, vrClasses.Items[i].GetName()); err != nil {
if err := r.ensureClusterClaim(ctx, log, util.CCVRCPrefix, vrClasses.Items[i].GetName()); err != nil {
return nil, err
}

claims = append(claims, claimName(ccVRCPrefix, vrClasses.Items[i].GetName()))
claims = append(claims, claimName(util.CCVRCPrefix, vrClasses.Items[i].GetName()))
}

return claims, nil
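The ClusterClaim prefixes move from file-local constants to exported util constants (util.CCSCPrefix, util.CCVSCPrefix, util.CCVRCPrefix) so that other packages can construct and parse the same claim names. Based on the values removed above, a claim name is still prefix plus class name; a small sketch under the assumption that claimName simply joins the two with a dot (its implementation is not in this hunk):

package main

import "fmt"

// Values copied from the constants removed above; now expected as exported
// constants in the util package.
const (
	ccSCPrefix  = "storage.class"
	ccVSCPrefix = "snapshot.class"
	ccVRCPrefix = "replication.class"
)

// claimName mirrors the controller helper, assumed to join prefix and name.
func claimName(prefix, name string) string {
	return fmt.Sprintf("%s.%s", prefix, name)
}

func main() {
	fmt.Println(claimName(ccSCPrefix, "rook-ceph-block")) // storage.class.rook-ceph-block
}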
4 changes: 4 additions & 0 deletions internal/controller/drclusters.go
@@ -245,6 +245,10 @@ func drClusterUndeploy(
return fmt.Errorf("drcluster '%v' referenced in one or more existing drPolicy resources", drcluster.Name)
}

if err := deleteViewsForClasses(mcv, log, drcluster.GetName()); err != nil {
return err
}

if err := mwu.DeleteManifestWork(mwu.BuildManifestWorkName(util.MWTypeDRCConfig), drcluster.GetName()); err != nil {
return err
}
93 changes: 63 additions & 30 deletions internal/controller/drplacementcontrol.go
@@ -874,7 +874,7 @@ func (d *DRPCInstance) RunRelocate() (bool, error) {
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionAvailable, d.instance.Generation,
d.getConditionStatusForTypeAvailable(), string(d.instance.Status.Phase), errMsg)

return !done, fmt.Errorf(errMsg)
return !done, fmt.Errorf("%s", errMsg)
}

if d.getLastDRState() != rmn.Relocating && !d.validatePeerReady() {
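The switch above from fmt.Errorf(errMsg) to fmt.Errorf("%s", errMsg) quiets the printf analyzer (go vet and common linters flag non-constant format strings) and avoids mangling messages that happen to contain a percent sign. A minimal illustration:

package main

import "fmt"

func main() {
	errMsg := "relocation failed for 50% of PVCs" // note the stray '%'

	// fmt.Errorf(errMsg) would treat errMsg as a format string and print
	// something like "relocation failed for 50%!o(MISSING)f PVCs".
	err := fmt.Errorf("%s", errMsg) // errMsg is data, not a format string
	fmt.Println(err)
}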
@@ -935,8 +934,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string
// in the MW, but the VRGs in the vrgs slice are fetched using MCV.
vrg, ok := d.vrgs[srcCluster]
if !ok || len(vrg.Spec.VolSync.RDSpec) != 0 {
return fmt.Errorf(fmt.Sprintf("Waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v",
srcCluster, ok))
return fmt.Errorf("waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v", srcCluster, ok)
}

err = d.EnsureCleanup(srcCluster)
@@ -1406,6 +1405,8 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {
}

func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
d.log.Info("Cleaning up secondaries.")

for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if skipCluster == clusterName {
continue
@@ -1520,18 +1521,17 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
}

// create VRG ManifestWork
d.log.Info("Creating VRG ManifestWork",
d.log.Info("Creating VRG ManifestWork", "ReplicationState", repState,
"Last State:", d.getLastDRState(), "cluster", homeCluster)

vrg := d.generateVRG(homeCluster, repState)
vrg.Spec.VolSync.Disabled = d.volSyncDisabled
vrg := d.newVRG(homeCluster, repState)

annotations := make(map[string]string)

annotations[DRPCNameAnnotation] = d.instance.Name
annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

if err := d.mwu.CreateOrUpdateVRGManifestWork(
if _, err := d.mwu.CreateOrUpdateVRGManifestWork(
d.instance.Name, d.vrgNamespace,
homeCluster, vrg, annotations); err != nil {
d.log.Error(err, "failed to create or update VolumeReplicationGroup manifest")
@@ -1548,12 +1548,57 @@ func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
d.log.Info("Ensure VRG ManifestWork",
"Last State:", d.getLastDRState(), "cluster", homeCluster)

cachedVrg := d.vrgs[homeCluster]
if cachedVrg == nil {
return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
mw, mwErr := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, homeCluster)
if mwErr != nil {
if errors.IsNotFound(mwErr) {
cachedVrg := d.vrgs[homeCluster]
if cachedVrg == nil {
return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
}

return d.createVRGManifestWork(homeCluster, cachedVrg.Spec.ReplicationState)
}

return fmt.Errorf("ensure VRG ManifestWork failed (%w)", mwErr)
}

vrg, err := rmnutil.ExtractVRGFromManifestWork(mw)
if err != nil {
return fmt.Errorf("error extracting VRG from ManifestWork for cluster %s. Error: %w", homeCluster, err)
}

return d.createVRGManifestWork(homeCluster, cachedVrg.Spec.ReplicationState)
d.updateVRGOptionalFields(vrg, homeCluster)

return d.mwu.UpdateVRGManifestWork(vrg, mw)
}

// updateVRGOptionalFields ensures that the optional fields in the VRG object are up to date.
// This function does not modify the following fields:
// - ObjectMeta.Name
// - ObjectMeta.Namespace
// - Spec.PVCSelector
// - Spec.ReplicationState
// - Spec.PrepareForFinalSync
// - Spec.RunFinalSync
// - Spec.VolSync.RDSpec
//
// These fields are either set during the initial creation of the VRG (e.g., name and namespace)
// or updated as needed, such as the PrepareForFinalSync and RunFinalSync fields.
func (d *DRPCInstance) updateVRGOptionalFields(vrg *rmn.VolumeReplicationGroup, homeCluster string) {
vrg.ObjectMeta.Annotations = map[string]string{
DestinationClusterAnnotationKey: homeCluster,
DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
DRPCUIDAnnotation: string(d.instance.UID),
rmnutil.IsCGEnabledAnnotation: d.instance.GetAnnotations()[rmnutil.IsCGEnabledAnnotation],
}

vrg.Spec.ProtectedNamespaces = d.instance.Spec.ProtectedNamespaces
vrg.Spec.S3Profiles = AvailableS3Profiles(d.drClusters)
vrg.Spec.KubeObjectProtection = d.instance.Spec.KubeObjectProtection
vrg.Spec.Async = d.generateVRGSpecAsync()
vrg.Spec.Sync = d.generateVRGSpecSync()
vrg.Spec.VolSync.Disabled = d.volSyncDisabled
d.setVRGAction(vrg)
}

func (d *DRPCInstance) ensurePlacement(homeCluster string) error {
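ensureVRGManifestWork now checks for an existing VRG ManifestWork first: if none is found it falls back to creating one from the cached VRG, any other lookup error is returned, and otherwise the embedded VRG is extracted, its optional fields refreshed, and the ManifestWork updated in place. A stripped-down sketch of that control flow, with hypothetical helpers standing in for the real mwu/rmnutil calls:

package example

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("manifestwork not found") // stand-in for apierrors.IsNotFound

// ensureWork captures the find-else-create-else-refresh shape used above.
func ensureWork(
	find func() (work string, err error),
	create func() error,
	refreshAndUpdate func(work string) error,
) error {
	mw, err := find()
	if err != nil {
		if errors.Is(err, errNotFound) {
			return create() // no ManifestWork yet: build one from the cached VRG
		}

		return fmt.Errorf("ensure VRG ManifestWork failed (%w)", err)
	}

	return refreshAndUpdate(mw) // refresh optional fields, then update in place
}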
@@ -1588,31 +1633,20 @@ func (d *DRPCInstance) setVRGAction(vrg *rmn.VolumeReplicationGroup) {
vrg.Spec.Action = action
}

func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
func (d *DRPCInstance) newVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
vrg := rmn.VolumeReplicationGroup{
TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
ObjectMeta: metav1.ObjectMeta{
Name: d.instance.Name,
Namespace: d.vrgNamespace,
Annotations: map[string]string{
DestinationClusterAnnotationKey: dstCluster,
DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
DRPCUIDAnnotation: string(d.instance.UID),
rmnutil.IsCGEnabledAnnotation: d.instance.GetAnnotations()[rmnutil.IsCGEnabledAnnotation],
},
},
Spec: rmn.VolumeReplicationGroupSpec{
PVCSelector: d.instance.Spec.PVCSelector,
ProtectedNamespaces: d.instance.Spec.ProtectedNamespaces,
ReplicationState: repState,
S3Profiles: AvailableS3Profiles(d.drClusters),
KubeObjectProtection: d.instance.Spec.KubeObjectProtection,
PVCSelector: d.instance.Spec.PVCSelector,
ReplicationState: repState,
},
}

d.setVRGAction(&vrg)
vrg.Spec.Async = d.generateVRGSpecAsync()
vrg.Spec.Sync = d.generateVRGSpecSync()
d.updateVRGOptionalFields(&vrg, dstCluster)

return vrg
}
@@ -1760,7 +1794,7 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {
}

if !clean {
msg := "cleaning secondaries"
msg := "cleaning up secondaries"
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionFalse, rmn.ReasonCleaning, msg)

@@ -1775,8 +1809,7 @@

//nolint:gocognit
func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {
d.log.Info("VolSync needs both VRGs. No need to clean up secondary")
d.log.Info("Ensure secondary on peer")
d.log.Info("VolSync needs both VRGs. Ensure secondary setup on peer")

peersReady := true

Expand All @@ -1793,7 +1826,7 @@ func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {

// Recreate the VRG ManifestWork for the secondary. This typically happens during Hub Recovery.
if errors.IsNotFound(err) {
err := d.createVolSyncDestManifestWork(clusterToSkip)
err := d.ensureVolSyncSetup(clusterToSkip)
if err != nil {
return err
}
6 changes: 3 additions & 3 deletions internal/controller/drplacementcontrol_controller.go
@@ -1134,7 +1134,7 @@ func getVRGsFromManagedClusters(

vrgs[drCluster.Name] = vrg

log.Info("VRG location", "VRG on", drCluster.Name)
log.Info("VRG location", "VRG on", drCluster.Name, "replicationState", vrg.Spec.ReplicationState)
}

// We are done if we successfully queried all drClusters
@@ -2244,7 +2244,7 @@ func adoptExistingVRGManifestWork(
annotations[DRPCNameAnnotation] = drpc.Name
annotations[DRPCNamespaceAnnotation] = drpc.Namespace

err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
_, err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
if err != nil {
log.Info("error updating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
}
@@ -2281,7 +2281,7 @@ func adoptOrphanVRG(

vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID)

if err := mwu.CreateOrUpdateVRGManifestWork(
if _, err := mwu.CreateOrUpdateVRGManifestWork(
drpc.Name, vrgNamespace,
cluster, *vrg, annotations); err != nil {
log.Info("error creating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)