diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go index ae8fd0966dd..5f7982c4f3d 100644 --- a/internal/scheduler/scheduling/context/job.go +++ b/internal/scheduler/scheduling/context/job.go @@ -83,16 +83,6 @@ func (jctx *JobSchedulingContext) String() string { return sb.String() } -// SchedulingKey returns the scheduling key of the embedded job. -// If the jctx contains additional node selectors or tolerations, -// the key is invalid and the second return value is false. -func (jctx *JobSchedulingContext) SchedulingKey() (schedulerobjects.SchedulingKey, bool) { - if len(jctx.AdditionalNodeSelectors) != 0 || len(jctx.AdditionalTolerations) != 0 { - return schedulerobjects.EmptySchedulingKey, false - } - return jctx.Job.SchedulingKey(), true -} - func (jctx *JobSchedulingContext) IsSuccessful() bool { return jctx.UnschedulableReason == "" } diff --git a/internal/scheduler/scheduling/context/scheduling.go b/internal/scheduler/scheduling/context/scheduling.go index e3bb7ae9d32..9f770fd8d1d 100644 --- a/internal/scheduler/scheduling/context/scheduling.go +++ b/internal/scheduler/scheduling/context/scheduling.go @@ -84,10 +84,6 @@ func NewSchedulingContext( } } -func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() { - sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext) -} - func (sctx *SchedulingContext) AddQueueSchedulingContext( queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string], diff --git a/internal/scheduler/scheduling/gang_scheduler.go b/internal/scheduler/scheduling/gang_scheduler.go index 247c83e9b77..e727c69ce6a 100644 --- a/internal/scheduler/scheduling/gang_scheduler.go +++ b/internal/scheduler/scheduling/gang_scheduler.go @@ -10,7 +10,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" ) @@ -21,8 +20,6 @@ type GangScheduler struct { floatingResourceTypes *floatingresources.FloatingResourceTypes schedulingContext *context.SchedulingContext nodeDb *nodedb.NodeDb - // If true, the unsuccessfulSchedulingKeys check is omitted. - skipUnsuccessfulSchedulingKeyCheck bool } func NewGangScheduler( @@ -30,14 +27,12 @@ func NewGangScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - skipUnsuccessfulSchedulingKeyCheck bool, ) (*GangScheduler, error) { return &GangScheduler{ - constraints: constraints, - floatingResourceTypes: floatingResourceTypes, - schedulingContext: sctx, - nodeDb: nodeDb, - skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck, + constraints: constraints, + floatingResourceTypes: floatingResourceTypes, + schedulingContext: sctx, + nodeDb: nodeDb, }, nil } @@ -80,18 +75,15 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *context.Gan globallyUnschedulable := schedulerconstraints.UnschedulableReasonIsPropertyOfGang(unschedulableReason) - // Register globally unfeasible scheduling keys. - // - // Only record unfeasible scheduling keys for single-job gangs. 
- // Since a gang may be unschedulable even if all its members are individually schedulable. - if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 && globallyUnschedulable { + // Register globally unfeasible scheduling keys. This allows us to discard subsequent jobs with the same + // key as we know they cannot be scheduled. We only do this for: + // * Queued jobs, as evicted jobs no longer have a valid scheduling key + // * Single jobs as we have no concept of the scheduling key for a gang + if globallyUnschedulable && gctx.Cardinality() == 1 { jctx := gctx.JobSchedulingContexts[0] - schedulingKey, ok := jctx.SchedulingKey() - if ok && schedulingKey != schedulerobjects.EmptySchedulingKey { - if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok { - // Keep the first jctx for each unfeasible schedulingKey. - sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey] = jctx - } + if !jctx.IsEvicted { + // Keep the first jctx for each unfeasible schedulingKey. + sch.schedulingContext.UnfeasibleSchedulingKeys[jctx.Job.SchedulingKey()] = jctx } } diff --git a/internal/scheduler/scheduling/gang_scheduler_test.go b/internal/scheduler/scheduling/gang_scheduler_test.go index e7f2dd17e77..b00d689b93f 100644 --- a/internal/scheduler/scheduling/gang_scheduler_test.go +++ b/internal/scheduler/scheduling/gang_scheduler_test.go @@ -576,10 +576,7 @@ func TestGangScheduler(t *testing.T) { jctxs := context.JobSchedulingContextsFromJobs(gang) require.Equal(t, 1, len(jctxs), fmt.Sprintf("gangs with cardinality greater than 1 don't have a single scheduling key: %v", gang)) jctx := jctxs[0] - key, _ := jctx.SchedulingKey() - require.NotEqual(t, key, schedulerobjects.EmptySchedulingKey, "expected unfeasible scheduling key cannot be the empty key") - - expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, key) + expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, jctx.Job.SchedulingKey()) } nodesById := make(map[string]*schedulerobjects.Node, len(tc.Nodes)) @@ -645,7 +642,7 @@ func TestGangScheduler(t *testing.T) { constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil) floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources) require.NoError(t, err) - sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false) + sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb) require.NoError(t, err) var actualScheduledIndices []int diff --git a/internal/scheduler/scheduling/jobiteration.go b/internal/scheduler/scheduling/jobiteration.go index 88a3f8e7818..6a89dac6d83 100644 --- a/internal/scheduler/scheduling/jobiteration.go +++ b/internal/scheduler/scheduling/jobiteration.go @@ -1,6 +1,7 @@ package scheduling import ( + "math" "sync" "golang.org/x/exp/slices" @@ -110,16 +111,25 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterat // QueuedJobsIterator is an iterator over all jobs in a queue. 
 type QueuedJobsIterator struct {
-	jobIter jobdb.JobIterator
-	pool    string
-	ctx     *armadacontext.Context
+	jobIter           jobdb.JobIterator
+	pool              string
+	maxLookback       uint
+	jobsSeen          uint
+	schedulingContext *schedulercontext.SchedulingContext
+	ctx               *armadacontext.Context
 }
 
-func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, repo JobRepository) *QueuedJobsIterator {
+func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, schedulingContext *schedulercontext.SchedulingContext, repo JobRepository) *QueuedJobsIterator {
+	if maxLookback == 0 {
+		maxLookback = math.MaxUint
+	}
 	return &QueuedJobsIterator{
-		jobIter: repo.QueuedJobs(queue),
-		pool:    pool,
-		ctx:     ctx,
+		jobIter:           repo.QueuedJobs(queue),
+		pool:              pool,
+		maxLookback:       maxLookback,
+		jobsSeen:          0,
+		schedulingContext: schedulingContext,
+		ctx:               ctx,
 	}
 }
 
@@ -129,12 +139,40 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er
 		case <-it.ctx.Done():
 			return nil, it.ctx.Err()
 		default:
+			// If we're beyond the max lookback then return done
+			if it.jobsSeen >= it.maxLookback {
+				return nil, nil
+			}
+
+			// If there are no more jobs then return done
 			job, _ := it.jobIter.Next()
 			if job == nil {
 				return nil, nil
 			}
-			if slices.Contains(job.Pools(), it.pool) {
-				return schedulercontext.JobSchedulingContextFromJob(job), nil
+
+			// If the job isn't for this pool then skip it
+			if !slices.Contains(job.Pools(), it.pool) {
+				continue
+			}
+
+			jctx := schedulercontext.JobSchedulingContextFromJob(job)
+			schedulingKey := job.SchedulingKey()
+			it.jobsSeen++
+
+			// If the job is known to be unschedulable then it can be skipped
+			unsuccessfulJctx, jobUnschedulable := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]
+
+			if jobUnschedulable {
+				// Since jctx would fail to schedule for the same reason as unsuccessfulJctx,
+				// set the unschedulable reason and pctx equal to that of unsuccessfulJctx.
+				jctx.UnschedulableReason = unsuccessfulJctx.UnschedulableReason
+				jctx.PodSchedulingContext = unsuccessfulJctx.PodSchedulingContext
+				_, err := it.schedulingContext.AddJobSchedulingContext(jctx)
+				if err != nil {
+					return nil, err
+				}
+			} else {
+				return jctx, nil
 			}
 		}
 	}
diff --git a/internal/scheduler/scheduling/jobiteration_test.go b/internal/scheduler/scheduling/jobiteration_test.go
index 060cb6d70bb..92697f4a3f7 100644
--- a/internal/scheduler/scheduling/jobiteration_test.go
+++ b/internal/scheduler/scheduling/jobiteration_test.go
@@ -2,6 +2,7 @@ package scheduling
 
 import (
 	"context"
+	"math"
 	"testing"
 	"time"
 
@@ -64,7 +65,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
 	ctx := armadacontext.Background()
 	its := make([]JobContextIterator, 3)
 	for i, queue := range []string{"A", "B", "C"} {
-		it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo)
+		it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
 		its[i] = it
 	}
 	it := NewMultiJobsIterator(its...)
@@ -93,7 +94,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -115,7 +116,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -137,7 +138,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -164,7 +165,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -187,7 +188,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) time.Sleep(20 * time.Millisecond) defer cancel() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) job, err := it.Next() assert.Nil(t, job) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -205,7 +206,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) for job, err := it.Next(); job != nil; job, err = it.Next() { require.NoError(t, err) } @@ -266,7 +267,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { } func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator { - return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo) + return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) } func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job { diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler.go b/internal/scheduler/scheduling/preempting_queue_scheduler.go index 1ea53569e5f..7b880bf2f42 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler.go @@ -467,10 +467,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch gangItByQueue := make(map[string]*QueuedGangIterator) for _, qctx := range sctx.QueueSchedulingContexts { gangItByQueue[qctx.Queue] = NewQueuedGangIterator( - sctx, 
 			inMemoryJobRepo.GetJobIterator(qctx.Queue),
-			0,
-			false,
 		)
 	}
 	qr := NewMinimalQueueRepositoryFromSchedulingContext(sctx)
@@ -511,21 +508,17 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
 		if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
 			jobIteratorByQueue[qctx.Queue] = evictedIt
 		} else {
-			queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, jobRepo)
+			queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, sch.constraints.GetMaxQueueLookBack(), sch.schedulingContext, jobRepo)
 			jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt)
 		}
 	}
 
-	// Reset the scheduling keys cache after evicting jobs.
-	sch.schedulingContext.ClearUnfeasibleSchedulingKeys()
-
 	sched, err := NewQueueScheduler(
 		sch.schedulingContext,
 		sch.constraints,
 		sch.floatingResourceTypes,
 		sch.nodeDb,
 		jobIteratorByQueue,
-		skipUnsuccessfulSchedulingKeyCheck,
 		considerPriorityCLassPriority,
 	)
 	if err != nil {
diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go
index 86c68495305..485068bb965 100644
--- a/internal/scheduler/scheduling/queue_scheduler.go
+++ b/internal/scheduler/scheduling/queue_scheduler.go
@@ -4,7 +4,6 @@ import (
 	"container/heap"
 	"fmt"
 	"math"
-	"reflect"
 	"time"
 
 	"github.com/pkg/errors"
@@ -33,7 +32,6 @@ func NewQueueScheduler(
 	floatingResourceTypes *floatingresources.FloatingResourceTypes,
 	nodeDb *nodedb.NodeDb,
 	jobIteratorByQueue map[string]JobContextIterator,
-	skipUnsuccessfulSchedulingKeyCheck bool,
 	considerPriorityClassPriority bool,
 ) (*QueueScheduler, error) {
 	for queue := range jobIteratorByQueue {
@@ -41,13 +39,13 @@
 			return nil, errors.Errorf("no scheduling context for queue %s", queue)
 		}
 	}
-	gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, skipUnsuccessfulSchedulingKeyCheck)
+	gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
 	if err != nil {
 		return nil, err
 	}
 	gangIteratorsByQueue := make(map[string]*QueuedGangIterator)
 	for queue, it := range jobIteratorByQueue {
-		gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, constraints.GetMaxQueueLookBack(), true)
+		gangIteratorsByQueue[queue] = NewQueuedGangIterator(it)
 	}
 	candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, sctx, sctx.FairnessCostProvider, gangIteratorsByQueue, considerPriorityClassPriority)
 	if err != nil {
@@ -216,26 +214,16 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul
 // Each gang is yielded once its final member is received from the underlying iterator.
 // Jobs without gangIdAnnotation are considered gangs of cardinality 1.
 type QueuedGangIterator struct {
-	schedulingContext  *schedulercontext.SchedulingContext
 	queuedJobsIterator JobContextIterator
 	// Groups jctxs by the gang they belong to.
 	jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext
-	// Maximum number of jobs to look at before giving up.
-	maxLookback uint
-	// If true, do not yield jobs known to be unschedulable.
-	skipKnownUnschedulableJobs bool
-	// Number of jobs we have seen so far.
- jobsSeen uint - next *schedulercontext.GangSchedulingContext + next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(it JobContextIterator) *QueuedGangIterator { return &QueuedGangIterator{ - schedulingContext: sctx, - queuedJobsIterator: it, - maxLookback: maxLookback, - skipKnownUnschedulableJobs: skipKnownUnschedulableJobs, - jctxsByGangId: make(map[string][]*schedulercontext.JobSchedulingContext), + queuedJobsIterator: it, + jctxsByGangId: make(map[string][]*schedulercontext.JobSchedulingContext), } } @@ -256,9 +245,6 @@ func (it *QueuedGangIterator) Clear() error { } func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) { - if it.hitLookbackLimit() { - return nil, nil - } if it.next != nil { return it.next, nil } @@ -270,34 +256,10 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e jctx, err := it.queuedJobsIterator.Next() if err != nil { return nil, err - } else if jctx == nil || reflect.ValueOf(jctx).IsNil() { - return nil, nil - } - - // Queue lookback limits. Rescheduled jobs don't count towards the limit. - if !jctx.IsEvicted { - it.jobsSeen++ - } - if it.hitLookbackLimit() { + } else if jctx == nil { return nil, nil } - // Skip this job if it's known to be unschedulable. - if it.skipKnownUnschedulableJobs && len(it.schedulingContext.UnfeasibleSchedulingKeys) > 0 { - schedulingKey, ok := jctx.SchedulingKey() - if ok && schedulingKey != schedulerobjects.EmptySchedulingKey { - if unsuccessfulJctx, ok := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; ok { - // Since jctx would fail to schedule for the same reason as unsuccessfulJctx, - // set the unschedulable reason and pctx equal to that of unsuccessfulJctx. - jctx.UnschedulableReason = unsuccessfulJctx.UnschedulableReason - jctx.PodSchedulingContext = unsuccessfulJctx.PodSchedulingContext - if _, err := it.schedulingContext.AddJobSchedulingContext(jctx); err != nil { - return nil, err - } - continue - } - } - } if gangId := jctx.GangInfo.Id; gangId != "" { gang := it.jctxsByGangId[gangId] gang = append(gang, jctx) @@ -317,13 +279,6 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e } } -func (it *QueuedGangIterator) hitLookbackLimit() bool { - if it.maxLookback == 0 { - return false - } - return it.jobsSeen > it.maxLookback -} - // CandidateGangIterator determines which gang to try scheduling next across queues. // Specifically, it yields the next gang in the queue with smallest fraction of its fair share, // where the fraction of fair share computation includes the yielded gang. 
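With this change the queue lookback limit and the unfeasible-scheduling-key check move out of QueuedGangIterator and into QueuedJobsIterator.Next (jobiteration.go above). The standalone sketch below models that loop with simplified, hypothetical types rather than the real Armada ones: jobs whose scheduling key has been recorded as unfeasible are not yielded, and iteration stops once maxLookback jobs have been seen, with 0 treated as no limit.

package main

import (
	"fmt"
	"math"
)

// job is a simplified stand-in for the real *jobdb.Job / JobSchedulingContext.
type job struct {
	id            string
	schedulingKey string
}

// schedulableJobs models the loop in QueuedJobsIterator.Next after this diff:
// stop once maxLookback jobs have been seen (0 means no limit) and skip jobs
// whose scheduling key is already known to be unfeasible.
func schedulableJobs(queued []job, unfeasibleKeys map[string]bool, maxLookback uint) []job {
	if maxLookback == 0 {
		maxLookback = math.MaxUint // mirrors NewQueuedJobsIterator
	}
	var out []job
	var jobsSeen uint
	for _, j := range queued {
		if jobsSeen >= maxLookback {
			break // beyond the lookback limit: done
		}
		jobsSeen++
		if unfeasibleKeys[j.schedulingKey] {
			// Known unschedulable: in the real iterator the cached unschedulable
			// reason is copied onto the jctx and recorded, but the job is not yielded.
			continue
		}
		out = append(out, j)
	}
	return out
}

func main() {
	queued := []job{{"a", "k1"}, {"b", "k2"}, {"c", "k1"}, {"d", "k3"}}
	unfeasible := map[string]bool{"k1": true}
	// "a" and "c" are skipped via the unfeasible key, "d" falls outside the lookback of 3.
	fmt.Println(schedulableJobs(queued, unfeasible, 3)) // [{b k2}]
}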
diff --git a/internal/scheduler/scheduling/queue_scheduler_test.go b/internal/scheduler/scheduling/queue_scheduler_test.go index 6657caff8df..bd4f340d39a 100644 --- a/internal/scheduler/scheduling/queue_scheduler_test.go +++ b/internal/scheduler/scheduling/queue_scheduler_test.go @@ -358,18 +358,6 @@ func TestQueueScheduler(t *testing.T) { Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{0}, }, - "MaxQueueLookback": { - SchedulingConfig: testfixtures.WithMaxQueueLookbackConfig(3, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Jobs: armadaslices.Concatenate( - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 3), - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), - ), - Queues: testfixtures.SingleQueuePriorityOne("A"), - ExpectedScheduledIndices: []int{0}, - ExpectedNeverAttemptedIndices: []int{3, 4}, - }, "gang success": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), @@ -538,7 +526,7 @@ func TestQueueScheduler(t *testing.T) { it := jobRepo.GetJobIterator(q.Name) jobIteratorByQueue[q.Name] = it } - sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false, false) + sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false) require.NoError(t, err) result, err := sch.Schedule(armadacontext.Background())
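On the recording side, updateGangSchedulingContextOnFailure (gang_scheduler.go above) now caches a scheduling key only for single-job, non-evicted gangs whose unschedulable reason applies to the gang as a whole. The sketch below is a simplified standalone model of that rule, using hypothetical types rather than the Armada API; the cached entry is what QueuedJobsIterator later consults via UnfeasibleSchedulingKeys.

package main

import "fmt"

// jobCtx is a simplified stand-in for JobSchedulingContext; only the fields
// relevant to the unfeasible-key cache are modelled here.
type jobCtx struct {
	schedulingKey       string
	isEvicted           bool
	unschedulableReason string
}

type gangCtx struct{ jctxs []*jobCtx }

// recordUnfeasibleKey models updateGangSchedulingContextOnFailure after this
// diff: only single-job, non-evicted gangs whose unschedulable reason is a
// property of the whole gang have their scheduling key cached.
func recordUnfeasibleKey(cache map[string]*jobCtx, gctx *gangCtx, globallyUnschedulable bool) {
	if globallyUnschedulable && len(gctx.jctxs) == 1 {
		jctx := gctx.jctxs[0]
		if !jctx.isEvicted {
			cache[jctx.schedulingKey] = jctx
		}
	}
}

func main() {
	cache := map[string]*jobCtx{}
	queued := &gangCtx{jctxs: []*jobCtx{{schedulingKey: "k1", unschedulableReason: "no node matches job"}}}
	evicted := &gangCtx{jctxs: []*jobCtx{{schedulingKey: "k2", isEvicted: true, unschedulableReason: "no node matches job"}}}

	recordUnfeasibleKey(cache, queued, true)  // cached: later queued jobs with key k1 are short-circuited
	recordUnfeasibleKey(cache, evicted, true) // not cached: evicted jobs are exempt
	fmt.Println(len(cache), cache["k1"].unschedulableReason) // 1 no node matches job
}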