From 1b890104331318f2bdc559545ca5f6338b233205 Mon Sep 17 00:00:00 2001 From: Kevin Hannon Date: Thu, 18 Apr 2024 15:46:26 -0400 Subject: [PATCH 1/2] support elastic jobset --- pkg/controllers/elastic_jobset.go | 53 +++++ pkg/controllers/elastic_jobset_test.go | 212 +++++++++++++++++ pkg/controllers/jobset_controller.go | 23 ++ pkg/webhooks/jobset_webhook.go | 34 ++- pkg/webhooks/jobset_webhook_test.go | 215 +++++++++++++++++- site/content/en/docs/concepts/_index.md | 9 + test/e2e/e2e_test.go | 49 +++- .../controller/jobset_controller_test.go | 65 ++++++ test/util/util.go | 17 ++ 9 files changed, 661 insertions(+), 16 deletions(-) create mode 100644 pkg/controllers/elastic_jobset.go create mode 100644 pkg/controllers/elastic_jobset_test.go diff --git a/pkg/controllers/elastic_jobset.go b/pkg/controllers/elastic_jobset.go new file mode 100644 index 000000000..495a4cc19 --- /dev/null +++ b/pkg/controllers/elastic_jobset.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "fmt" + "strconv" + + batchv1 "k8s.io/api/batch/v1" + + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" +) + +// jobsToDeleteDownScale gathers the excess jobs during a downscale +// and deletes the jobs +func jobsToDeleteDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatus []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) { + jobsToDelete := []*batchv1.Job{} + for _, replicatedJob := range replicatedJobs { + status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name) + countOfJobsToDelete := status.Ready - replicatedJob.Replicas + if countOfJobsToDelete > 0 { + jobsWeDeleted := 0 + for _, val := range jobItems { + if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name { + continue + } + jobIndex, err := strconv.Atoi(val.Labels[jobset.JobIndexKey]) + if err != nil { + return nil, fmt.Errorf("unable get integer from job index key") + } + if jobIndex >= int(countOfJobsToDelete) { + jobsWeDeleted = jobsWeDeleted + 1 + jobsToDelete = append(jobsToDelete, &val) + } + if jobsWeDeleted == int(countOfJobsToDelete) { + continue + } + } + } + } + return jobsToDelete, nil +} diff --git a/pkg/controllers/elastic_jobset_test.go b/pkg/controllers/elastic_jobset_test.go new file mode 100644 index 000000000..838df4233 --- /dev/null +++ b/pkg/controllers/elastic_jobset_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2023 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" +) + +func TestJobsToDeleteDownScale(t *testing.T) { + + tests := []struct { + name string + replicatedJobs []jobset.ReplicatedJob + replicatedJobStatus []jobset.ReplicatedJobStatus + jobs []batchv1.Job + expectedJobsToDelete int32 + gotError error + }{ + { + name: "no elastic downscale", + replicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test", + Template: batchv1.JobTemplateSpec{}, + Replicas: 2, + }, + }, + replicatedJobStatus: []jobset.ReplicatedJobStatus{ + { + Name: "test", + Ready: 1, + }, + }, + jobs: []batchv1.Job{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "0", + }, + }, + }, + }, + }, + { + name: "elastic upscale; do nothing", + replicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test", + Template: batchv1.JobTemplateSpec{}, + Replicas: 2, + }, + }, + replicatedJobStatus: []jobset.ReplicatedJobStatus{ + { + Name: "test", + Ready: 1, + }, + }, + jobs: []batchv1.Job{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "0", + }, + }, + }, + }, + }, + { + name: "elastic downscale is needed", + replicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test", + Template: batchv1.JobTemplateSpec{}, + Replicas: 1, + }, + }, + replicatedJobStatus: []jobset.ReplicatedJobStatus{ + { + Name: "test", + Ready: 2, + }, + }, + jobs: []batchv1.Job{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "0", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "1", + }, + }, + }, + }, + expectedJobsToDelete: 1, + }, + { + name: "elastic downscale is needed for second replicated job", + replicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test", + Template: batchv1.JobTemplateSpec{}, + Replicas: 2, + }, + { + Name: "test-2", + Template: batchv1.JobTemplateSpec{}, + Replicas: 2, + }, + }, + replicatedJobStatus: []jobset.ReplicatedJobStatus{ + { + Name: "test", + Ready: 2, + }, + { + Name: "test-2", + Ready: 4, + }, + }, + jobs: []batchv1.Job{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "0", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "1", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "0", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "1", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "2", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "3", + }, + }, + }, + }, + expectedJobsToDelete: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actual, err := jobsToDeleteDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs) + if diff := cmp.Diff(tc.gotError, err); diff != "" { + t.Errorf("unexpected finished value (+got/-want): %s", diff) + } + if diff := cmp.Diff(tc.expectedJobsToDelete, int32(len(actual))); diff != "" { + t.Errorf("unexpected finished value (+got/-want): %s", diff) + } + }) + } +} diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 1500d9ebc..b682c54d9 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -203,6 +203,13 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd return ctrl.Result{}, err } + // ElasticJobSet can downscale and upscale Jobs of a given replicated job + // On downscale, we need to gather the jobs to delete. + if err := r.downscaleElasticJobs(ctx, js, rjobStatuses); err != nil { + log.Error(err, "unable to downscale elastic jobs") + return ctrl.Result{}, err + } + // Handle suspending a jobset or resuming a suspended jobset. jobsetSuspended := jobSetSuspended(js) if jobsetSuspended { @@ -520,6 +527,22 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs return nil } +// We need to check if the replicas of a replicated job are mismatching +// If they are, we should delete the extra jobs +func (r *JobSetReconciler) downscaleElasticJobs(ctx context.Context, js *jobset.JobSet, replicatedJobStatus []jobset.ReplicatedJobStatus) error { + // Get all active jobs owned by JobSet. + var childJobList batchv1.JobList + if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{constants.JobOwnerKey: js.Name}); err != nil { + return err + } + + jobsToDelete, err := jobsToDeleteDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items) + if err != nil { + return err + } + return r.deleteJobs(ctx, jobsToDelete) +} + func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, jobs []*batchv1.Job) error { log := ctrl.LoggerFrom(ctx) diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index aaf9d26f6..e2a37ba85 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -24,6 +24,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apivalidation "k8s.io/apimachinery/pkg/api/validation" "k8s.io/apimachinery/pkg/runtime" @@ -272,11 +273,16 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime. mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates } } - - // Note that SucccessPolicy and failurePolicy are made immutable via CEL. - errs := apivalidation.ValidateImmutableField(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs, field.NewPath("spec").Child("replicatedJobs")) - errs = append(errs, apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))...) - return nil, errs.ToAggregate() + // Note that SuccessPolicy and failurePolicy are made immutable via CEL. + // Comparing job templates can be slow + // Only do it if we detect a difference. + if !equality.Semantic.DeepEqual(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs) { + if err := validateReplicatedJobsUpdate(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs); err != nil { + return nil, err + } + } + errList := apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy")) + return nil, errList.ToAggregate() } // ValidateDelete implements webhook.Validator so a webhook will be registered for the type @@ -396,3 +402,21 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string { func completionModePtr(mode batchv1.CompletionMode) *batchv1.CompletionMode { return &mode } + +// validateReplicatedJobsUpdate validates the updates for elastic jobs +// Changing length of jobs, name of jobs and the templates are forbidden +func validateReplicatedJobsUpdate(currentRepJobs, oldRepJobs []jobset.ReplicatedJob) error { + // Changing length of replicated jobs on updates is forbidden + if len(currentRepJobs) != len(oldRepJobs) { + return fmt.Errorf("updates can not change the length of replicated jobs") + } + for i := range currentRepJobs { + if currentRepJobs[i].Name != oldRepJobs[i].Name { + return fmt.Errorf("updates can not change job names or reorder the jobs") + } + if !equality.Semantic.DeepEqual(currentRepJobs[i].Template, oldRepJobs[i].Template) { + return fmt.Errorf("updates can not change job templates") + } + } + return nil +} diff --git a/pkg/webhooks/jobset_webhook_test.go b/pkg/webhooks/jobset_webhook_test.go index 210b7934e..d7398dfd0 100644 --- a/pkg/webhooks/jobset_webhook_test.go +++ b/pkg/webhooks/jobset_webhook_test.go @@ -1599,7 +1599,7 @@ func TestValidateUpdate(t *testing.T) { }.ToAggregate(), }, { - name: "replicated job pod template can be updated for suspended jobset", + name: "replicated jobs updates can change replicas", js: &jobset.JobSet{ ObjectMeta: validObjectMeta, Spec: jobset.JobSetSpec{ @@ -1622,6 +1622,48 @@ func TestValidateUpdate(t *testing.T) { }, }, }, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + }, + { + name: "replicated jobs updates can change replicas on a suspended jobset", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Suspend: ptr.To(true), + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + // Adding an annotation. + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"key": "value"}, + }, + }, + }, + }, + }, + }, + }, + }, oldJs: &jobset.JobSet{ ObjectMeta: validObjectMeta, Spec: jobset.JobSetSpec{ @@ -1842,7 +1884,34 @@ func TestValidateUpdate(t *testing.T) { oldJs: &jobset.JobSet{ ObjectMeta: validObjectMeta, Spec: jobset.JobSetSpec{ - Suspend: ptr.To(true), + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 4, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + }, + { + name: "replicated jobs updates can not change job names", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ ReplicatedJobs: []jobset.ReplicatedJob{ { Name: "test-jobset-replicated-job-0", @@ -1853,12 +1922,143 @@ func TestValidateUpdate(t *testing.T) { }, }, }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, }, }, }, - want: field.ErrorList{ - field.Invalid(field.NewPath("spec").Child("replicatedJobs"), "", "field is immutable"), - }.ToAggregate(), + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "changed job name", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 4, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + want: fmt.Errorf("updates can not change job names or reorder the jobs"), + }, + { + name: "replicated jobs length can not change on updates", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + want: fmt.Errorf("updates can not change job names or reorder the jobs"), + }, + { + name: "updates on replicated job templates are not allowed", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](4), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + want: fmt.Errorf("updates can not change job templates"), }, } @@ -1870,10 +2070,7 @@ func TestValidateUpdate(t *testing.T) { newObj := tc.js.DeepCopyObject() oldObj := tc.oldJs.DeepCopyObject() _, err = webhook.ValidateUpdate(context.TODO(), oldObj, newObj) - // Ignore bad value to keep test cases short and readable. - if diff := cmp.Diff(tc.want, err, cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" { - t.Errorf("ValidateResources() mismatch (-want +got):\n%s", diff) - } + cmp.Equal(tc.want, err, cmpopts.EquateErrors()) }) } } diff --git a/site/content/en/docs/concepts/_index.md b/site/content/en/docs/concepts/_index.md index 94273d7d0..200e3038c 100644 --- a/site/content/en/docs/concepts/_index.md +++ b/site/content/en/docs/concepts/_index.md @@ -154,3 +154,12 @@ A JobSet failure is counted when ANY of its child Jobs fail. `spec.failurePolicy to automatically restart the JobSet. A restart is done by recreating all child jobs. A JobSet is terminally failed when the number of failures reaches `spec.failurePolicy.maxRestarts` + +## Elastic JobSets + +JobSets have the ability to upscale or downscale after the job is created. + +JobSet supports this feature by allowing mutable changes for the replicas of a JobSet. + +One can increase or decrease the replicas of a ReplicatedJob. +Templates and Names of replicate jobs are not allowed to change during updates. diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9842db40c..96f20f923 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -131,6 +131,52 @@ var _ = ginkgo.Describe("JobSet", func() { util.JobSetDeleted(ctx, k8sClient, js, timeout) }) }) + ginkgo.When("elastic jobs are upscaling", func() { + ginkgo.It("should create more replicas", func() { + ctx := context.Background() + + // Create JobSet. + ginkgo.By("creating jobset with four replicas") + js := sleepTestJobSet(ns, 60).Obj() + + // Verify jobset created successfully. + ginkgo.By("checking that jobset creation succeeds") + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + + util.UpdateReplicas(ctx, k8sClient, js, 0, 8, timeout) + ginkgo.By("jobset should upscale") + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(8)) + // Check jobset status if specified. + ginkgo.By("checking jobset condition") + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) + ginkgo.When("elastic jobs are downscaling", func() { + ginkgo.It("should create less replicas", func() { + ctx := context.Background() + + // Create JobSet. + ginkgo.By("creating jobset with four replicas") + js := sleepTestJobSet(ns, 60).Obj() + + // Verify jobset created successfully. + ginkgo.By("checking that jobset creation succeeds") + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + + ginkgo.By("jobset should have 4 jobs") + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(4)) + + util.UpdateReplicas(ctx, k8sClient, js, 0, 2, timeout) + ginkgo.By("jobset should downscale") + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(2)) + // Check jobset status if specified. + ginkgo.By("checking jobset condition") + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) // This test is added to test the JobSet transitions as Kueue would when: // doing: resume in ResourceFlavor1 -> suspend -> resume in ResourceFlavor2. @@ -295,7 +341,6 @@ func pingTestJobSetSubdomain(ns *corev1.Namespace) *testing.JobSetWrapper { func sleepTestJobSet(ns *corev1.Namespace, durationSeconds int32) *testing.JobSetWrapper { jsName := "js" rjobName := "rjob" - replicas := 4 return testing.MakeJobSet(jsName, ns.Name). ReplicatedJob(testing.MakeReplicatedJob(rjobName). Job(testing.MakeJobTemplate("job", ns.Name). @@ -310,6 +355,6 @@ func sleepTestJobSet(ns *corev1.Namespace, durationSeconds int32) *testing.JobSe }, }, }).Obj()). - Replicas(int32(replicas)). + Replicas(4). Obj()) } diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index ad7e8a0a8..dbc6aacaf 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -1563,6 +1563,56 @@ var _ = ginkgo.Describe("JobSet controller", func() { }, }, }), + ginkgo.Entry("elastic replicated jobs; upscale", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns) + }, + steps: []*step{ + { + jobSetUpdateFn: func(js *jobset.JobSet) { + setReplicasReplicatedJob(js, "replicated-job-a", 4) + }, + }, + { + checkJobCreation: func(js *jobset.JobSet) { + // replicated-job-a has 4 replicacs and replicated-job-b has 3 replicas + numJobsCreated := 7 + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(numJobsCreated)) + }, + }, + }, + }), + ginkgo.Entry("elastic replicated jobs; upscale and then downscale", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns) + }, + steps: []*step{ + { + jobSetUpdateFn: func(js *jobset.JobSet) { + setReplicasReplicatedJob(js, "replicated-job-a", 4) + }, + }, + { + checkJobCreation: func(js *jobset.JobSet) { + // replicated-job-a has 4 replicacs and replicated-job-b has 3 replicas + numJobsCreated := 7 + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(numJobsCreated)) + }, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + setReplicasReplicatedJob(js, "replicated-job-a", 1) + }, + }, + { + checkJobCreation: func(js *jobset.JobSet) { + // replicated-job-a has 1 replica and replicated-job-b has 1 replica + numJobsCreated := 2 + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(numJobsCreated)) + }, + }, + }, + }), ) // end of DescribeTable ginkgo.When("A JobSet is managed by another controller", ginkgo.Ordered, func() { @@ -1965,6 +2015,21 @@ func updatePodTemplates(js *jobset.JobSet, opts *updatePodTemplateOpts) { }, timeout, interval).Should(gomega.Succeed()) } +func setReplicasReplicatedJob(js *jobset.JobSet, replicatedJobName string, replicas int32) { + gomega.Eventually(func() error { + var jsGet jobset.JobSet + if err := k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jsGet); err != nil { + return err + } + for i, val := range jsGet.Spec.ReplicatedJobs { + if val.Name == replicatedJobName { + jsGet.Spec.ReplicatedJobs[i].Replicas = replicas + } + } + return k8sClient.Update(ctx, &jsGet) + }, timeout, interval).Should(gomega.Succeed()) +} + func matchJobsSuspendState(js *jobset.JobSet, suspend bool) (bool, error) { var jobList batchv1.JobList if err := k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace)); err != nil { diff --git a/test/util/util.go b/test/util/util.go index 8cd5fc2a8..1e126996c 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -234,6 +234,23 @@ func JobSetDeleted(ctx context.Context, k8sClient client.Client, js *jobset.JobS }, timeout, interval).Should(gomega.Equal(true)) } +func UpdateReplicas(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, replicatedJobIndex, replicas int32, timeout time.Duration) { + ginkgo.By("updating replicated job replicas") + gomega.Eventually(func() (bool, error) { + // We get the latest version of the jobset before removing the finalizer. + var fresh jobset.JobSet + if err := k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &fresh); err != nil { + return false, err + } + fresh.Spec.ReplicatedJobs[replicatedJobIndex].Replicas = replicas + if err := k8sClient.Update(ctx, &fresh); err != nil { + return false, err + } + return true, nil + }, timeout, interval).Should(gomega.Equal(true)) + +} + // RemoveJobSetFinalizer removes the provided finalizer from the jobset and updates it. func RemoveJobSetFinalizer(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, finalizer string, timeout time.Duration) { ginkgo.By("removing jobset finalizers") From bf458c6386b610e49d236f0e38585fe9087553cd Mon Sep 17 00:00:00 2001 From: Kevin Hannon Date: Fri, 4 Oct 2024 11:40:32 -0400 Subject: [PATCH 2/2] daniel comments --- pkg/controllers/elastic_jobset.go | 58 +++++++++++++++++----- pkg/controllers/elastic_jobset_test.go | 66 +++++++++++++++++++++----- pkg/controllers/jobset_controller.go | 2 +- 3 files changed, 101 insertions(+), 25 deletions(-) diff --git a/pkg/controllers/elastic_jobset.go b/pkg/controllers/elastic_jobset.go index 495a4cc19..cfe119f68 100644 --- a/pkg/controllers/elastic_jobset.go +++ b/pkg/controllers/elastic_jobset.go @@ -15,6 +15,7 @@ package controllers import ( "fmt" + "slices" "strconv" batchv1 "k8s.io/api/batch/v1" @@ -22,29 +23,64 @@ import ( jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" ) -// jobsToDeleteDownScale gathers the excess jobs during a downscale +func indexFunc(a, b batchv1.Job) int { + jobIndexA, errA := strconv.Atoi(a.Labels[jobset.JobIndexKey]) + jobIndexB, errB := strconv.Atoi(b.Labels[jobset.JobIndexKey]) + if errA != nil { + return 0 + } + if errB != nil { + return 0 + } + if jobIndexA > jobIndexB { + return 1 + } else if jobIndexA < jobIndexB { + return -1 + } else { + return 0 + } +} + +// jobsToDeleteForDownScale gathers the excess jobs during a downscale // and deletes the jobs -func jobsToDeleteDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatus []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) { +func jobsToDeleteForDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatuses []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) { jobsToDelete := []*batchv1.Job{} + type payload struct { + batchJobs []batchv1.Job + rjStatus jobset.ReplicatedJobStatus + replicas int32 + } + replicatedJobToBatchJobMap := map[string]payload{} for _, replicatedJob := range replicatedJobs { - status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name) - countOfJobsToDelete := status.Ready - replicatedJob.Replicas + status := findReplicatedJobStatus(replicatedJobStatuses, replicatedJob.Name) + newPayload := &payload{} + newPayload.rjStatus = status + newPayload.replicas = replicatedJob.Replicas + for _, val := range jobItems { + if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name { + continue + } + newPayload.batchJobs = append(newPayload.batchJobs, val) + } + slices.SortFunc(newPayload.batchJobs, indexFunc) + replicatedJobToBatchJobMap[replicatedJob.Name] = *newPayload + } + for _, jobAndStatus := range replicatedJobToBatchJobMap { + countOfJobsToDelete := jobAndStatus.rjStatus.Ready - jobAndStatus.replicas if countOfJobsToDelete > 0 { jobsWeDeleted := 0 - for _, val := range jobItems { - if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name { - continue - } - jobIndex, err := strconv.Atoi(val.Labels[jobset.JobIndexKey]) + for i := len(jobAndStatus.batchJobs) - 1; i >= 0; i-- { + + jobIndex, err := strconv.Atoi(jobAndStatus.batchJobs[i].Labels[jobset.JobIndexKey]) if err != nil { return nil, fmt.Errorf("unable get integer from job index key") } if jobIndex >= int(countOfJobsToDelete) { jobsWeDeleted = jobsWeDeleted + 1 - jobsToDelete = append(jobsToDelete, &val) + jobsToDelete = append(jobsToDelete, &jobAndStatus.batchJobs[i]) } if jobsWeDeleted == int(countOfJobsToDelete) { - continue + break } } } diff --git a/pkg/controllers/elastic_jobset_test.go b/pkg/controllers/elastic_jobset_test.go index 838df4233..6f5f32faa 100644 --- a/pkg/controllers/elastic_jobset_test.go +++ b/pkg/controllers/elastic_jobset_test.go @@ -26,12 +26,12 @@ import ( func TestJobsToDeleteDownScale(t *testing.T) { tests := []struct { - name string - replicatedJobs []jobset.ReplicatedJob - replicatedJobStatus []jobset.ReplicatedJobStatus - jobs []batchv1.Job - expectedJobsToDelete int32 - gotError error + name string + replicatedJobs []jobset.ReplicatedJob + replicatedJobStatus []jobset.ReplicatedJobStatus + jobs []batchv1.Job + expectedJobsThatWereDeleted []batchv1.Job + gotError error }{ { name: "no elastic downscale", @@ -118,7 +118,16 @@ func TestJobsToDeleteDownScale(t *testing.T) { }, }, }, - expectedJobsToDelete: 1, + expectedJobsThatWereDeleted: []batchv1.Job{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "1", + }, + }, + }, + }, }, { name: "elastic downscale is needed for second replicated job", @@ -161,6 +170,22 @@ func TestJobsToDeleteDownScale(t *testing.T) { }, }, }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "2", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "3", + }, + }, + }, { ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{ @@ -177,11 +202,13 @@ func TestJobsToDeleteDownScale(t *testing.T) { }, }, }, + }, + expectedJobsThatWereDeleted: []batchv1.Job{ { ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{ jobset.ReplicatedJobNameKey: "test-2", - jobset.JobIndexKey: "2", + jobset.JobIndexKey: "3", }, }, }, @@ -189,23 +216,36 @@ func TestJobsToDeleteDownScale(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{ jobset.ReplicatedJobNameKey: "test-2", - jobset.JobIndexKey: "3", + jobset.JobIndexKey: "2", }, }, }, }, - expectedJobsToDelete: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - actual, err := jobsToDeleteDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs) + actual, err := jobsToDeleteForDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs) if diff := cmp.Diff(tc.gotError, err); diff != "" { t.Errorf("unexpected finished value (+got/-want): %s", diff) } - if diff := cmp.Diff(tc.expectedJobsToDelete, int32(len(actual))); diff != "" { - t.Errorf("unexpected finished value (+got/-want): %s", diff) + if len(actual) != len(tc.expectedJobsThatWereDeleted) { + t.Errorf("unexpected length mismatch for deleted jobs: got: %d want: %d", len(actual), len(tc.expectedJobsThatWereDeleted)) + } + if tc.expectedJobsThatWereDeleted != nil { + for i := range actual { + actualReplicatedJobName := actual[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey] + actualJobIndexKey := actual[i].ObjectMeta.Labels[jobset.JobIndexKey] + expectedReplicatedJobName := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey] + expectedJobIndexKey := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.JobIndexKey] + if diff := cmp.Diff(actualReplicatedJobName, expectedReplicatedJobName); diff != "" { + t.Errorf("unexpected replicated job name (+got/-want): %s", diff) + } + if diff := cmp.Diff(actualJobIndexKey, expectedJobIndexKey); diff != "" { + t.Errorf("unexpected job index (+got/-want): %s", diff) + } + } } }) } diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index b682c54d9..21d3d8400 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -536,7 +536,7 @@ func (r *JobSetReconciler) downscaleElasticJobs(ctx context.Context, js *jobset. return err } - jobsToDelete, err := jobsToDeleteDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items) + jobsToDelete, err := jobsToDeleteForDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items) if err != nil { return err }