Skip to content

Commit

Permalink
Merge pull request #1830 from bertinatto/try-apply-workload
Browse files Browse the repository at this point in the history
API-1835: Migrate WorkloadController to SSA
  • Loading branch information
openshift-merge-bot[bot] authored Oct 21, 2024
2 parents c31b2ce + ed9ab11 commit 4c5ecb3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 83 deletions.
179 changes: 100 additions & 79 deletions pkg/operator/apiserver/controller/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (

operatorv1 "github.com/openshift/api/operator/v1"
openshiftconfigclientv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
applyoperatorv1 "github.com/openshift/client-go/operator/applyconfigurations/operator/v1"
"github.com/openshift/library-go/pkg/apps/deployment"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
"github.com/openshift/library-go/pkg/operator/status"
"github.com/openshift/library-go/pkg/operator/v1helpers"
)
Expand All @@ -49,6 +49,7 @@ type Delegate interface {
// Callers must provide a sync function for delegation. It should bring the desired workload into operation.
// The returned state along with errors will be converted into conditions and persisted in the status field.
type Controller struct {
controllerInstanceName string
// conditionsPrefix is an optional prefix that will be used in the operator's condition type field, for example APIServerDeploymentDegraded, where APIServer is the prefix
conditionsPrefix string
operatorNamespace string
Expand All @@ -71,13 +72,13 @@ type Controller struct {

// NewController creates a brand new Controller instance.
//
// the "name" param will be used to set conditions in the status field. It will be suffixed with "WorkloadController",
// the "instanceName" param will be used to set conditions in the status field. It will be suffixed with "WorkloadController",
// so it can end up in the condition in the form of "OAuthAPIWorkloadControllerDeploymentAvailable"
//
// the "operatorNamespace" is used to set "version-mapping" in the correct namespace
//
// the "targetNamespace" represents the namespace for the managed resource (DaemonSet)
func NewController(name, operatorNamespace, targetNamespace, targetOperandVersion, operandNamePrefix, conditionsPrefix string,
func NewController(instanceName, operatorNamespace, targetNamespace, targetOperandVersion, operandNamePrefix, conditionsPrefix string,
operatorClient v1helpers.OperatorClient,
kubeClient kubernetes.Interface,
podLister corev1listers.PodLister,
Expand All @@ -89,6 +90,7 @@ func NewController(name, operatorNamespace, targetNamespace, targetOperandVersio
versionRecorder status.VersionGetter,
) factory.Controller {
controllerRef := &Controller{
controllerInstanceName: factory.ControllerInstanceName(instanceName, "Workload"),
operatorNamespace: operatorNamespace,
targetNamespace: targetNamespace,
targetOperandVersion: targetOperandVersion,
Expand All @@ -100,7 +102,7 @@ func NewController(name, operatorNamespace, targetNamespace, targetOperandVersio
delegate: delegate,
openshiftClusterConfigClient: openshiftClusterConfigClient,
versionRecorder: versionRecorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), instanceName),
}

c := factory.New()
Expand All @@ -111,7 +113,7 @@ func NewController(name, operatorNamespace, targetNamespace, targetOperandVersio
return c.WithSync(controllerRef.sync).
WithInformers(informers...).
ToController(
fmt.Sprintf("%sWorkloadController", name), // don't change what is passed here unless you also remove the old FooDegraded condition
fmt.Sprintf("%sWorkloadController", controllerRef.controllerInstanceName),
eventRecorder,
)
}
Expand Down Expand Up @@ -161,40 +163,40 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
errs = []error{}
}

deploymentAvailableCondition := operatorv1.OperatorCondition{
Type: fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeAvailable),
Status: operatorv1.ConditionTrue,
}
deploymentAvailableCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeAvailable))

workloadDegradedCondition := operatorv1.OperatorCondition{
Type: fmt.Sprintf("%sWorkloadDegraded", c.conditionsPrefix),
Status: operatorv1.ConditionFalse,
}
workloadDegradedCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sWorkloadDegraded", c.conditionsPrefix))

deploymentDegradedCondition := operatorv1.OperatorCondition{
Type: fmt.Sprintf("%sDeploymentDegraded", c.conditionsPrefix),
Status: operatorv1.ConditionFalse,
}
deploymentDegradedCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeploymentDegraded", c.conditionsPrefix))

deploymentProgressingCondition := operatorv1.OperatorCondition{
Type: fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeProgressing),
Status: operatorv1.ConditionFalse,
deploymentProgressingCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeProgressing))

status := applyoperatorv1.OperatorStatus()
if workload != nil {
// The Hash field is not required since the LastGeneration field is enough to uniquely identify a Deployment's desired state
status = status.WithGenerations(applyoperatorv1.GenerationStatus().
WithGroup("apps").
WithResource("deployments").
WithNamespace(workload.Namespace).
WithName(workload.Name).
WithLastGeneration(workload.Generation),
)
}

// only set updateGenerationFn to update the observed generation if everything is available
var updateGenerationFn func(newStatus *operatorv1.OperatorStatus) error
defer func() {
updates := []v1helpers.UpdateStatusFunc{
v1helpers.UpdateConditionFn(deploymentAvailableCondition),
v1helpers.UpdateConditionFn(deploymentDegradedCondition),
v1helpers.UpdateConditionFn(deploymentProgressingCondition),
v1helpers.UpdateConditionFn(workloadDegradedCondition),
}
if updateGenerationFn != nil {
updates = append(updates, updateGenerationFn)
}
if _, _, updateError := v1helpers.UpdateStatus(ctx, c.operatorClient, updates...); updateError != nil {
err = updateError
status = status.WithConditions(
deploymentAvailableCondition,
deploymentDegradedCondition,
deploymentProgressingCondition,
workloadDegradedCondition,
)

if applyError := c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status); applyError != nil {
err = applyError
}
}()

Expand All @@ -209,15 +211,23 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o

// we are degraded, not available and we are not progressing

deploymentDegradedCondition.Status = operatorv1.ConditionTrue
deploymentDegradedCondition.Reason = "PreconditionNotFulfilled"
deploymentDegradedCondition.Message = message
deploymentDegradedCondition = deploymentDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("PreconditionNotFulfilled").
WithMessage(message)

deploymentAvailableCondition.Status = operatorv1.ConditionFalse
deploymentAvailableCondition.Reason = "PreconditionNotFulfilled"
deploymentAvailableCondition = deploymentAvailableCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("PreconditionNotFulfilled")

deploymentProgressingCondition.Status = operatorv1.ConditionFalse
deploymentProgressingCondition.Reason = "PreconditionNotFulfilled"
deploymentProgressingCondition = deploymentProgressingCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("PreconditionNotFulfilled")

workloadDegradedCondition = workloadDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("PreconditionNotFulfilled").
WithMessage(message)

return kerrors.NewAggregate(errs)
}
Expand All @@ -227,37 +237,49 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
for _, err := range errs {
message = message + err.Error() + "\n"
}
workloadDegradedCondition.Status = operatorv1.ConditionTrue
workloadDegradedCondition.Reason = "SyncError"
workloadDegradedCondition.Message = message
workloadDegradedCondition = workloadDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("SyncError").
WithMessage(message)
} else if workload == nil {
workloadDegradedCondition = workloadDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("NoDeployment").
WithMessage(fmt.Sprintf("deployment/%s: could not be retrieved", c.targetNamespace))
} else {
workloadDegradedCondition.Status = operatorv1.ConditionFalse
workloadDegradedCondition = workloadDegradedCondition.
WithStatus(operatorv1.ConditionFalse)
}

if workload == nil {
message := fmt.Sprintf("deployment/%s: could not be retrieved", c.targetNamespace)
deploymentAvailableCondition.Status = operatorv1.ConditionFalse
deploymentAvailableCondition.Reason = "NoDeployment"
deploymentAvailableCondition.Message = message
deploymentAvailableCondition = deploymentAvailableCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("NoDeployment").
WithMessage(message)

deploymentProgressingCondition.Status = operatorv1.ConditionTrue
deploymentProgressingCondition.Reason = "NoDeployment"
deploymentProgressingCondition.Message = message
deploymentProgressingCondition = deploymentProgressingCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("NoDeployment").
WithMessage(message)

deploymentDegradedCondition.Status = operatorv1.ConditionTrue
deploymentDegradedCondition.Reason = "NoDeployment"
deploymentDegradedCondition.Message = message
deploymentDegradedCondition = deploymentDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("NoDeployment").
WithMessage(message)

return kerrors.NewAggregate(errs)
}

if workload.Status.AvailableReplicas == 0 {
deploymentAvailableCondition.Status = operatorv1.ConditionFalse
deploymentAvailableCondition.Reason = "NoPod"
deploymentAvailableCondition.Message = fmt.Sprintf("no %s.%s pods available on any node.", workload.Name, c.targetNamespace)
deploymentAvailableCondition = deploymentAvailableCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("NoPod").
WithMessage(fmt.Sprintf("no %s.%s pods available on any node.", workload.Name, c.targetNamespace))
} else {
deploymentAvailableCondition.Status = operatorv1.ConditionTrue
deploymentAvailableCondition.Reason = "AsExpected"
deploymentAvailableCondition = deploymentAvailableCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("AsExpected")
}

desiredReplicas := int32(1)
Expand All @@ -268,18 +290,21 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
// If the workload is up to date, then we are no longer progressing
workloadAtHighestGeneration := workload.ObjectMeta.Generation == workload.Status.ObservedGeneration
workloadIsBeingUpdated := workload.Status.UpdatedReplicas < desiredReplicas
workloadIsBeingUpdatedTooLong, err := isUpdatingTooLong(previousStatus, deploymentProgressingCondition.Type)
workloadIsBeingUpdatedTooLong, err := isUpdatingTooLong(previousStatus, *deploymentProgressingCondition.Type)
if !workloadAtHighestGeneration {
deploymentProgressingCondition.Status = operatorv1.ConditionTrue
deploymentProgressingCondition.Reason = "NewGeneration"
deploymentProgressingCondition.Message = fmt.Sprintf("deployment/%s.%s: observed generation is %d, desired generation is %d.", workload.Name, c.targetNamespace, workload.Status.ObservedGeneration, workload.ObjectMeta.Generation)
deploymentProgressingCondition = deploymentProgressingCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("NewGeneration").
WithMessage(fmt.Sprintf("deployment/%s.%s: observed generation is %d, desired generation is %d.", workload.Name, c.targetNamespace, workload.Status.ObservedGeneration, workload.ObjectMeta.Generation))
} else if workloadIsBeingUpdated {
deploymentProgressingCondition.Status = operatorv1.ConditionTrue
deploymentProgressingCondition.Reason = "PodsUpdating"
deploymentProgressingCondition.Message = fmt.Sprintf("deployment/%s.%s: %d/%d pods have been updated to the latest generation", workload.Name, c.targetNamespace, workload.Status.UpdatedReplicas, desiredReplicas)
deploymentProgressingCondition = deploymentProgressingCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("PodsUpdating").
WithMessage(fmt.Sprintf("deployment/%s.%s: %d/%d pods have been updated to the latest generation", workload.Name, c.targetNamespace, workload.Status.UpdatedReplicas, desiredReplicas))
} else {
deploymentProgressingCondition.Status = operatorv1.ConditionFalse
deploymentProgressingCondition.Reason = "AsExpected"
deploymentProgressingCondition = deploymentProgressingCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("AsExpected")
}

// During a rollout the default maxSurge (25%) will allow the available
Expand All @@ -288,17 +313,19 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
workloadHasAllPodsAvailable := workload.Status.AvailableReplicas >= desiredReplicas
if !workloadHasAllPodsAvailable && (!workloadIsBeingUpdated || workloadIsBeingUpdatedTooLong) {
numNonAvailablePods := desiredReplicas - workload.Status.AvailableReplicas
deploymentDegradedCondition.Status = operatorv1.ConditionTrue
deploymentDegradedCondition.Reason = "UnavailablePod"
deploymentDegradedCondition = deploymentDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("UnavailablePod")
podContainersStatus, err := deployment.PodContainersStatus(workload, c.podsLister)
if err != nil {
podContainersStatus = []string{fmt.Sprintf("failed to get pod containers details: %v", err)}
}
deploymentDegradedCondition.Message = fmt.Sprintf("%v of %v requested instances are unavailable for %s.%s (%s)", numNonAvailablePods, desiredReplicas, workload.Name, c.targetNamespace,
strings.Join(podContainersStatus, ", "))
deploymentDegradedCondition = deploymentDegradedCondition.
WithMessage(fmt.Sprintf("%v of %v requested instances are unavailable for %s.%s (%s)", numNonAvailablePods, desiredReplicas, workload.Name, c.targetNamespace, strings.Join(podContainersStatus, ", ")))
} else {
deploymentDegradedCondition.Status = operatorv1.ConditionFalse
deploymentDegradedCondition.Reason = "AsExpected"
deploymentDegradedCondition = deploymentDegradedCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("AsExpected")
}

// if the deployment is all available and at the expected generation, then update the version to the latest
Expand All @@ -313,12 +340,6 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
c.versionRecorder.SetVersion(operandName, c.targetOperandVersion)
}

// set updateGenerationFn so that it is invoked in defer
updateGenerationFn = func(newStatus *operatorv1.OperatorStatus) error {
resourcemerge.SetDeploymentGeneration(&newStatus.Generations, workload)
return nil
}

if len(errs) > 0 {
return kerrors.NewAggregate(errs)
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/operator/apiserver/controller/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ func TestUpdateOperatorStatus(t *testing.T) {
Message: "deployment/: could not be retrieved",
},
{
Type: fmt.Sprintf("%sWorkloadDegraded", defaultControllerName),
Status: operatorv1.ConditionFalse,
Type: fmt.Sprintf("%sWorkloadDegraded", defaultControllerName),
Status: operatorv1.ConditionTrue,
Reason: "NoDeployment",
Message: "deployment/: could not be retrieved",
},
{
Type: fmt.Sprintf("%sDeploymentDegraded", defaultControllerName),
Expand Down Expand Up @@ -441,8 +443,10 @@ func TestUpdateOperatorStatus(t *testing.T) {
Message: "",
},
{
Type: fmt.Sprintf("%sWorkloadDegraded", defaultControllerName),
Status: operatorv1.ConditionFalse,
Type: fmt.Sprintf("%sWorkloadDegraded", defaultControllerName),
Status: operatorv1.ConditionTrue,
Reason: "PreconditionNotFulfilled",
Message: "the operator didn't specify what preconditions are missing",
},
}
return areCondidtionsEqual(expectedConditions, actualStatus.Conditions)
Expand Down

0 comments on commit 4c5ecb3

Please sign in to comment.