Skip to content

Commit

Permalink
vrg: annotate pvc and pv when they are archived
Browse files Browse the repository at this point in the history
We annotate the PV and PVC after we upload them to the s3 store. Use the
annotation to skip upload if annotation exists.

Signed-off-by: Raghavendra Talur <[email protected]>
(cherry picked from commit c52f352)
  • Loading branch information
raghavendra-talur committed Oct 5, 2023
1 parent 1ff72a8 commit d78eb0b
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 40 deletions.
35 changes: 18 additions & 17 deletions controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,24 @@ const (

// VRG condition reasons
const (
VRGConditionReasonInitializing = "Initializing"
VRGConditionReasonReplicating = "Replicating"
VRGConditionReasonReplicated = "Replicated"
VRGConditionReasonReady = "Ready"
VRGConditionReasonDataProtected = "DataProtected"
VRGConditionReasonProgressing = "Progressing"
VRGConditionReasonClusterDataRestored = "Restored"
VRGConditionReasonError = "Error"
VRGConditionReasonErrorUnknown = "UnknownError"
VRGConditionReasonUploading = "Uploading"
VRGConditionReasonUploaded = "Uploaded"
VRGConditionReasonUploadError = "UploadError"
VRGConditionReasonVolSyncRepSourceInited = "SourceInitialized"
VRGConditionReasonVolSyncRepDestInited = "DestinationInitialized"
VRGConditionReasonVolSyncPVsRestored = "Restored"
VRGConditionReasonVolSyncFinalSyncInProgress = "Syncing"
VRGConditionReasonVolSyncFinalSyncComplete = "Synced"
VRGConditionReasonInitializing = "Initializing"
VRGConditionReasonReplicating = "Replicating"
VRGConditionReasonReplicated = "Replicated"
VRGConditionReasonReady = "Ready"
VRGConditionReasonDataProtected = "DataProtected"
VRGConditionReasonProgressing = "Progressing"
VRGConditionReasonClusterDataRestored = "Restored"
VRGConditionReasonError = "Error"
VRGConditionReasonErrorUnknown = "UnknownError"
VRGConditionReasonUploading = "Uploading"
VRGConditionReasonUploaded = "Uploaded"
VRGConditionReasonUploadError = "UploadError"
VRGConditionReasonVolSyncRepSourceInited = "SourceInitialized"
VRGConditionReasonVolSyncRepDestInited = "DestinationInitialized"
VRGConditionReasonVolSyncPVsRestored = "Restored"
VRGConditionReasonVolSyncFinalSyncInProgress = "Syncing"
VRGConditionReasonVolSyncFinalSyncComplete = "Synced"
VRGConditionReasonClusterDataAnnotationFailed = "AnnotationFailed"
)

const clusterDataProtectedTrueMessage = "Kube objects protected"
Expand Down
14 changes: 8 additions & 6 deletions controllers/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,14 @@ const (
pvcInUse = "kubernetes.io/pvc-protection"

// Annotations
pvcVRAnnotationProtectedKey = "volumereplicationgroups.ramendr.openshift.io/vr-protected"
pvcVRAnnotationProtectedValue = "protected"
pvVRAnnotationRetentionKey = "volumereplicationgroups.ramendr.openshift.io/vr-retained"
pvVRAnnotationRetentionValue = "retained"
RestoreAnnotation = "volumereplicationgroups.ramendr.openshift.io/ramen-restore"
RestoredByRamen = "True"
pvcVRAnnotationProtectedKey = "volumereplicationgroups.ramendr.openshift.io/vr-protected"
pvcVRAnnotationProtectedValue = "protected"
pvcVRAnnotationArchivedKey = "volumereplicationgroups.ramendr.openshift.io/vr-archived"
pvcVRAnnotationArchivedVersionV1 = "archiveV1"
pvVRAnnotationRetentionKey = "volumereplicationgroups.ramendr.openshift.io/vr-retained"
pvVRAnnotationRetentionValue = "retained"
RestoreAnnotation = "volumereplicationgroups.ramendr.openshift.io/ramen-restore"
RestoredByRamen = "True"

// StorageClass label
StorageIDLabel = "ramendr.openshift.io/storageid"
Expand Down
115 changes: 98 additions & 17 deletions controllers/vrg_volrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -508,17 +509,43 @@ func (v *VRGInstance) undoPVRetentionForPVC(pvc corev1.PersistentVolumeClaim, lo
return nil
}

func (v *VRGInstance) generateArchiveAnnotation(gen int64) string {
return fmt.Sprintf("%s-%s", pvcVRAnnotationArchivedVersionV1, strconv.Itoa(int(gen)))
}

func (v *VRGInstance) isArchivedAlready(pvc *corev1.PersistentVolumeClaim, log logr.Logger) bool {
pvHasAnnotation := false
pvcHasAnnotation := false

pv, err := v.getPVFromPVC(pvc)
if err != nil {
log.Error(err, "Failed to get PV from PVC, PVC:%v, err: %w", pvc.Name, err)

return false
}

pvcDesiredValue := v.generateArchiveAnnotation(pvc.Generation)
if v, ok := pvc.ObjectMeta.Annotations[pvcVRAnnotationArchivedKey]; ok && (v == pvcDesiredValue) {
pvcHasAnnotation = true
}

pvDesiredValue := v.generateArchiveAnnotation(pv.Generation)
if v, ok := pv.ObjectMeta.Annotations[pvcVRAnnotationArchivedKey]; ok && (v == pvDesiredValue) {
pvHasAnnotation = true
}

if !pvHasAnnotation || !pvcHasAnnotation {
return false
}

return true
}

// Upload PV to the list of S3 stores in the VRG spec
func (v *VRGInstance) uploadPVandPVCtoS3Stores(pvc *corev1.PersistentVolumeClaim, log logr.Logger) (err error) {
// Find the ProtectedPVC of the given PVC in v.instance.Status.ProtectedPVCs[]
protectedPVC := v.findProtectedPVC(pvc.Name)
// Find the ClusterDataProtected condition of the given PVC in ProtectedPVC.Conditions
clusterDataProtected := findCondition(protectedPVC.Conditions, VRGConditionTypeClusterDataProtected)

// Optimization: skip uploading the PV of this PVC if it was uploaded previously
if clusterDataProtected != nil && clusterDataProtected.Status == metav1.ConditionTrue &&
clusterDataProtected.ObservedGeneration == v.instance.Generation {
// v.log.Info("PV cluster data already protected")
if v.isArchivedAlready(pvc, log) {
v.log.Info("PV cluster data already protected for PVC", "PVC", pvc.Name)

return nil
}

Expand All @@ -540,22 +567,34 @@ func (v *VRGInstance) uploadPVandPVCtoS3Stores(pvc *corev1.PersistentVolumeClaim
}

numProfilesUploaded := len(s3Profiles)
// Set ClusterDataProtected condition to true if PV was uploaded to all the profiles
if numProfilesUploaded == numProfilesToUpload {
msg := fmt.Sprintf("Done uploading PV/PVC cluster data to %d of %d S3 profile(s): %v",
numProfilesUploaded, numProfilesToUpload, s3Profiles)
v.log.Info(msg)
v.updatePVCClusterDataProtectedCondition(pvc.Name,
VRGConditionReasonUploaded, msg)
} else {

if numProfilesUploaded != numProfilesToUpload {
// Merely defensive as we don't expect to reach here
msg := fmt.Sprintf("Uploaded PV/PVC cluster data to only %d of %d S3 profile(s): %v",
numProfilesUploaded, numProfilesToUpload, s3Profiles)
v.log.Info(msg)
v.updatePVCClusterDataProtectedCondition(pvc.Name,
VRGConditionReasonUploadError, msg)

return fmt.Errorf(msg)
}

if err := v.addArchivedAnnotationForPVC(pvc, log); err != nil {
msg := fmt.Sprintf("failed to add archived annotation for PVC (%s/%s) with error (%v)",
pvc.Namespace, pvc.Name, err)
v.log.Info(msg)
v.updatePVCClusterDataProtectedCondition(pvc.Name,
VRGConditionReasonClusterDataAnnotationFailed, msg)

return fmt.Errorf(msg)
}

msg := fmt.Sprintf("Done uploading PV/PVC cluster data to %d of %d S3 profile(s): %v",
numProfilesUploaded, numProfilesToUpload, s3Profiles)
v.log.Info(msg)
v.updatePVCClusterDataProtectedCondition(pvc.Name,
VRGConditionReasonUploaded, msg)

return nil
}

Expand Down Expand Up @@ -1556,6 +1595,8 @@ func setPVCClusterDataProtectedCondition(protectedPVC *ramendrv1alpha1.Protected
setVRGClusterDataProtectingCondition(&protectedPVC.Conditions, observedGeneration, message)
case reason == VRGConditionReasonUploadError:
setVRGClusterDataUnprotectedCondition(&protectedPVC.Conditions, observedGeneration, message)
case reason == VRGConditionReasonClusterDataAnnotationFailed:
setVRGClusterDataUnprotectedCondition(&protectedPVC.Conditions, observedGeneration, message)
default:
// if appropriate reason is not provided, then treat it as an unknown condition.
message = "Unknown reason: " + reason
Expand Down Expand Up @@ -1686,6 +1727,46 @@ func (v *VRGInstance) removeFinalizerFromPVC(pvc *corev1.PersistentVolumeClaim,
return nil
}

func (v *VRGInstance) addArchivedAnnotationForPVC(pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
if pvc.ObjectMeta.Annotations == nil {
pvc.ObjectMeta.Annotations = map[string]string{}
}

pvc.ObjectMeta.Annotations[pvcVRAnnotationArchivedKey] = v.generateArchiveAnnotation(pvc.Generation)

if err := v.reconciler.Update(v.ctx, pvc); err != nil {
log.Error(err, "Failed to update PersistentVolumeClaim annotation")

return fmt.Errorf("failed to update PersistentVolumeClaim (%s/%s) annotation (%s) belonging to"+
"VolumeReplicationGroup (%s/%s), %w",
pvc.Namespace, pvc.Name, pvcVRAnnotationArchivedKey, v.instance.Namespace, v.instance.Name, err)
}

pv, err := v.getPVFromPVC(pvc)
if err != nil {
log.Error(err, "Failed to get PV from PVC")

return fmt.Errorf("failed to update PersistentVolume (%s) annotation (%s) belonging to"+
"VolumeReplicationGroup (%s/%s), %w",
pv.Name, pvcVRAnnotationArchivedKey, v.instance.Namespace, v.instance.Name, err)
}

if pv.ObjectMeta.Annotations == nil {
pv.ObjectMeta.Annotations = map[string]string{}
}

pv.ObjectMeta.Annotations[pvcVRAnnotationArchivedKey] = v.generateArchiveAnnotation(pv.Generation)
if err := v.reconciler.Update(v.ctx, &pv); err != nil {
log.Error(err, "Failed to update PersistentVolume annotation")

return fmt.Errorf("failed to update PersistentVolume (%s) annotation (%s) belonging to"+
"VolumeReplicationGroup (%s/%s), %w",
pvc.Name, pvcVRAnnotationArchivedKey, v.instance.Namespace, v.instance.Name, err)
}

return nil
}

// findProtectedPVC returns the &VRG.Status.ProtectedPVC[x] for the given pvcName
func (v *VRGInstance) findProtectedPVC(pvcName string) *ramendrv1alpha1.ProtectedPVC {
for index := range v.instance.Status.ProtectedPVCs {
Expand Down

0 comments on commit d78eb0b

Please sign in to comment.