Skip to content

Commit

Permalink
Merge pull request #177 from red-hat-storage/sync_us--main
Browse files Browse the repository at this point in the history
Syncing latest changes from upstream main for ramen
  • Loading branch information
ShyamsundarR authored Jan 24, 2024
2 parents 1560948 + 040137e commit 1e70913
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 94 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
COPY api/ api/
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download

# Copy the go source
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/

# Build
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/drplacementcontrol_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type DRState string

// These are the valid values for DRState
const (
// WaitForUser, state recorded in DRPC status to indicate that we are
// waiting for the user to take an action after hub recover.
WaitForUser = DRState("WaitForUser")

// Initiating, state recorded in the DRPC status to indicate that this
// action (Deploy/Failover/Relocate) is preparing for execution. There
// is NO follow up state called 'Initiated'
Expand Down
1 change: 1 addition & 0 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,7 @@ func (d *DRPCInstance) setConditionOnInitialDeploymentCompletion() {

func (d *DRPCInstance) setStatusInitiating() {
if !(d.instance.Status.Phase == "" ||
d.instance.Status.Phase == rmn.WaitForUser ||
d.instance.Status.Phase == rmn.Deployed ||
d.instance.Status.Phase == rmn.FailedOver ||
d.instance.Status.Phase == rmn.Relocated) {
Expand Down
103 changes: 61 additions & 42 deletions controllers/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,8 @@ func (r *DRPlacementControlReconciler) createDRPCInstance(
placementObj client.Object,
log logr.Logger,
) (*DRPCInstance, error) {
log.Info("Creating DRPC instance")

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2168,8 +2170,10 @@ func (r *DRPlacementControlReconciler) ensureDRPCStatusConsistency(
) (bool, error) {
requeue := true

log.Info("Ensure DRPC Status Consistency")

// This will always be false the first time the DRPC resource is first created OR after hub recovery
if drpc.Status.Phase != "" {
if drpc.Status.Phase != "" && drpc.Status.Phase != rmn.WaitForUser {
return !requeue, nil
}

Expand All @@ -2178,7 +2182,10 @@ func (r *DRPlacementControlReconciler) ensureDRPCStatusConsistency(
dstCluster = drpc.Spec.FailoverCluster
}

progress, err := r.determineDRPCState(ctx, drpc, drPolicy, placementObj, dstCluster, log)
progress, msg, err := r.determineDRPCState(ctx, drpc, drPolicy, placementObj, dstCluster, log)

log.Info(msg)

if err != nil {
return requeue, err
}
Expand All @@ -2187,22 +2194,23 @@ func (r *DRPlacementControlReconciler) ensureDRPCStatusConsistency(
case Continue:
return !requeue, nil
case AllowFailover:
drpc.Status.Phase = rmn.WaitForUser
updateDRPCProgression(drpc, rmn.ProgressionActionPaused, log)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionAvailable,
drpc.Generation, metav1.ConditionTrue, rmn.ReasonSuccess, "Failover allowed")
drpc.Generation, metav1.ConditionTrue, rmn.ReasonSuccess, msg)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionPeerReady, drpc.Generation,
metav1.ConditionTrue, rmn.ReasonSuccess, "Failover allowed")

return requeue, nil
default:
msg := "Operation Paused - User Intervention Required."
msg := fmt.Sprintf("Operation Paused - User Intervention Required. %s", msg)

log.Info(fmt.Sprintf("err:%v - msg:%s", err, msg))
log.Info(msg)
updateDRPCProgression(drpc, rmn.ProgressionActionPaused, log)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionAvailable,
drpc.Generation, metav1.ConditionFalse, rmn.ReasonPaused, msg)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionPeerReady, drpc.Generation,
metav1.ConditionFalse, rmn.ReasonPaused, msg)
metav1.ConditionFalse, rmn.ReasonPaused, "User Intervention Required")

return requeue, nil
}
Expand Down Expand Up @@ -2249,41 +2257,49 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
placementObj client.Object,
dstCluster string,
log logr.Logger,
) (Progress, error) {
) (Progress, string, error) {
log.Info("Rebuild DRPC state")

vrgNamespace, err := selectVRGNamespace(r.Client, log, drpc, placementObj)
if err != nil {
log.Info("Failed to select VRG namespace")

return Stop, err
return Stop, "", err
}

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return Stop, err
return Stop, "", err
}

vrgs, successfullyQueriedClusterCount, failedCluster, err := getVRGsFromManagedClusters(
r.MCVGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
log.Info("Failed to get a list of VRGs")

return Stop, err
return Stop, "", err
}

// IF 2 clusters queried, and both queries failed, then STOP
if successfullyQueriedClusterCount == 0 {
log.Info("Number of clusters queried is 0. Stop...")
msg := "Stop - Number of clusters queried is 0"

return Stop, nil
return Stop, msg, nil
}

// IF 2 clusters queried successfully and no VRGs, then continue with initial deployment
if successfullyQueriedClusterCount == 2 && len(vrgs) == 0 {
log.Info("Queried 2 clusters successfully")

return Continue, nil
return Continue, "", nil
}

if drpc.Status.Phase == rmn.WaitForUser &&
drpc.Spec.Action == rmn.ActionFailover &&
drpc.Spec.FailoverCluster != failedCluster {
log.Info("Continue. The action is failover and the failoverCluster is accessible")

return Continue, "", nil
}

// IF queried 2 clusters queried, 1 failed and 0 VRG found, then check s3 store.
Expand All @@ -2297,35 +2313,36 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
if vrg == nil {
// IF the failed cluster is not the dest cluster, then this could be an initial deploy
if failedCluster != dstCluster {
return Continue, nil
return Continue, "", nil
}

log.Info("Unable to query all clusters and failed to get VRG from s3 store")
msg := fmt.Sprintf("Unable to query all clusters and failed to get VRG from s3 store. Failed to query %s",
failedCluster)

return Stop, nil
return Stop, msg, nil
}

log.Info("VRG From s3", "VRG Spec", vrg.Spec, "VRG Annotations", vrg.GetAnnotations())
log.Info("Got VRG From s3", "VRG Spec", vrg.Spec, "VRG Annotations", vrg.GetAnnotations())

if drpc.Spec.Action != rmn.DRAction(vrg.Spec.Action) {
log.Info(fmt.Sprintf("Two different actions - drpc action is '%s'/vrg action from s3 is '%s'",
drpc.Spec.Action, vrg.Spec.Action))
msg := fmt.Sprintf("Failover is allowed - Two different actions - drpcAction is '%s' and vrgAction from s3 is '%s'",
drpc.Spec.Action, vrg.Spec.Action)

return AllowFailover, nil
return AllowFailover, msg, nil
}

if dstCluster == vrg.GetAnnotations()[DestinationClusterAnnotationKey] &&
dstCluster != failedCluster {
log.Info(fmt.Sprintf("VRG from s3. Same dstCluster %s/%s. Proceeding...",
dstCluster, vrg.GetAnnotations()[DestinationClusterAnnotationKey]))

return Continue, nil
return Continue, "", nil
}

log.Info(fmt.Sprintf("VRG from s3. DRPCAction/vrgAction/DRPCDstClstr/vrgDstClstr %s/%s/%s/%s. Allow Failover...",
drpc.Spec.Action, vrg.Spec.Action, dstCluster, vrg.GetAnnotations()[DestinationClusterAnnotationKey]))
msg := fmt.Sprintf("Failover is allowed - drpcAction:'%s'. vrgAction:'%s'. DRPCDstClstr:'%s'. vrgDstClstr:'%s'.",
drpc.Spec.Action, vrg.Spec.Action, dstCluster, vrg.GetAnnotations()[DestinationClusterAnnotationKey])

return AllowFailover, nil
return AllowFailover, msg, nil
}

// IF 2 clusters queried, 1 failed and 1 VRG found on the failover cluster, then check the action, if they don't
Expand All @@ -2341,25 +2358,26 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
break
}

if drpc.Spec.Action != rmn.DRAction(vrg.Spec.Action) {
log.Info(fmt.Sprintf("Stop! Two different actions - drpc action is '%s'/vrg action is '%s'",
drpc.Spec.Action, vrg.Spec.Action))
if drpc.Spec.Action != rmn.DRAction(vrg.Spec.Action) &&
dstCluster == clusterName {
msg := fmt.Sprintf("Stop - Two different actions for the same cluster - drpcAction:'%s'. vrgAction:'%s'",
drpc.Spec.Action, vrg.Spec.Action)

return Stop, nil
return Stop, msg, nil
}

if dstCluster != clusterName && vrg.Spec.ReplicationState == rmn.Secondary {
log.Info(fmt.Sprintf("Same Action and dstCluster and ReplicationState %s/%s/%s",
log.Info(fmt.Sprintf("Failover is allowed. Action/dstCluster/ReplicationState %s/%s/%s",
drpc.Spec.Action, dstCluster, vrg.Spec.ReplicationState))

log.Info("Failover is allowed - Primary is assumed in the failed cluster")
msg := "Failover is allowed - Primary is assumed to be on the failed cluster"

return AllowFailover, nil
return AllowFailover, msg, nil
}

log.Info("Allow to continue")
log.Info("Same action, dstCluster, and ReplicationState is primary. Continuing")

return Continue, nil
return Continue, "", nil
}

// Finally, IF 2 clusters queried successfully and 1 or more VRGs found, and if one of the VRGs is on the dstCluster,
Expand All @@ -2379,25 +2397,26 @@ func (r *DRPlacementControlReconciler) determineDRPCState(

// This can happen if a hub is recovered in the middle of a Relocate
if vrg.Spec.ReplicationState == rmn.Secondary && len(vrgs) == 2 {
log.Info("Both VRGs are in secondary state")
msg := "Stop - Both VRGs have the same secondary state"

return Stop, nil
return Stop, msg, nil
}

if drpc.Spec.Action == rmn.DRAction(vrg.Spec.Action) && dstCluster == clusterName {
log.Info(fmt.Sprintf("Same Action %s", drpc.Spec.Action))
log.Info(fmt.Sprintf("Same Action and dest cluster %s/%s", drpc.Spec.Action, dstCluster))

return Continue, nil
return Continue, "", nil
}

log.Info("Failover is allowed", "vrgs count", len(vrgs), "drpc action",
drpc.Spec.Action, "vrg action", vrg.Spec.Action, "dstCluster/clusterName", dstCluster+"/"+clusterName)
msg := fmt.Sprintf("Failover is allowed - VRGs count:'%d'. drpcAction:'%s'."+
" vrgAction:'%s'. DstCluster:'%s'. vrgOnCluste '%s'",
len(vrgs), drpc.Spec.Action, vrg.Spec.Action, dstCluster, clusterName)

return AllowFailover, nil
return AllowFailover, msg, nil
}

// IF none of the above, then allow failover (set PeerReady), but stop until someone makes the change
log.Info("Failover is allowed, but user intervention is required")
msg := "Failover is allowed - User intervention is required"

return AllowFailover, nil
return AllowFailover, msg, nil
}
Loading

0 comments on commit 1e70913

Please sign in to comment.