diff --git a/.github/test.sh b/.github/test.sh
index 5a6ec38..eecd676 100755
--- a/.github/test.sh
+++ b/.github/test.sh
@@ -25,8 +25,15 @@ while true
   do
     pod_status=$(kubectl get pods ${controller_pod} -n ${namespace} --no-headers -o custom-columns=":status.phase")
     if [[ "${pod_status}" == "Running" ]]; then
-        echo "Controller ${controller_pod} is ready"
-        break
+        echo "Controller ${controller_pod} is running"
+
+        # They also need to be ready
+        ready=true
+        kubectl get pods -n ${namespace} ${controller_pod} | grep "2/2" || ready=false
+        if [[ "${ready}" == "true" ]]; then
+            echo "Controller ${controller_pod} containers are ready"
+            break
+        fi
     fi
     sleep 20
 done
@@ -99,7 +106,9 @@ check_output 'check-pod-deleted' "${pods_running}" "0"
 echo_run kubectl apply -f ./examples/job.yaml
 sleep 3
 echo_run kubectl get pods
-
+pods_running=$(kubectl get pods -o json | jq -r '.items | length')
+echo " Pods Running: ${pods_running}"
+check_output 'check-pods-running' "${pods_running}" "2"
 
 # Check both job pods
 for pod in $(kubectl get pods -o json | jq -r .items[].metadata.name)
@@ -120,4 +129,35 @@ echo_run kubectl delete -f ./examples/job.yaml
 sleep 2
 pods_running=$(kubectl get pods -o json | jq -r '.items | length')
 echo " Pods Running: ${pods_running}"
+check_output 'check-pod-deleted' "${pods_running}" "0"
+
+
+
+# Deployment
+echo_run kubectl apply -f ./examples/deployment.yaml
+sleep 5
+echo_run kubectl get pods
+pods_running=$(kubectl get pods -o json | jq -r '.items | length')
+echo " Pods Running: ${pods_running}"
+check_output 'check-pods-running' "${pods_running}" "2"
+echo_run kubectl logs -n ${namespace} ${controller_pod} -c manager
+
+# Check both deployment pods
+for pod in $(kubectl get pods -o json | jq -r .items[].metadata.name)
+  do
+    echo "Checking deployment pod ${pod}"
+    scheduled_by=$(kubectl get pod ${pod} -o json | jq -r .spec.schedulerName)
+    pod_status=$(kubectl get pods ${pod} --no-headers -o custom-columns=":status.phase")
+    echo
+    echo " Pod Status: ${pod_status}"
+    echo " Scheduled by: ${scheduled_by}"
+    check_output 'check-pod-scheduled-by' "${scheduled_by}" "FluxionScheduler"
+    check_output 'check-pod-status' "${pod_status}" "Running"
+done
+
+# Now delete the deployment - we are done!
+echo_run kubectl delete -f ./examples/deployment.yaml
+sleep 2
+pods_running=$(kubectl get pods -o json | jq -r '.items | length')
+echo " Pods Running: ${pods_running}"
 check_output 'check-pod-deleted' "${pods_running}" "0"
\ No newline at end of file
diff --git a/README.md b/README.md
index d8468cc..0c072d5 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ And we use `ghcr.io/converged-computing/fluxion` for the fluxion service.
 
 ### Choices
 
 - **Duration of job comes from Kubernetes** Right now, we don't allow a special or different duration to be given to Fluxion. Any duration or deletion needs to come from Kubernetes first, by way of an object deletion. Otherwise we would need to orchestrate deletion from the cluster and Fluxion, and it's easier to ask the user to delete with a job duration or other mechanism.
+- **Ungate is done as a retryable task** API operations to Kubernetes are not always reliable, so we ungate pods in a task that can be retried if an API call fails.
 
 ## Deploy
@@ -233,7 +234,7 @@ SELECT * from reservations;
 
 ### TODO
 
-- [ ] need to cleanup - handle FluxJob object so doesn't keep reconciling. Likely we want to delete at some point.
+- [ ] Pod creation needs better orchestration
 - [ ] In the case of jobs that are changing (e.g., pods deleting, but we don't want to kill entire job) what should we do?
   - we need to use shrink here. And a shrink down to size 0 I assume is a cancel.
 - [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?)
diff --git a/api/v1alpha1/fluxjob_enqueue.go b/api/v1alpha1/fluxjob_enqueue.go
index b2652b1..064b0c5 100644
--- a/api/v1alpha1/fluxjob_enqueue.go
+++ b/api/v1alpha1/fluxjob_enqueue.go
@@ -2,6 +2,7 @@ package v1alpha1
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/converged-computing/fluxqueue/pkg/defaults"
 	appsv1 "k8s.io/api/apps/v1"
@@ -64,6 +65,9 @@ func (a *jobReceiver) EnqueueJob(ctx context.Context, job *batchv1.Job) error {
 	if job.Spec.Template.ObjectMeta.Labels == nil {
 		job.Spec.Template.ObjectMeta.Labels = map[string]string{}
 	}
+	if job.ObjectMeta.Labels == nil {
+		job.ObjectMeta.Labels = map[string]string{}
+	}
 
 	// Cut out early if we are getting hit again
 	_, ok := job.ObjectMeta.Labels[defaults.SeenLabel]
@@ -102,6 +106,9 @@ func (a *jobReceiver) EnqueueDeployment(ctx context.Context, deployment *appsv1.
 	if deployment.Spec.Template.ObjectMeta.Labels == nil {
 		deployment.Spec.Template.ObjectMeta.Labels = map[string]string{}
 	}
+	if deployment.ObjectMeta.Labels == nil {
+		deployment.ObjectMeta.Labels = map[string]string{}
+	}
 
 	// Cut out early if we are getting hit again
 	_, ok := deployment.ObjectMeta.Labels[defaults.SeenLabel]
@@ -119,6 +126,10 @@ func (a *jobReceiver) EnqueueDeployment(ctx context.Context, deployment *appsv1.
 	fluxqGate := corev1.PodSchedulingGate{Name: defaults.SchedulingGateName}
 	deployment.Spec.Template.Spec.SchedulingGates = append(deployment.Spec.Template.Spec.SchedulingGates, fluxqGate)
 
+	// We will use this later as a selector to get pods associated with the deployment
+	selector := fmt.Sprintf("deployment-%s-%s", deployment.Name, deployment.Namespace)
+	deployment.Spec.Template.ObjectMeta.Labels[defaults.SelectorLabel] = selector
+
 	logger.Info("received deployment and gated pods", "Name", deployment.Name)
 	return SubmitFluxJob(
 		ctx,
diff --git a/api/v1alpha1/fluxjob_webhook.go b/api/v1alpha1/fluxjob_webhook.go
index 70f7fc1..7227eee 100644
--- a/api/v1alpha1/fluxjob_webhook.go
+++ b/api/v1alpha1/fluxjob_webhook.go
@@ -54,6 +54,14 @@ func (a *jobReceiver) Handle(ctx context.Context, req admission.Request) admissi
 		return admission.Errored(http.StatusBadRequest, err)
 	}
 
+	marshalledDeployment, tryNext, err := a.HandleDeployment(ctx, req)
+	if err == nil {
+		return admission.PatchResponseFromRaw(req.Object.Raw, marshalledDeployment)
+	}
+	if !tryNext {
+		return admission.Errored(http.StatusBadRequest, err)
+	}
+
 	marshalledPod, err := a.HandlePod(ctx, req)
 	if err == nil {
 		return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod)
diff --git a/api/v1alpha1/fluxjob_webhook_test.go b/api/v1alpha1/fluxjob_webhook_test.go
deleted file mode 100644
index 8adea87..0000000
--- a/api/v1alpha1/fluxjob_webhook_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Copyright 2025.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package v1alpha1
-
-import (
-	. "github.com/onsi/ginkgo/v2"
-)
-
-var _ = Describe("FluxJob Webhook", func() {
-
-	Context("When creating FluxJob under Defaulting Webhook", func() {
-		It("Should fill in the default value if a required field is empty", func() {
-
-			// TODO(user): Add your logic here
-
-		})
-	})
-
-})
diff --git a/dist/fluxqueue.yaml b/dist/fluxqueue.yaml
index d184b6c..ea0e332 100644
--- a/dist/fluxqueue.yaml
+++ b/dist/fluxqueue.yaml
@@ -594,6 +594,7 @@ webhooks:
     - ""
     - core
     - batch
+    - apps
     apiVersions:
     - v1
     operations:
@@ -602,4 +603,5 @@
     resources:
     - pods
     - jobs
+    - deployments
   sideEffects: None
diff --git a/examples/deployment.yaml b/examples/deployment.yaml
new file mode 100644
index 0000000..0b4b90c
--- /dev/null
+++ b/examples/deployment.yaml
@@ -0,0 +1,17 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: deployment
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: deployment
+  template:
+    metadata:
+      labels:
+        app: deployment
+    spec:
+      containers:
+      - name: container
+        image: registry.k8s.io/pause:2.0
diff --git a/internal/controller/fluxjob_controller.go b/internal/controller/fluxjob_controller.go
index 9e44eea..6f6eac0 100644
--- a/internal/controller/fluxjob_controller.go
+++ b/internal/controller/fluxjob_controller.go
@@ -112,9 +112,11 @@ func (r *FluxJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 	rlog.Info("Found FluxJob", "Name", spec.Name, "Namespace", spec.Namespace, "Status", spec.Status.SubmitStatus)
 	result := ctrl.Result{}
 
-	// If the job is already submit, continue
+	// If the job is already submit, delete it
 	if spec.Status.SubmitStatus == api.SubmitStatusSubmit {
-		return result, nil
+		rlog.Info("Deleting FluxJob that is submit", "Name", spec.Name, "Namespace", spec.Namespace, "Status", spec.Status.SubmitStatus)
+		err = r.Client.Delete(ctx, &spec)
+		return result, err
 	}
 
 	// Submit the job to the queue - TODO if error, should delete?
diff --git a/pkg/defaults/defaults.go b/pkg/defaults/defaults.go
index e64a8da..91e4545 100644
--- a/pkg/defaults/defaults.go
+++ b/pkg/defaults/defaults.go
@@ -8,5 +8,6 @@ var (
 	FluxJobIdLabel     = "fluxqueue/jobid"
 	NodesLabel         = "fluxqueue/fluxion-nodes"
 	SeenLabel          = "fluxqueue.seen"
+	SelectorLabel      = "fluxqueue.selector"
 	UnschedulableLabel = "fluxqueue/unschedulable"
 )
diff --git a/pkg/fluxqueue/fluxqueue.go b/pkg/fluxqueue/fluxqueue.go
index 6a11c96..819154a 100644
--- a/pkg/fluxqueue/fluxqueue.go
+++ b/pkg/fluxqueue/fluxqueue.go
@@ -28,6 +28,7 @@ import (
 const (
 	// IMPORTANT: must be one because fluxion is run single threaded
 	queueMaxWorkers = 1
+	taskMaxWorkers  = 1
 	mutexLocked     = 1
 )
 
@@ -105,7 +106,13 @@ func NewQueue(ctx context.Context, cfg rest.Config) (*Queue, error) {
 			river.QueueDefault: {MaxWorkers: queueMaxWorkers},
 
 			// Cleanup queue is typically for cancel
+			// Note that if this needs to be single threaded,
+			// it should be done in the default queue
 			"cleanup_queue": {MaxWorkers: queueMaxWorkers},
+
+			// This is for Kubernetes tasks (ungate, etc)
+			// that don't need to be single threaded
+			"task_queue": {MaxWorkers: taskMaxWorkers},
 		},
 		Workers: workers,
 	})
diff --git a/pkg/fluxqueue/strategy/easy.go b/pkg/fluxqueue/strategy/easy.go
index ec3ca65..5dce6f7 100644
--- a/pkg/fluxqueue/strategy/easy.go
+++ b/pkg/fluxqueue/strategy/easy.go
@@ -47,6 +47,8 @@ type ReservationModel struct {
 // job worker: a queue to submit jobs to fluxion
 // cleanup worker: a queue to cleanup
 func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {
+
+	// These workers are in the default (fluxion) queue with one worker
 	jobWorker, err := work.NewJobWorker(cfg)
 	if err != nil {
 		return err
@@ -59,9 +61,16 @@ func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {
 	if err != nil {
 		return err
 	}
+
+	// These workers can be run concurrently (>1 worker)
+	ungateWorker, err := work.NewUngateWorker(cfg)
+	if err != nil {
+		return err
+	}
 	river.AddWorker(workers, jobWorker)
 	river.AddWorker(workers, cleanupWorker)
 	river.AddWorker(workers, reservationWorker)
+	river.AddWorker(workers, ungateWorker)
 	return nil
 }
 
diff --git a/pkg/fluxqueue/strategy/workers/job.go b/pkg/fluxqueue/strategy/workers/job.go
index 1d7f372..baef987 100644
--- a/pkg/fluxqueue/strategy/workers/job.go
+++ b/pkg/fluxqueue/strategy/workers/job.go
@@ -8,6 +8,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
 
 	"github.com/converged-computing/fluxion/pkg/client"
@@ -123,7 +124,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
 	wlog.Info("Fluxion allocation response", "Nodes", nodes)
 
 	// Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler
-	err = w.releaseJob(job.Args, fluxID, nodes)
+	err = w.releaseJob(ctx, job.Args, fluxID, nodes)
 	if err != nil {
 		return err
 	}
@@ -132,7 +133,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
 }
 
 // Release job will unsuspend a job or ungate pods to allow for scheduling
-func (w JobWorker) releaseJob(args JobArgs, fluxID int64, nodes []string) error {
+func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string) error {
 	var err error
 
 	if args.Type == api.JobWrappedJob.String() {
@@ -145,15 +146,8 @@ func (w JobWorker) releaseJob(args JobArgs, fluxID int64, nodes []string) error
 		}
 		wlog.Info("Success unsuspending job", "Namespace", args.Namespace, "Name", args.Name)
 
-	} else if args.Type == api.JobWrappedPod.String() {
-
-		// Pod Type
-		err = w.ungatePod(args.Namespace, args.Name, nodes, fluxID)
-		if err != nil {
-			wlog.Info("Error ungating pod", "Namespace", args.Namespace, "Name", args.Name, "Error", err)
-			return err
-		}
-		wlog.Info("Success ungating pod", "Namespace", args.Namespace, "Name", args.Name)
+	} else if args.Type == api.JobWrappedDeployment.String() || args.Type == api.JobWrappedPod.String() {
+		w.ungatePod(ctx, args.Namespace, args.Name, args.Type, nodes, fluxID)
 
 	} else {
 
@@ -274,32 +268,38 @@ func (w JobWorker) rejectJob(namespace, name string) error {
 	return patchUnsuspend(ctx, client, name, namespace)
 }
 
-// Ungate the pod, adding an annotation for nodes along with the fluxion scheduler
-func (w JobWorker) ungatePod(namespace, name string, nodes []string, fluxId int64) error {
-	ctx := context.Background()
-
-	// Get the pod to update
-	client, err := kubernetes.NewForConfig(&w.RESTConfig)
-	if err != nil {
-		return err
+// ungatePod submits a retryable task to ungate pods. We use a task because Kubernetes
+// API operations (getting pods, patching, removing gates) are not always reliable.
+func (w JobWorker) ungatePod(
+	ctx context.Context,
+	namespace, name, jobType string,
+	nodes []string,
+	fluxId int64,
+) error {
+
+	// Create a task to ungate the pod(s) for this job
+	riverClient := river.ClientFromContext[pgx.Tx](ctx)
+	insertOpts := river.InsertOpts{
+		Tags:  []string{"ungate"},
+		Queue: "task_queue",
 	}
-	// Convert jobid to string
-	jobid := fmt.Sprintf("%d", fluxId)
-
-	// Patch the pod to add the nodes
-	nodesStr := strings.Join(nodes, "__")
-	payload := `{"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + nodesStr + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}`
-	fmt.Println(payload)
-	_, err = client.CoreV1().Pods(namespace).Patch(ctx, name, patchTypes.MergePatchType, []byte(payload), metav1.PatchOptions{})
+	ungateArgs := UngateArgs{
+		Name:      name,
+		Namespace: namespace,
+		Nodes:     nodes,
+		JobID:     fluxId,
+		Type:      jobType,
+	}
+	_, err := riverClient.Insert(ctx, ungateArgs, &insertOpts)
 	if err != nil {
-		return err
+		wlog.Info("Error inserting ungate job", "Namespace", namespace, "Name", name, "Error", err)
 	}
-	return removeGate(ctx, client, namespace, name)
+	return err
 }
 
 // removeGate removes the scheduling gate from the pod
 func removeGate(ctx context.Context, client *kubernetes.Clientset, namespace, name string) error {
-	pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	pod, err := client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
 	if err != nil {
 		return err
 	}
diff --git a/pkg/fluxqueue/strategy/workers/ungate.go b/pkg/fluxqueue/strategy/workers/ungate.go
new file mode 100644
index 0000000..487138f
--- /dev/null
+++ b/pkg/fluxqueue/strategy/workers/ungate.go
@@ -0,0 +1,128 @@
+package workers
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	api "github.com/converged-computing/fluxqueue/api/v1alpha1"
+	"github.com/converged-computing/fluxqueue/pkg/defaults"
+	"github.com/riverqueue/river"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	patchTypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+)
+
+// Ungate workers explicitly ungate pods, and add node labels
+func (args UngateArgs) Kind() string { return "ungate" }
+
+type UngateWorker struct {
+	river.WorkerDefaults[UngateArgs]
+	RESTConfig rest.Config
+}
+
+// NewUngateWorker returns a new worker that ungates pods
+func NewUngateWorker(cfg rest.Config) (*UngateWorker, error) {
+	worker := UngateWorker{RESTConfig: cfg}
+	return &worker, nil
+}
+
+// UngateArgs serializes a postgres row back into fields for the ungate task
+// We add extra fields to anticipate getting node assignments
+type UngateArgs struct {
+	Name      string   `json:"name"`
+	Type      string   `json:"type"`
+	Namespace string   `json:"namespace"`
+	Nodes     []string `json:"nodes"`
+	JobID     int64    `json:"jobid"`
+}
+
+// Work ungates the pod or pods associated with a job: a single wrapped pod,
+// or the group of pods selected for a deployment
+func (w UngateWorker) Work(ctx context.Context, job *river.Job[UngateArgs]) error {
+
+	var err error
+	wlog.Info("Running ungate worker", "Namespace", job.Args.Namespace, "Name", job.Args.Name)
+	jobid := fmt.Sprintf("%d", job.Args.JobID)
+
+	client, err := kubernetes.NewForConfig(&w.RESTConfig)
+	if err != nil {
+		wlog.Info("Error getting Kubernetes client", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err)
+		return err
+	}
+
+	// Ungate single pod (should only be one)
+	if job.Args.Type == api.JobWrappedPod.String() {
+		nodesStr := strings.Join(job.Args.Nodes, "__")
+		payload := `{"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + nodesStr + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}`
+		_, err = client.CoreV1().Pods(job.Args.Namespace).Patch(ctx, job.Args.Name, patchTypes.MergePatchType, []byte(payload), metav1.PatchOptions{})
+		if err != nil {
+			return err
+		}
+		err = removeGate(ctx, client, job.Args.Namespace, job.Args.Name)
+		if err != nil {
+			wlog.Info("Error in removing single pod", "Error", err)
+			return err
+		}
+	}
+
+	// For a deployment, we need to get the pods based on a selector
+	if job.Args.Type == api.JobWrappedDeployment.String() {
+		selector := fmt.Sprintf("%s=deployment-%s-%s", defaults.SelectorLabel, job.Args.Name, job.Args.Namespace)
+
+		// Get the deployment pods in the job namespace using the selector
+		pods, err := client.CoreV1().Pods(job.Args.Namespace).List(ctx, metav1.ListOptions{
+			LabelSelector: selector,
+		})
+		wlog.Info("Selector returned pods for nodes", "Pods", len(pods.Items), "Nodes", len(job.Args.Nodes))
+
+		if err != nil {
+			wlog.Info("Error listing pods in ungate worker", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err)
+			return err
+		}
+		// Ungate as many as we are able
+		for i, pod := range pods.Items {
+
+			// This shouldn't happen - we have more pods than assigned nodes
+			if i >= len(job.Args.Nodes) {
+				wlog.Info("Warning - we have more pods than nodes")
+				break
+			}
+
+			// We should not try to ungate (and assign a node) to a pod that
+			// already has been ungated
+			ungated := true
+			if pod.Spec.SchedulingGates != nil {
+				for _, gate := range pod.Spec.SchedulingGates {
+					if gate.Name == defaults.SchedulingGateName {
+						ungated = false
+						break
+					}
+				}
+			}
+			if ungated {
+				continue
+			}
+			payload := `{"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + job.Args.Nodes[i] + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}`
+			_, err = client.CoreV1().Pods(job.Args.Namespace).Patch(ctx, pod.ObjectMeta.Name, patchTypes.MergePatchType, []byte(payload), metav1.PatchOptions{})
+			if err != nil {
+				wlog.Info("Error in patching deployment pod", "Error", err)
+				return err
+			}
+			err = removeGate(ctx, client, pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
+			if err != nil {
+				wlog.Info("Error in removing deployment pod gate", "Error", err)
+				return err
+			}
+		}
+
+		// Kubernetes has not created the pod objects yet
+		// Returning an error will have it run again, with a delay
+		// https://riverqueue.com/docs/job-retries
+		if len(pods.Items) < len(job.Args.Nodes) {
+			return fmt.Errorf("ungate pods job did not have all pods")
+		}
+	}
+	return err
+}