Skip to content

Commit

Permalink
Update logic for workflows
Browse files Browse the repository at this point in the history
There was a race condition in the previous implementation of the
workflows that was caused by the dependency of the workflow feature
on a ConfigMap that served as counter.

This commit simplifies the logic of the Reconcile loop in two ways:

  1. Removes the dependency for an external workflow counter

  2. Introducex a NextAction function which decides what next action
     should be performed based on the current state of the OpenShift
     cluster. The input of this function is the instance which is
     currently being processed and a workflowLength. Using these two
     arguments it then tells the Reconcile loop which actions it
     should perform: Wait, CreateFirstJob, CreateNextJob, EndTesting,
     Failure.

This approach should be simplify the logic and move the test-operator
towards a unified Reconcile loop.
  • Loading branch information
lpiwowar committed Nov 1, 2024
1 parent eae3554 commit 379e406
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 308 deletions.
118 changes: 50 additions & 68 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controllers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

Expand All @@ -37,7 +39,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
Log := r.GetLogger(ctx)

// How much time should we wait before calling Reconcile loop when there is a failure
requeueAfter := time.Second * 60

// Fetch the ansible instance
instance := &testv1beta1.AnsibleTest{}
err := r.Client.Get(ctx, req.NamespacedName, instance)
Expand All @@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

workflowActive := false
if len(instance.Spec.Workflow) > 0 {
workflowActive = true
}

// Create a helper
helper, err := helper.NewHelper(
instance,
Expand Down Expand Up @@ -142,40 +135,51 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

}

// Ensure that there is an external counter and read its value
// We use the external counter to keep track of the workflow steps
r.WorkflowStepCounterCreate(ctx, instance, helper)
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
if externalWorkflowCounter == -1 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
workflowLength := len(instance.Spec.Workflow)
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)

// Each job that is being executed by the test operator has
currentWorkflowStep := 0
runningAnsibleJob := &batchv1.Job{}
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
if err != nil && !k8s_errors.IsNotFound(err) {
switch nextAction {
case Failure:
return ctrl.Result{}, err
} else if err == nil {
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
}

if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
// The job created by the instance was completed. Release the lock
// so that other instances can spawn a job.
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
Log.Info("Job completed")
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
return ctrl.Result{}, err
case Wait:
Log.Info(InfoWaitingOnJob)
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil

case EndTesting:
if lockReleased, _ := r.ReleaseLock(ctx, instance); !lockReleased {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil
}

instance.Status.Conditions.MarkTrue(
condition.DeploymentReadyCondition,
condition.DeploymentReadyMessage)

Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
lockAcquired, _ := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil
}

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))

default:
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
}

serviceLabels := map[string]string{
common.AppSelector: ansibletest.ServiceName,
"workflowStep": strconv.Itoa(externalWorkflowCounter),
"instanceName": instance.Name,
"operator": "test-operator",
workflowStepLabel: strconv.Itoa(nextWorkflowStep),
instanceNameLabel: instance.Name,
operatorNameLabel: "test-operator",
}

// Create PersistentVolumeClaim
Expand All @@ -194,51 +198,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Create PersistentVolumeClaim - end

// If the current job is executing the last workflow step -> do not create another job
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
return ctrl.Result{}, nil
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
return ctrl.Result{}, nil
}

// We are about to start job that spawns the pod with tests.
// This lock ensures that there is always only one pod running.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info("Can not acquire lock")
requeueAfter := time.Second * 60
return ctrl.Result{RequeueAfter: requeueAfter}, err
}
Log.Info("Lock acquired")

if workflowActive {
r.WorkflowStepCounterIncrease(ctx, instance, helper)
}

instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, externalWorkflowCounter)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
jobName := r.GetJobName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
if err != nil {
return ctrl.Result{}, err
}

if externalWorkflowCounter < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
if nextWorkflowStep < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
}

if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
}

if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
}
}

Expand All @@ -260,7 +243,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mountCerts,
envVars,
workflowOverrideParams,
externalWorkflowCounter,
nextWorkflowStep,
containerImage,
privileged,
)
Expand Down Expand Up @@ -297,7 +280,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrlResult, nil
}
// Create a new job - end

Log.Info("Reconciled Service successfully")
return ctrl.Result{}, nil
}
Expand Down
Loading

0 comments on commit 379e406

Please sign in to comment.