Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing latest changes from upstream main for ramen #163

Merged
merged 6 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion controllers/drcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (r *DRClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

u.initializeStatus()

if !u.object.ObjectMeta.DeletionTimestamp.IsZero() {
if drClusterIsDeleted(drcluster) {
return r.processDeletion(u)
}

Expand Down Expand Up @@ -360,6 +360,11 @@ func (r DRClusterReconciler) processCreateOrUpdate(u *drclusterInstance) (ctrl.R
return ctrl.Result{Requeue: requeue || u.requeue}, reconcileError
}

// drClusterIsDeleted reports whether the given DRCluster has been marked for
// deletion, i.e. whether its deletion timestamp is set to a non-zero value.
func drClusterIsDeleted(c *ramen.DRCluster) bool {
	deletedAt := c.GetDeletionTimestamp()

	return !deletedAt.IsZero()
}

func (u *drclusterInstance) initializeStatus() {
// Save a copy of the instance status to be used for the DRCluster status update comparison
u.object.Status.DeepCopyInto(&u.savedInstanceStatus)
Expand Down
7 changes: 6 additions & 1 deletion controllers/drcluster_mmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (u *drclusterInstance) mModeActivationsRequired() (map[string]ramen.Storage

// getVRGs is a helper function to get the VRGs for the passed in DRPC and DRPolicy association
func (u *drclusterInstance) getVRGs(drpcCollection DRPCAndPolicy) (map[string]*ramen.VolumeReplicationGroup, error) {
drClusters, err := getDRClusters(u.ctx, u.client, drpcCollection.drPolicy)
if err != nil {
return nil, err
}

placementObj, err := getPlacementOrPlacementRule(u.ctx, u.client, drpcCollection.drpc, u.log)
if err != nil {
return nil, err
Expand All @@ -113,7 +118,7 @@ func (u *drclusterInstance) getVRGs(drpcCollection DRPCAndPolicy) (map[string]*r
vrgs, failedToQueryCluster, err := getVRGsFromManagedClusters(
u.reconciler.MCVGetter,
drpcCollection.drpc,
drpcCollection.drPolicy,
drClusters,
vrgNamespace,
u.log)
if err != nil {
Expand Down
29 changes: 19 additions & 10 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/yaml"

rmn "github.com/ramendr/ramen/api/v1alpha1"
Expand Down Expand Up @@ -1453,16 +1454,9 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
return nil
}

// ensureVRGManifestWork ensures that the VRG ManifestWork exists and matches the current VRG state.
// TODO: This may be safe only when the VRG is primary - check if callers use this correctly.
func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
mw, mwErr := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, homeCluster)
if mwErr != nil {
d.log.Info("Ensure VRG ManifestWork", "Error", mwErr)
}

if mw != nil {
return nil
}

d.log.Info("Ensure VRG ManifestWork",
"Last State:", d.getLastDRState(), "cluster", homeCluster)

Expand Down Expand Up @@ -1501,7 +1495,7 @@ func (d *DRPCInstance) generateVRG(repState rmn.ReplicationState) rmn.VolumeRepl
Spec: rmn.VolumeReplicationGroupSpec{
PVCSelector: d.instance.Spec.PVCSelector,
ReplicationState: repState,
S3Profiles: rmnutil.DRPolicyS3Profiles(d.drPolicy, d.drClusters).List(),
S3Profiles: d.availableS3Profiles(),
KubeObjectProtection: d.instance.Spec.KubeObjectProtection,
},
}
Expand All @@ -1513,6 +1507,21 @@ func (d *DRPCInstance) generateVRG(repState rmn.ReplicationState) rmn.VolumeRepl
return vrg
}

// availableS3Profiles collects the S3 profile names of the DRPC's DRClusters,
// leaving out any DRCluster that is marked for deletion. Names are gathered in
// a set, so a profile shared by several clusters appears only once; the result
// is produced by sets.List (documented by apimachinery as returning a sorted
// slice).
func (d *DRPCInstance) availableS3Profiles() []string {
	names := sets.New[string]()

	// Index into the slice so each DRCluster is referenced in place, not copied.
	for idx := range d.drClusters {
		if cluster := &d.drClusters[idx]; !drClusterIsDeleted(cluster) {
			names.Insert(cluster.Spec.S3ProfileName)
		}
	}

	return sets.List(names)
}

func (d *DRPCInstance) generateVRGSpecAsync() *rmn.VRGAsyncSpec {
if dRPolicySupportsRegional(d.drPolicy, d.drClusters) {
return &rmn.VRGAsyncSpec{
Expand Down
39 changes: 26 additions & 13 deletions controllers/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func (r *DRPlacementControlReconciler) createDRPCInstance(
return nil, err
}

vrgs, err := updateVRGsFromManagedClusters(r.MCVGetter, drpc, drPolicy, vrgNamespace, log)
vrgs, err := updateVRGsFromManagedClusters(r.MCVGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1064,8 +1064,13 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
}
}

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return fmt.Errorf("failed to get drclusters. Error (%w)", err)
}

// Verify VRGs have been deleted
vrgs, _, err := getVRGsFromManagedClusters(r.MCVGetter, drpc, drPolicy, vrgNamespace, log)
vrgs, _, err := getVRGsFromManagedClusters(r.MCVGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
return fmt.Errorf("failed to retrieve VRGs. We'll retry later. Error (%w)", err)
}
Expand Down Expand Up @@ -1421,9 +1426,9 @@ func (r *DRPlacementControlReconciler) clonePlacementRule(ctx context.Context,
}

func updateVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, drpc *rmn.DRPlacementControl,
drPolicy *rmn.DRPolicy, vrgNamespace string, log logr.Logger,
drClusters []rmn.DRCluster, vrgNamespace string, log logr.Logger,
) (map[string]*rmn.VolumeReplicationGroup, error) {
vrgs, failedClusterToQuery, err := getVRGsFromManagedClusters(mcvGetter, drpc, drPolicy, vrgNamespace, log)
vrgs, failedClusterToQuery, err := getVRGsFromManagedClusters(mcvGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
return nil, err
}
Expand All @@ -1442,7 +1447,7 @@ func updateVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, d
}

func getVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, drpc *rmn.DRPlacementControl,
drPolicy *rmn.DRPolicy, vrgNamespace string, log logr.Logger,
drClusters []rmn.DRCluster, vrgNamespace string, log logr.Logger,
) (map[string]*rmn.VolumeReplicationGroup, string, error) {
vrgs := map[string]*rmn.VolumeReplicationGroup{}

Expand All @@ -1455,33 +1460,41 @@ func getVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, drpc

var clustersQueriedSuccessfully int

for _, drCluster := range rmnutil.DrpolicyClusterNames(drPolicy) {
vrg, err := mcvGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, drCluster, annotations)
for i := range drClusters {
drCluster := &drClusters[i]

vrg, err := mcvGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, drCluster.Name, annotations)
if err != nil {
// Only NotFound error is accepted
if errors.IsNotFound(err) {
log.Info(fmt.Sprintf("VRG not found on %q", drCluster))
log.Info(fmt.Sprintf("VRG not found on %q", drCluster.Name))
clustersQueriedSuccessfully++

continue
}

failedClusterToQuery = drCluster
failedClusterToQuery = drCluster.Name

log.Info(fmt.Sprintf("failed to retrieve VRG from %s. err (%v)", drCluster, err))
log.Info(fmt.Sprintf("failed to retrieve VRG from %s. err (%v)", drCluster.Name, err))

continue
}

clustersQueriedSuccessfully++

vrgs[drCluster] = vrg
if drClusterIsDeleted(drCluster) {
log.Info("Skipping VRG on deleted drcluster", "drcluster", drCluster.Name, "vrg", vrg.Name)

continue
}

vrgs[drCluster.Name] = vrg

log.Info("VRG location", "VRG on", drCluster)
log.Info("VRG location", "VRG on", drCluster.Name)
}

// We are done if we successfully queried all drClusters
if clustersQueriedSuccessfully == len(rmnutil.DrpolicyClusterNames(drPolicy)) {
if clustersQueriedSuccessfully == len(drClusters) {
return vrgs, "", nil
}

Expand Down
32 changes: 7 additions & 25 deletions controllers/util/mw_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,50 +481,32 @@ func (mwu *MWUtil) createOrUpdateManifestWork(
mw *ocmworkv1.ManifestWork,
managedClusternamespace string,
) error {
key := types.NamespacedName{Name: mw.Name, Namespace: managedClusternamespace}
foundMW := &ocmworkv1.ManifestWork{}

err := mwu.Client.Get(mwu.Ctx,
types.NamespacedName{Name: mw.Name, Namespace: managedClusternamespace},
foundMW)
err := mwu.Client.Get(mwu.Ctx, key, foundMW)
if err != nil {
if !errors.IsNotFound(err) {
return errorswrapper.Wrap(err, fmt.Sprintf("failed to fetch ManifestWork %s", mw.Name))
return errorswrapper.Wrap(err, fmt.Sprintf("failed to fetch ManifestWork %s", key))
}

// Let DRPC receive notification for any changes to ManifestWork CR created by it.
// if err := ctrl.SetControllerReference(d.instance, mw, d.reconciler.Scheme); err != nil {
// return fmt.Errorf("failed to set owner reference to ManifestWork resource (%s/%s) (%v)",
// mw.Name, mw.Namespace, err)
// }

mwu.Log.Info("Creating ManifestWork", "cluster", managedClusternamespace, "MW", mw)

return mwu.Client.Create(mwu.Ctx, mw)
}

if !reflect.DeepEqual(foundMW.Spec, mw.Spec) {
mwu.Log.Info("ManifestWork exists.", "name", mw.Name, "namespace", foundMW.Namespace)

retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var err error
mwu.Log.Info("Updating ManifestWork", "name", mw.Name, "namespace", foundMW.Namespace)

err = mwu.Client.Get(mwu.Ctx,
types.NamespacedName{Name: mw.Name, Namespace: managedClusternamespace},
foundMW)
if err != nil {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := mwu.Client.Get(mwu.Ctx, key, foundMW); err != nil {
return err
}

mw.Spec.DeepCopyInto(&foundMW.Spec)

err = mwu.Client.Update(mwu.Ctx, foundMW)

return err
return mwu.Client.Update(mwu.Ctx, foundMW)
})

if retryErr != nil {
return retryErr
}
}

return nil
Expand Down