Skip to content

Commit

Permalink
Stop provider upgrade if client gets behind
Browse files Browse the repository at this point in the history
- Provider supports client operator at X.Y and X.(Y-1) versions
- If any client is already at X.(Y-1) version and provider gets upgraded
then the lagging client goes out of support
- This PR prohibits such provider upgrades

[RHSTOR-5073]

Signed-off-by: Leela Venkaiah G <[email protected]>
  • Loading branch information
leelavg committed Dec 20, 2023
1 parent 42c58dc commit a097232
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 33 deletions.
56 changes: 52 additions & 4 deletions controllers/storagecluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
conditionsv1 "github.com/openshift/custom-resource-status/conditions/v1"
"github.com/operator-framework/operator-lib/conditions"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
ocsv1alpha1 "github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
statusutil "github.com/red-hat-storage/ocs-operator/v4/controllers/util"
"github.com/red-hat-storage/ocs-operator/v4/version"
Expand Down Expand Up @@ -505,13 +506,35 @@ func (r *StorageClusterReconciler) reconcilePhases(
if instance.Status.Phase != statusutil.PhaseClusterExpanding {
instance.Status.Phase = statusutil.PhaseReady

var returnErr error
var notUpgradeableReasons, notUpgradeableMessages []string
// mark operator upgradeable if and only if all storageclusters are ready
if r.clusters.AreOtherStorageClustersReady(instance) {
returnErr := r.SetOperatorConditions(message, reason, metav1.ConditionTrue, nil)
if returnErr != nil {
return reconcile.Result{}, returnErr
if !r.clusters.AreOtherStorageClustersReady(instance) {
notUpgradeableReasons = append(notUpgradeableReasons, "NotReady")
notUpgradeableMessages = append(notUpgradeableMessages, "StorageCluster is not ready")
}
// check operator upgradeability based on connected clients
if instance.Spec.AllowRemoteStorageConsumers {
if count, err := getUnsupportedClientsCount(r, instance.Namespace); err != nil {
notUpgradeableReasons = append(notUpgradeableReasons, "ODFClients")
notUpgradeableMessages = append(notUpgradeableMessages, "Unable to determine status of connected ODF Clients")
} else if count != 0 {
notUpgradeableReasons = append(notUpgradeableReasons, "ODFClients")
notUpgradeableMessages = append(notUpgradeableMessages, fmt.Sprintf("%d connected ODF Client Operators are not up to date", count))
}
}
if len(notUpgradeableMessages) > 0 {
// we are not upgradeable
returnErr = r.SetOperatorConditions(
strings.Join(notUpgradeableMessages, ";"), strings.Join(notUpgradeableReasons, ";"),
metav1.ConditionFalse, nil)
} else {
// we are upgradeable
returnErr = r.SetOperatorConditions(message, reason, metav1.ConditionTrue, nil)
}
if returnErr != nil {
return reconcile.Result{}, returnErr
}
}
} else {
// If any component operator reports negatively we want to write that to
Expand Down Expand Up @@ -838,3 +861,28 @@ func validateCustomStorageClassNames(sc *ocsv1.StorageCluster) error {

return nil
}

func getUnsupportedClientsCount(r *StorageClusterReconciler, namespace string) (int, error) {
scList := &ocsv1alpha1.StorageConsumerList{}
err := r.Client.List(r.ctx, scList, client.InNamespace(namespace))
if err != nil {
r.Log.Error(err, "Failed to list StorageConsumers")
return -1, err
}
var count int
providerVersion, _ := semver.Make(version.Version)
for idx := range scList.Items {
clientVersion, err := semver.Make(scList.Items[idx].Status.Client.OperatorVersion)
if err == nil {
// provider operator and client operator should be on same verison for full compatilibity

Check failure on line 877 in controllers/storagecluster/reconcile.go

View workflow job for this annotation

GitHub Actions / verify code spellings

verison ==> version

Check failure on line 877 in controllers/storagecluster/reconcile.go

View workflow job for this annotation

GitHub Actions / verify code spellings

compatilibity ==> compatibility
if providerVersion.Major != clientVersion.Major || providerVersion.Minor != clientVersion.Minor {
count++
}
} else {
r.Log.Error(err, "Failed to parse client operator version", "StorageConsumer", scList.Items[idx].GetName())
count++
}
}

return count, nil
}
41 changes: 12 additions & 29 deletions controllers/storagecluster/storagecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
conditionsv1 "github.com/openshift/custom-resource-status/conditions/v1"
"github.com/operator-framework/operator-lib/conditions"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
ocsv1alpha1 "github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -154,35 +155,16 @@ func (r *StorageClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
)

enqueueFromStorageProfile := handler.EnqueueRequestsFromMapFunc(
func(_ context.Context, obj client.Object) []reconcile.Request {
// only storage profile is being watched
_ = obj.(*ocsv1.StorageProfile)

// Get the StorageCluster object
scList := &ocsv1.StorageClusterList{}
err := r.Client.List(r.ctx, scList, client.InNamespace(obj.GetNamespace()), client.Limit(1))
if err != nil {
r.Log.Error(err, "Unable to list StorageCluster objects")
return []reconcile.Request{}
}

if len(scList.Items) == 0 {
return []reconcile.Request{}
}

sc := scList.Items[0]
// Return name and namespace of StorageCluster
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: sc.Name,
Namespace: sc.Namespace,
},
},
ocsClientOperatorVersionPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}
oldObj := e.ObjectOld.(*ocsv1alpha1.StorageConsumer)
newObj := e.ObjectNew.(*ocsv1alpha1.StorageConsumer)
return oldObj.Status.Client.OperatorVersion != newObj.Status.Client.OperatorVersion
},
)
}

builder := ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageCluster{}, builder.WithPredicates(scPredicate)).
Expand All @@ -191,15 +173,16 @@ func (r *StorageClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&corev1.ConfigMap{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&ocsv1.StorageProfile{}, enqueueFromStorageProfile).
Watches(&ocsv1.StorageProfile{}, enqueueStorageClusterRequest).
Watches(
&extv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "virtualmachines.kubevirt.io",
},
},
enqueueStorageClusterRequest,
)
).
Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(ocsClientOperatorVersionPredicate))
if os.Getenv("SKIP_NOOBAA_CRD_WATCH") != "true" {
builder.Owns(&nbv1.NooBaa{})
}
Expand Down

0 comments on commit a097232

Please sign in to comment.