From f4e6034e0a9b429afcd7b4c332749ca99e58dfcd Mon Sep 17 00:00:00 2001 From: Kevin Hannon Date: Thu, 18 Apr 2024 15:46:26 -0400 Subject: [PATCH] support elastic jobset --- pkg/webhooks/jobset_webhook.go | 34 ++- pkg/webhooks/jobset_webhook_test.go | 215 +++++++++++++++++- site/content/en/docs/concepts/_index.md | 9 + test/e2e/e2e_test.go | 53 ++++- .../controller/jobset_controller_test.go | 62 +++++ test/util/util.go | 17 ++ 6 files changed, 372 insertions(+), 18 deletions(-) diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index 8c7588def..092cd9987 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -24,6 +24,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apivalidation "k8s.io/apimachinery/pkg/api/validation" "k8s.io/apimachinery/pkg/runtime" @@ -272,11 +273,16 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime. mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates } } - - // Note that SucccessPolicy and failurePolicy are made immutable via CEL. - errs := apivalidation.ValidateImmutableField(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs, field.NewPath("spec").Child("replicatedJobs")) - errs = append(errs, apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))...) - return nil, errs.ToAggregate() + // Note that SuccessPolicy and failurePolicy are made immutable via CEL. + // Comparing job templates can be slow + // Only do it if we detect a difference. 
+ if !equality.Semantic.DeepEqual(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs) { + if err := validateReplicatedJobsUpdate(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs); err != nil { + return nil, err + } + } + errList := apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy")) + return nil, errList.ToAggregate() } // ValidateDelete implements webhook.Validator so a webhook will be registered for the type @@ -396,3 +402,21 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string { func completionModePtr(mode batchv1.CompletionMode) *batchv1.CompletionMode { return &mode } + +// validateReplicatedJobsUpdate validates the updates for elastic jobs +// Changing length of jobs, name of jobs and the templates are forbidden +func validateReplicatedJobsUpdate(currentRepJobs, oldRepJobs []jobset.ReplicatedJob) error { + // Changing length of replicated jobs on updates is forbidden + if len(currentRepJobs) != len(oldRepJobs) { + return fmt.Errorf("updates can not change the length of replicated jobs") + } + for i := range currentRepJobs { + if currentRepJobs[i].Name != oldRepJobs[i].Name { + return fmt.Errorf("updates can not change job names or reorder the jobs") + } + if !equality.Semantic.DeepEqual(currentRepJobs[i].Template, oldRepJobs[i].Template) { + return fmt.Errorf("updates can not change job templates") + } + } + return nil +} diff --git a/pkg/webhooks/jobset_webhook_test.go b/pkg/webhooks/jobset_webhook_test.go index e86a6e4ae..80b89c7ff 100644 --- a/pkg/webhooks/jobset_webhook_test.go +++ b/pkg/webhooks/jobset_webhook_test.go @@ -1599,7 +1599,7 @@ func TestValidateUpdate(t *testing.T) { }.ToAggregate(), }, { - name: "replicated job pod template can be updated for suspended jobset", + name: "replicated jobs updates can change replicas", js: &jobset.JobSet{ ObjectMeta: validObjectMeta, Spec: jobset.JobSetSpec{ @@ -1622,6 +1622,48 @@ func TestValidateUpdate(t *testing.T) { }, }, 
}, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + }, + { + name: "replicated jobs updates can change replicas on a suspended jobset", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Suspend: ptr.To(true), + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + // Adding an annotation. + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"key": "value"}, + }, + }, + }, + }, + }, + }, + }, + }, oldJs: &jobset.JobSet{ ObjectMeta: validObjectMeta, Spec: jobset.JobSetSpec{ @@ -1842,7 +1884,34 @@ func TestValidateUpdate(t *testing.T) { oldJs: &jobset.JobSet{ ObjectMeta: validObjectMeta, Spec: jobset.JobSetSpec{ - Suspend: ptr.To(true), + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 4, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + }, + { + name: "replicated jobs updates can not change job names", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ ReplicatedJobs: []jobset.ReplicatedJob{ { Name: "test-jobset-replicated-job-0", @@ -1853,12 +1922,143 @@ func TestValidateUpdate(t *testing.T) { }, }, }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, }, }, 
}, - want: field.ErrorList{ - field.Invalid(field.NewPath("spec").Child("replicatedJobs"), "", "field is immutable"), - }.ToAggregate(), + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "changed job name", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 4, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + want: fmt.Errorf("updates can not change job names or reorder the jobs"), + }, + { + name: "replicated jobs length can not change on updates", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + want: fmt.Errorf("updates can not change job names or reorder the jobs"), + }, + { + name: "updates on replicated job templates are not allowed", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + { + Name: 
"test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](4), + }, + }, + }, + { + Name: "test-jobset-replicated-job-1", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + }, + }, + }, + }, + }, + }, + want: fmt.Errorf("updates can not change job templates"), }, } @@ -1870,10 +2070,7 @@ func TestValidateUpdate(t *testing.T) { newObj := tc.js.DeepCopyObject() oldObj := tc.oldJs.DeepCopyObject() _, err = webhook.ValidateUpdate(context.TODO(), oldObj, newObj) - // Ignore bad value to keep test cases short and readable. - if diff := cmp.Diff(tc.want, err, cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" { - t.Errorf("ValidateResources() mismatch (-want +got):\n%s", diff) - } + cmp.Equal(tc.want, err, cmpopts.EquateErrors()) }) } } diff --git a/site/content/en/docs/concepts/_index.md b/site/content/en/docs/concepts/_index.md index 94273d7d0..200e3038c 100644 --- a/site/content/en/docs/concepts/_index.md +++ b/site/content/en/docs/concepts/_index.md @@ -154,3 +154,12 @@ A JobSet failure is counted when ANY of its child Jobs fail. `spec.failurePolicy to automatically restart the JobSet. A restart is done by recreating all child jobs. A JobSet is terminally failed when the number of failures reaches `spec.failurePolicy.maxRestarts` + +## Elastic JobSets + +JobSets have the ability to upscale or downscale after the job is created. + +JobSet supports this feature by allowing mutable changes for the replicas of a JobSet. + +One can increase or decrease the replicas of a ReplicatedJob. 
+Templates and names of replicated jobs are not allowed to change during updates. diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9842db40c..2fa61c150 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -113,7 +113,7 @@ var _ = ginkgo.Describe("JobSet", func() { // Create JobSet. testFinalizer := "fake.example.com/blockDeletion" ginkgo.By("creating jobset with ttl seconds after finished") - js := sleepTestJobSet(ns, 20).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj() + js := sleepTestJobSet(ns, 4, 20).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj() // Verify jobset created successfully. ginkgo.By("checking that jobset creation succeeds") @@ -131,6 +131,52 @@ var _ = ginkgo.Describe("JobSet", func() { util.JobSetDeleted(ctx, k8sClient, js, timeout) }) }) + ginkgo.When("elastic jobs are upscaling", func() { + ginkgo.It("should create more replicas", func() { + ctx := context.Background() + + // Create JobSet. + ginkgo.By("creating jobset with four replicas") + js := sleepTestJobSet(ns, 4, 10000).Obj() + + // Verify jobset created successfully. + ginkgo.By("checking that jobset creation succeeds") + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + + util.UpdateReplicas(ctx, k8sClient, js, 0, 8, timeout) + ginkgo.By("jobset should upscale") + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(8)) + // Check jobset status if specified. + ginkgo.By("checking jobset condition") + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) + ginkgo.When("elastic jobs are downscaling", func() { + ginkgo.It("should create less replicas", func() { + ctx := context.Background() + + // Create JobSet. + ginkgo.By("creating jobset with four replicas") + js := sleepTestJobSet(ns, 4, 10000).Obj() + + // Verify jobset created successfully. 
+ ginkgo.By("checking that jobset creation succeeds") + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + + ginkgo.By("jobset should have 4 jobs") + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(4)) + + util.UpdateReplicas(ctx, k8sClient, js, 0, 2, timeout) + ginkgo.By("jobset should downscale") + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(2)) + // Check jobset status if specified. + ginkgo.By("checking jobset condition") + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) // This test is added to test the JobSet transitions as Kueue would when: // doing: resume in ResourceFlavor1 -> suspend -> resume in ResourceFlavor2. @@ -140,7 +186,7 @@ var _ = ginkgo.Describe("JobSet", func() { ginkgo.It("should allow to resume JobSet after updating PodTemplate", func() { ctx := context.Background() - js := sleepTestJobSet(ns, 1).Obj() + js := sleepTestJobSet(ns, 4, 1).Obj() jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace} ginkgo.By("Create a suspended JobSet", func() { @@ -292,10 +338,9 @@ func pingTestJobSetSubdomain(ns *corev1.Namespace) *testing.JobSetWrapper { Obj()) } -func sleepTestJobSet(ns *corev1.Namespace, durationSeconds int32) *testing.JobSetWrapper { +func sleepTestJobSet(ns *corev1.Namespace, replicas int32, durationSeconds int32) *testing.JobSetWrapper { jsName := "js" rjobName := "rjob" - replicas := 4 return testing.MakeJobSet(jsName, ns.Name). ReplicatedJob(testing.MakeReplicatedJob(rjobName). Job(testing.MakeJobTemplate("job", ns.Name). 
diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index ad7e8a0a8..d67d71ad5 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -1563,6 +1563,53 @@ var _ = ginkgo.Describe("JobSet controller", func() { }, }, }), + ginkgo.Entry("elastic replicated jobs; upscale", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns) + }, + steps: []*step{ + { + jobSetUpdateFn: func(js *jobset.JobSet) { + setReplicasReplicatedJob(js, "replicated-job-a", 4) + }, + }, + { + checkJobCreation: func(js *jobset.JobSet) { + expectedStarts := 7 + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(expectedStarts)) + }, + }, + }, + }), + ginkgo.Entry("elastic replicated jobs; upscale and then downscale", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns) + }, + steps: []*step{ + { + jobSetUpdateFn: func(js *jobset.JobSet) { + setReplicasReplicatedJob(js, "replicated-job-a", 4) + }, + }, + { + checkJobCreation: func(js *jobset.JobSet) { + expectedStarts := 7 + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(expectedStarts)) + }, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + setReplicasReplicatedJob(js, "replicated-job-a", 1) + }, + }, + { + checkJobCreation: func(js *jobset.JobSet) { + expectedStarts := 2 + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(expectedStarts)) + }, + }, + }, + }), ) // end of DescribeTable ginkgo.When("A JobSet is managed by another controller", ginkgo.Ordered, func() { @@ -1965,6 +2012,21 @@ func updatePodTemplates(js *jobset.JobSet, opts *updatePodTemplateOpts) { }, timeout, interval).Should(gomega.Succeed()) } +func 
setReplicasReplicatedJob(js *jobset.JobSet, replicatedJobName string, replicas int32) { + gomega.Eventually(func() error { + var jsGet jobset.JobSet + if err := k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jsGet); err != nil { + return err + } + for i, val := range jsGet.Spec.ReplicatedJobs { + if val.Name == replicatedJobName { + jsGet.Spec.ReplicatedJobs[i].Replicas = replicas + } + } + return k8sClient.Update(ctx, &jsGet) + }, timeout, interval).Should(gomega.Succeed()) +} + func matchJobsSuspendState(js *jobset.JobSet, suspend bool) (bool, error) { var jobList batchv1.JobList if err := k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace)); err != nil { diff --git a/test/util/util.go b/test/util/util.go index 8cd5fc2a8..1e126996c 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -234,6 +234,23 @@ func JobSetDeleted(ctx context.Context, k8sClient client.Client, js *jobset.JobS }, timeout, interval).Should(gomega.Equal(true)) } +func UpdateReplicas(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, replicatedJobIndex, replicas int32, timeout time.Duration) { + ginkgo.By("updating replicated job replicas") + gomega.Eventually(func() (bool, error) { + // We get the latest version of the jobset before updating the replicas. + var fresh jobset.JobSet + if err := k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &fresh); err != nil { + return false, err + } + fresh.Spec.ReplicatedJobs[replicatedJobIndex].Replicas = replicas + if err := k8sClient.Update(ctx, &fresh); err != nil { + return false, err + } + return true, nil + }, timeout, interval).Should(gomega.Equal(true)) + +} + // RemoveJobSetFinalizer removes the provided finalizer from the jobset and updates it. func RemoveJobSetFinalizer(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, finalizer string, timeout time.Duration) { ginkgo.By("removing jobset finalizers")