Skip to content

Commit

Permalink
support elastic jobset
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Aug 13, 2024
1 parent ec39730 commit f4e6034
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 18 deletions.
34 changes: 29 additions & 5 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apivalidation "k8s.io/apimachinery/pkg/api/validation"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -272,11 +273,16 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates
}
}

// Note that SucccessPolicy and failurePolicy are made immutable via CEL.
errs := apivalidation.ValidateImmutableField(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs, field.NewPath("spec").Child("replicatedJobs"))
errs = append(errs, apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))...)
return nil, errs.ToAggregate()
// Note that SuccessPolicy and failurePolicy are made immutable via CEL.
// Comparing job templates can be slow
// Only do it if we detect a difference.
if !equality.Semantic.DeepEqual(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs) {
if err := validateReplicatedJobsUpdate(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs); err != nil {
return nil, err
}
}
errList := apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))
return nil, errList.ToAggregate()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
Expand Down Expand Up @@ -396,3 +402,21 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string {
func completionModePtr(mode batchv1.CompletionMode) *batchv1.CompletionMode {
return &mode
}

// validateReplicatedJobsUpdate validates the updates for elastic jobs
// Changing length of jobs, name of jobs and the templates are forbidden
func validateReplicatedJobsUpdate(currentRepJobs, oldRepJobs []jobset.ReplicatedJob) error {
// Changing length of replicated jobs on updates is forbidden
if len(currentRepJobs) != len(oldRepJobs) {
return fmt.Errorf("updates can not change the length of replicated jobs")
}
for i := range currentRepJobs {
if currentRepJobs[i].Name != oldRepJobs[i].Name {
return fmt.Errorf("updates can not change job names or reorder the jobs")
}
if !equality.Semantic.DeepEqual(currentRepJobs[i].Template, oldRepJobs[i].Template) {
return fmt.Errorf("updates can not change job templates")
}
}
return nil
}
215 changes: 206 additions & 9 deletions pkg/webhooks/jobset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ func TestValidateUpdate(t *testing.T) {
}.ToAggregate(),
},
{
name: "replicated job pod template can be updated for suspended jobset",
name: "replicated jobs updates can change replicas",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Expand All @@ -1622,6 +1622,48 @@ func TestValidateUpdate(t *testing.T) {
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
},
{
name: "replicated jobs updates can change replicas on a suspended jobset",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
// Adding an annotation.
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"key": "value"},
},
},
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Expand Down Expand Up @@ -1842,7 +1884,34 @@ func TestValidateUpdate(t *testing.T) {
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 4,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
},
{
name: "replicated jobs updates can not change job names",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Expand All @@ -1853,12 +1922,143 @@ func TestValidateUpdate(t *testing.T) {
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
want: field.ErrorList{
field.Invalid(field.NewPath("spec").Child("replicatedJobs"), "", "field is immutable"),
}.ToAggregate(),
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "changed job name",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 4,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
want: fmt.Errorf("updates can not change job names or reorder the jobs"),
},
{
name: "replicated jobs length can not change on updates",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
want: fmt.Errorf("updates can not change job names or reorder the jobs"),
},
{
name: "updates on replicated job templates are not allowed",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](4),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
want: fmt.Errorf("updates can not change job templates"),
},
}

Expand All @@ -1870,10 +2070,7 @@ func TestValidateUpdate(t *testing.T) {
newObj := tc.js.DeepCopyObject()
oldObj := tc.oldJs.DeepCopyObject()
_, err = webhook.ValidateUpdate(context.TODO(), oldObj, newObj)
// Ignore bad value to keep test cases short and readable.
if diff := cmp.Diff(tc.want, err, cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" {
t.Errorf("ValidateResources() mismatch (-want +got):\n%s", diff)
}
cmp.Equal(tc.want, err, cmpopts.EquateErrors())
})
}
}
9 changes: 9 additions & 0 deletions site/content/en/docs/concepts/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,12 @@ A JobSet failure is counted when ANY of its child Jobs fail. `spec.failurePolicy
to automatically restart the JobSet. A restart is done by recreating all child jobs.

A JobSet is terminally failed when the number of failures reaches `spec.failurePolicy.maxRestarts`

## Elastic JobSets

JobSets have the ability to upscale or downscale after the job is created.

JobSet supports this feature by allowing mutable changes for the replicas of a JobSet.

One can increase or decrease the replicas of a ReplicatedJob.
Templates and Names of replicate jobs are not allowed to change during updates.
Loading

0 comments on commit f4e6034

Please sign in to comment.