From ea8c81af363e06020f824055ce76e5359f4357f7 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 12 Oct 2024 12:45:05 +0100 Subject: [PATCH 1/6] move lookback limit to the queue iterator Signed-off-by: Chris Martin --- internal/scheduler/scheduling/jobiteration.go | 22 +++++++++----- .../scheduler/scheduling/jobiteration_test.go | 17 ++++++----- .../scheduling/preempting_queue_scheduler.go | 3 +- .../scheduler/scheduling/queue_scheduler.go | 29 ++----------------- .../scheduling/queue_scheduler_test.go | 12 -------- 5 files changed, 28 insertions(+), 55 deletions(-) diff --git a/internal/scheduler/scheduling/jobiteration.go b/internal/scheduler/scheduling/jobiteration.go index 88a3f8e7818..a33c50c3e62 100644 --- a/internal/scheduler/scheduling/jobiteration.go +++ b/internal/scheduler/scheduling/jobiteration.go @@ -110,16 +110,20 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterat // QueuedJobsIterator is an iterator over all jobs in a queue. type QueuedJobsIterator struct { - jobIter jobdb.JobIterator - pool string - ctx *armadacontext.Context + jobIter jobdb.JobIterator + pool string + maxLookback uint + jobsSeen uint + ctx *armadacontext.Context } -func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, repo JobRepository) *QueuedJobsIterator { +func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, repo JobRepository) *QueuedJobsIterator { return &QueuedJobsIterator{ - jobIter: repo.QueuedJobs(queue), - pool: pool, - ctx: ctx, + jobIter: repo.QueuedJobs(queue), + pool: pool, + maxLookback: maxLookback, + jobsSeen: 0, + ctx: ctx, } } @@ -129,11 +133,15 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er case <-it.ctx.Done(): return nil, it.ctx.Err() default: + if it.jobsSeen > it.maxLookback { + return nil, nil + } job, _ := it.jobIter.Next() if job == nil { return nil, nil } if slices.Contains(job.Pools(), it.pool) { + it.jobsSeen++ return schedulercontext.JobSchedulingContextFromJob(job), nil } } diff --git a/internal/scheduler/scheduling/jobiteration_test.go b/internal/scheduler/scheduling/jobiteration_test.go index 060cb6d70bb..4c049e88a5b 100644 --- a/internal/scheduler/scheduling/jobiteration_test.go +++ b/internal/scheduler/scheduling/jobiteration_test.go @@ -2,6 +2,7 @@ package scheduling import ( "context" + "math" "testing" "time" @@ -64,7 +65,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { ctx := armadacontext.Background() its := make([]JobContextIterator, 3) for i, queue := range []string{"A", "B", "C"} { - it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, repo) its[i] = it } it := NewMultiJobsIterator(its...) 
@@ -93,7 +94,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -115,7 +116,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -137,7 +138,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -164,7 +165,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -187,7 +188,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) time.Sleep(20 * time.Millisecond) defer cancel() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) job, err := it.Next() assert.Nil(t, job) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -205,7 +206,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) for job, err := it.Next(); job != nil; job, err = it.Next() { require.NoError(t, err) } @@ -266,7 +267,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { } func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator { - return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo) + return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, repo) } func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job { diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler.go b/internal/scheduler/scheduling/preempting_queue_scheduler.go index 1ea53569e5f..ea36416d154 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler.go @@ -469,7 +469,6 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch gangItByQueue[qctx.Queue] = NewQueuedGangIterator( sctx, inMemoryJobRepo.GetJobIterator(qctx.Queue), - 0, false, ) } @@ -511,7 +510,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() { jobIteratorByQueue[qctx.Queue] = evictedIt } else { - queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, jobRepo) + queueIt := NewQueuedJobsIterator(ctx, 
qctx.Queue, sch.schedulingContext.Pool, sch.constraints.GetMaxQueueLookBack(), jobRepo) jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt) } } diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go index 86c68495305..65a367f27bb 100644 --- a/internal/scheduler/scheduling/queue_scheduler.go +++ b/internal/scheduler/scheduling/queue_scheduler.go @@ -47,7 +47,7 @@ func NewQueueScheduler( } gangIteratorsByQueue := make(map[string]*QueuedGangIterator) for queue, it := range jobIteratorByQueue { - gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, constraints.GetMaxQueueLookBack(), true) + gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, true) } candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, sctx, sctx.FairnessCostProvider, gangIteratorsByQueue, considerPriorityClassPriority) if err != nil { @@ -220,20 +220,15 @@ type QueuedGangIterator struct { queuedJobsIterator JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext - // Maximum number of jobs to look at before giving up. - maxLookback uint // If true, do not yield jobs known to be unschedulable. skipKnownUnschedulableJobs bool - // Number of jobs we have seen so far. - jobsSeen uint - next *schedulercontext.GangSchedulingContext + next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it, - maxLookback: maxLookback, skipKnownUnschedulableJobs: skipKnownUnschedulableJobs, jctxsByGangId: make(map[string][]*schedulercontext.JobSchedulingContext), } @@ -256,9 +251,6 @@ func (it *QueuedGangIterator) Clear() error { } func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) { - if it.hitLookbackLimit() { - return nil, nil - } if it.next != nil { return it.next, nil } @@ -274,14 +266,6 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e return nil, nil } - // Queue lookback limits. Rescheduled jobs don't count towards the limit. - if !jctx.IsEvicted { - it.jobsSeen++ - } - if it.hitLookbackLimit() { - return nil, nil - } - // Skip this job if it's known to be unschedulable. if it.skipKnownUnschedulableJobs && len(it.schedulingContext.UnfeasibleSchedulingKeys) > 0 { schedulingKey, ok := jctx.SchedulingKey() @@ -317,13 +301,6 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e } } -func (it *QueuedGangIterator) hitLookbackLimit() bool { - if it.maxLookback == 0 { - return false - } - return it.jobsSeen > it.maxLookback -} - // CandidateGangIterator determines which gang to try scheduling next across queues. // Specifically, it yields the next gang in the queue with smallest fraction of its fair share, // where the fraction of fair share computation includes the yielded gang. 
diff --git a/internal/scheduler/scheduling/queue_scheduler_test.go b/internal/scheduler/scheduling/queue_scheduler_test.go index 6657caff8df..374aed92fdb 100644 --- a/internal/scheduler/scheduling/queue_scheduler_test.go +++ b/internal/scheduler/scheduling/queue_scheduler_test.go @@ -358,18 +358,6 @@ func TestQueueScheduler(t *testing.T) { Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{0}, }, - "MaxQueueLookback": { - SchedulingConfig: testfixtures.WithMaxQueueLookbackConfig(3, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Jobs: armadaslices.Concatenate( - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 3), - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), - ), - Queues: testfixtures.SingleQueuePriorityOne("A"), - ExpectedScheduledIndices: []int{0}, - ExpectedNeverAttemptedIndices: []int{3, 4}, - }, "gang success": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), From 1b4b93b997b7f686fc6148c7884f9962c06c22a4 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 12 Oct 2024 13:00:07 +0100 Subject: [PATCH 2/6] move lookback limit to the queue iterator Signed-off-by: Chris Martin --- internal/scheduler/scheduling/jobiteration.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/scheduler/scheduling/jobiteration.go b/internal/scheduler/scheduling/jobiteration.go index a33c50c3e62..56104d92061 100644 --- a/internal/scheduler/scheduling/jobiteration.go +++ b/internal/scheduler/scheduling/jobiteration.go @@ -1,6 +1,7 @@ package scheduling import ( + "math" "sync" "golang.org/x/exp/slices" @@ -118,6 +119,9 @@ type QueuedJobsIterator struct { } func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, repo JobRepository) *QueuedJobsIterator { + if maxLookback == 0 { + maxLookback = math.MaxUint + } return &QueuedJobsIterator{ jobIter: repo.QueuedJobs(queue), pool: pool, @@ -133,7 +137,7 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er case <-it.ctx.Done(): return nil, it.ctx.Err() default: - if it.jobsSeen > it.maxLookback { + if it.jobsSeen >= it.maxLookback { return nil, nil } job, _ := it.jobIter.Next() From 779eeeb6dc31ba685829461675863514e9843732 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 12 Oct 2024 17:05:39 +0100 Subject: [PATCH 3/6] move scheduling key check early on Signed-off-by: Chris Martin --- internal/scheduler/scheduling/jobiteration.go | 54 ++++++++++++++----- .../scheduler/scheduling/jobiteration_test.go | 16 +++--- .../scheduling/preempting_queue_scheduler.go | 4 +- .../scheduler/scheduling/queue_scheduler.go | 31 +++-------- 4 files changed, 55 insertions(+), 50 deletions(-) diff --git a/internal/scheduler/scheduling/jobiteration.go b/internal/scheduler/scheduling/jobiteration.go index 56104d92061..6a89dac6d83 100644 --- a/internal/scheduler/scheduling/jobiteration.go +++ b/internal/scheduler/scheduling/jobiteration.go @@ -111,23 +111,25 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterat // QueuedJobsIterator is an iterator over all jobs in a queue. 
 type QueuedJobsIterator struct {
-	jobIter     jobdb.JobIterator
-	pool        string
-	maxLookback uint
-	jobsSeen    uint
-	ctx         *armadacontext.Context
+	jobIter           jobdb.JobIterator
+	pool              string
+	maxLookback       uint
+	jobsSeen          uint
+	schedulingContext *schedulercontext.SchedulingContext
+	ctx               *armadacontext.Context
 }
 
-func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, repo JobRepository) *QueuedJobsIterator {
+func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, schedulingContext *schedulercontext.SchedulingContext, repo JobRepository) *QueuedJobsIterator {
 	if maxLookback == 0 {
 		maxLookback = math.MaxUint
 	}
 	return &QueuedJobsIterator{
-		jobIter:     repo.QueuedJobs(queue),
-		pool:        pool,
-		maxLookback: maxLookback,
-		jobsSeen:    0,
-		ctx:         ctx,
+		jobIter:           repo.QueuedJobs(queue),
+		pool:              pool,
+		maxLookback:       maxLookback,
+		jobsSeen:          0,
+		schedulingContext: schedulingContext,
+		ctx:               ctx,
 	}
 }
 
@@ -137,16 +139,40 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er
 	case <-it.ctx.Done():
 		return nil, it.ctx.Err()
 	default:
+		// If we're beyond the max lookback then return done
 		if it.jobsSeen >= it.maxLookback {
 			return nil, nil
 		}
+
+		// If there are no more jobs then return done
 		job, _ := it.jobIter.Next()
 		if job == nil {
 			return nil, nil
 		}
-		if slices.Contains(job.Pools(), it.pool) {
-			it.jobsSeen++
-			return schedulercontext.JobSchedulingContextFromJob(job), nil
+
+		// If the job isn't for this pool then skip it
+		if !slices.Contains(job.Pools(), it.pool) {
+			continue
+		}
+
+		jctx := schedulercontext.JobSchedulingContextFromJob(job)
+		schedulingKey := job.SchedulingKey()
+		it.jobsSeen++
+
+		// If the job is known to be unschedulable then it can be skipped
+		unsuccessfulJctx, jobUnschedulable := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]
+
+		if jobUnschedulable {
+			// Since jctx would fail to schedule for the same reason as unsuccessfulJctx,
+			// set the unschedulable reason and pctx equal to that of unsuccessfulJctx.
+			jctx.UnschedulableReason = unsuccessfulJctx.UnschedulableReason
+			jctx.PodSchedulingContext = unsuccessfulJctx.PodSchedulingContext
+			_, err := it.schedulingContext.AddJobSchedulingContext(jctx)
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			return jctx, nil
 		}
 	}
 }
diff --git a/internal/scheduler/scheduling/jobiteration_test.go b/internal/scheduler/scheduling/jobiteration_test.go
index 4c049e88a5b..92697f4a3f7 100644
--- a/internal/scheduler/scheduling/jobiteration_test.go
+++ b/internal/scheduler/scheduling/jobiteration_test.go
@@ -65,7 +65,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
 	ctx := armadacontext.Background()
 	its := make([]JobContextIterator, 3)
 	for i, queue := range []string{"A", "B", "C"} {
-		it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, repo)
+		it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
 		its[i] = it
 	}
 	it := NewMultiJobsIterator(its...)
@@ -94,7 +94,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -116,7 +116,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -138,7 +138,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -165,7 +165,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -188,7 +188,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) time.Sleep(20 * time.Millisecond) defer cancel() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) job, err := it.Next() assert.Nil(t, job) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -206,7 +206,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo) + it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) for job, err := it.Next(); job != nil; job, err = it.Next() { require.NoError(t, err) } @@ -267,7 +267,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { } func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator { - return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, repo) + return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo) } func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job { diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler.go b/internal/scheduler/scheduling/preempting_queue_scheduler.go index ea36416d154..12a43fa21e6 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler.go @@ -467,9 +467,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch gangItByQueue := make(map[string]*QueuedGangIterator) for _, qctx := range 
sctx.QueueSchedulingContexts { gangItByQueue[qctx.Queue] = NewQueuedGangIterator( - sctx, inMemoryJobRepo.GetJobIterator(qctx.Queue), - false, ) } qr := NewMinimalQueueRepositoryFromSchedulingContext(sctx) @@ -510,7 +508,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() { jobIteratorByQueue[qctx.Queue] = evictedIt } else { - queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, sch.constraints.GetMaxQueueLookBack(), jobRepo) + queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, sch.constraints.GetMaxQueueLookBack(), sch.schedulingContext, jobRepo) jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt) } } diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go index 65a367f27bb..26b4d0b9c12 100644 --- a/internal/scheduler/scheduling/queue_scheduler.go +++ b/internal/scheduler/scheduling/queue_scheduler.go @@ -4,7 +4,6 @@ import ( "container/heap" "fmt" "math" - "reflect" "time" "github.com/pkg/errors" @@ -47,7 +46,7 @@ func NewQueueScheduler( } gangIteratorsByQueue := make(map[string]*QueuedGangIterator) for queue, it := range jobIteratorByQueue { - gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, true) + gangIteratorsByQueue[queue] = NewQueuedGangIterator(it) } candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, sctx, sctx.FairnessCostProvider, gangIteratorsByQueue, considerPriorityClassPriority) if err != nil { @@ -216,7 +215,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul // Each gang is yielded once its final member is received from the underlying iterator. // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { - schedulingContext *schedulercontext.SchedulingContext + //schedulingContext *schedulercontext.SchedulingContext queuedJobsIterator JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext @@ -225,12 +224,10 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(it JobContextIterator) *QueuedGangIterator { return &QueuedGangIterator{ - schedulingContext: sctx, - queuedJobsIterator: it, - skipKnownUnschedulableJobs: skipKnownUnschedulableJobs, - jctxsByGangId: make(map[string][]*schedulercontext.JobSchedulingContext), + queuedJobsIterator: it, + jctxsByGangId: make(map[string][]*schedulercontext.JobSchedulingContext), } } @@ -262,26 +259,10 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e jctx, err := it.queuedJobsIterator.Next() if err != nil { return nil, err - } else if jctx == nil || reflect.ValueOf(jctx).IsNil() { + } else if jctx == nil { return nil, nil } - // Skip this job if it's known to be unschedulable. 
- if it.skipKnownUnschedulableJobs && len(it.schedulingContext.UnfeasibleSchedulingKeys) > 0 { - schedulingKey, ok := jctx.SchedulingKey() - if ok && schedulingKey != schedulerobjects.EmptySchedulingKey { - if unsuccessfulJctx, ok := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; ok { - // Since jctx would fail to schedule for the same reason as unsuccessfulJctx, - // set the unschedulable reason and pctx equal to that of unsuccessfulJctx. - jctx.UnschedulableReason = unsuccessfulJctx.UnschedulableReason - jctx.PodSchedulingContext = unsuccessfulJctx.PodSchedulingContext - if _, err := it.schedulingContext.AddJobSchedulingContext(jctx); err != nil { - return nil, err - } - continue - } - } - } if gangId := jctx.GangInfo.Id; gangId != "" { gang := it.jctxsByGangId[gangId] gang = append(gang, jctx) From 1e59b21a3e66ee8f918dc3898cc8790bdf679965 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 12 Oct 2024 17:07:25 +0100 Subject: [PATCH 4/6] lint Signed-off-by: Chris Martin --- internal/scheduler/scheduling/queue_scheduler.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go index 26b4d0b9c12..2b4033a1480 100644 --- a/internal/scheduler/scheduling/queue_scheduler.go +++ b/internal/scheduler/scheduling/queue_scheduler.go @@ -215,13 +215,11 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul // Each gang is yielded once its final member is received from the underlying iterator. // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { - //schedulingContext *schedulercontext.SchedulingContext + // schedulingContext *schedulercontext.SchedulingContext queuedJobsIterator JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext - // If true, do not yield jobs known to be unschedulable. 
- skipKnownUnschedulableJobs bool - next *schedulercontext.GangSchedulingContext + next *schedulercontext.GangSchedulingContext } func NewQueuedGangIterator(it JobContextIterator) *QueuedGangIterator { From cc6ad2251db5a332b9ad70d6e0ab18d4f4c672b7 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 12 Oct 2024 17:31:49 +0100 Subject: [PATCH 5/6] lint Signed-off-by: Chris Martin --- internal/scheduler/scheduling/context/scheduling.go | 4 ---- internal/scheduler/scheduling/gang_scheduler.go | 10 ++++------ internal/scheduler/scheduling/gang_scheduler_test.go | 2 +- .../scheduler/scheduling/preempting_queue_scheduler.go | 4 ---- internal/scheduler/scheduling/queue_scheduler.go | 3 +-- internal/scheduler/scheduling/queue_scheduler_test.go | 2 +- 6 files changed, 7 insertions(+), 18 deletions(-) diff --git a/internal/scheduler/scheduling/context/scheduling.go b/internal/scheduler/scheduling/context/scheduling.go index e3bb7ae9d32..9f770fd8d1d 100644 --- a/internal/scheduler/scheduling/context/scheduling.go +++ b/internal/scheduler/scheduling/context/scheduling.go @@ -84,10 +84,6 @@ func NewSchedulingContext( } } -func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() { - sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext) -} - func (sctx *SchedulingContext) AddQueueSchedulingContext( queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string], diff --git a/internal/scheduler/scheduling/gang_scheduler.go b/internal/scheduler/scheduling/gang_scheduler.go index 247c83e9b77..3ca0f4ed2ab 100644 --- a/internal/scheduler/scheduling/gang_scheduler.go +++ b/internal/scheduler/scheduling/gang_scheduler.go @@ -30,14 +30,12 @@ func NewGangScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - skipUnsuccessfulSchedulingKeyCheck bool, ) (*GangScheduler, error) { return &GangScheduler{ - constraints: constraints, - floatingResourceTypes: floatingResourceTypes, - schedulingContext: sctx, - nodeDb: nodeDb, - skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck, + constraints: constraints, + floatingResourceTypes: floatingResourceTypes, + schedulingContext: sctx, + nodeDb: nodeDb, }, nil } diff --git a/internal/scheduler/scheduling/gang_scheduler_test.go b/internal/scheduler/scheduling/gang_scheduler_test.go index e7f2dd17e77..f5b0d6a303b 100644 --- a/internal/scheduler/scheduling/gang_scheduler_test.go +++ b/internal/scheduler/scheduling/gang_scheduler_test.go @@ -645,7 +645,7 @@ func TestGangScheduler(t *testing.T) { constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil) floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources) require.NoError(t, err) - sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false) + sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb) require.NoError(t, err) var actualScheduledIndices []int diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler.go b/internal/scheduler/scheduling/preempting_queue_scheduler.go index 12a43fa21e6..7b880bf2f42 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler.go @@ -513,16 +513,12 @@ func (sch *PreemptingQueueScheduler) schedule(ctx 
*armadacontext.Context, inMemo } } - // Reset the scheduling keys cache after evicting jobs. - sch.schedulingContext.ClearUnfeasibleSchedulingKeys() - sched, err := NewQueueScheduler( sch.schedulingContext, sch.constraints, sch.floatingResourceTypes, sch.nodeDb, jobIteratorByQueue, - skipUnsuccessfulSchedulingKeyCheck, considerPriorityCLassPriority, ) if err != nil { diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go index 2b4033a1480..485068bb965 100644 --- a/internal/scheduler/scheduling/queue_scheduler.go +++ b/internal/scheduler/scheduling/queue_scheduler.go @@ -32,7 +32,6 @@ func NewQueueScheduler( floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, jobIteratorByQueue map[string]JobContextIterator, - skipUnsuccessfulSchedulingKeyCheck bool, considerPriorityClassPriority bool, ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { @@ -40,7 +39,7 @@ func NewQueueScheduler( return nil, errors.Errorf("no scheduling context for queue %s", queue) } } - gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, skipUnsuccessfulSchedulingKeyCheck) + gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb) if err != nil { return nil, err } diff --git a/internal/scheduler/scheduling/queue_scheduler_test.go b/internal/scheduler/scheduling/queue_scheduler_test.go index 374aed92fdb..bd4f340d39a 100644 --- a/internal/scheduler/scheduling/queue_scheduler_test.go +++ b/internal/scheduler/scheduling/queue_scheduler_test.go @@ -526,7 +526,7 @@ func TestQueueScheduler(t *testing.T) { it := jobRepo.GetJobIterator(q.Name) jobIteratorByQueue[q.Name] = it } - sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false, false) + sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false) require.NoError(t, err) result, err := sch.Schedule(armadacontext.Background()) From 5366aec6722e0c79f6a498c390fc6e3aa6f5fd35 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 12 Oct 2024 17:46:18 +0100 Subject: [PATCH 6/6] more removal Signed-off-by: Chris Martin --- internal/scheduler/scheduling/context/job.go | 10 --------- .../scheduler/scheduling/gang_scheduler.go | 22 +++++++------------ .../scheduling/gang_scheduler_test.go | 5 +---- 3 files changed, 9 insertions(+), 28 deletions(-) diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go index ae8fd0966dd..5f7982c4f3d 100644 --- a/internal/scheduler/scheduling/context/job.go +++ b/internal/scheduler/scheduling/context/job.go @@ -83,16 +83,6 @@ func (jctx *JobSchedulingContext) String() string { return sb.String() } -// SchedulingKey returns the scheduling key of the embedded job. -// If the jctx contains additional node selectors or tolerations, -// the key is invalid and the second return value is false. 
-func (jctx *JobSchedulingContext) SchedulingKey() (schedulerobjects.SchedulingKey, bool) {
-	if len(jctx.AdditionalNodeSelectors) != 0 || len(jctx.AdditionalTolerations) != 0 {
-		return schedulerobjects.EmptySchedulingKey, false
-	}
-	return jctx.Job.SchedulingKey(), true
-}
-
 func (jctx *JobSchedulingContext) IsSuccessful() bool {
 	return jctx.UnschedulableReason == ""
 }
diff --git a/internal/scheduler/scheduling/gang_scheduler.go b/internal/scheduler/scheduling/gang_scheduler.go
index 3ca0f4ed2ab..e727c69ce6a 100644
--- a/internal/scheduler/scheduling/gang_scheduler.go
+++ b/internal/scheduler/scheduling/gang_scheduler.go
@@ -10,7 +10,6 @@ import (
 	"github.com/armadaproject/armada/internal/scheduler/floatingresources"
 	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
 	"github.com/armadaproject/armada/internal/scheduler/nodedb"
-	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 	schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints"
 	"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
 )
@@ -21,8 +20,6 @@ type GangScheduler struct {
 	floatingResourceTypes *floatingresources.FloatingResourceTypes
 	schedulingContext     *context.SchedulingContext
 	nodeDb                *nodedb.NodeDb
-	// If true, the unsuccessfulSchedulingKeys check is omitted.
-	skipUnsuccessfulSchedulingKeyCheck bool
 }
 
 func NewGangScheduler(
@@ -78,18 +75,15 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *context.Gan
 
 	globallyUnschedulable := schedulerconstraints.UnschedulableReasonIsPropertyOfGang(unschedulableReason)
 
-	// Register globally unfeasible scheduling keys.
-	//
-	// Only record unfeasible scheduling keys for single-job gangs.
-	// Since a gang may be unschedulable even if all its members are individually schedulable.
-	if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 && globallyUnschedulable {
+	// Register globally unfeasible scheduling keys. This allows us to discard subsequent jobs with the same
+	// key, as we know they cannot be scheduled. We only do this for:
+	// * Queued jobs, as evicted jobs no longer have a valid scheduling key
+	// * Single jobs, as we have no concept of a scheduling key for a gang
+	if globallyUnschedulable && gctx.Cardinality() == 1 {
 		jctx := gctx.JobSchedulingContexts[0]
-		schedulingKey, ok := jctx.SchedulingKey()
-		if ok && schedulingKey != schedulerobjects.EmptySchedulingKey {
-			if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok {
-				// Keep the first jctx for each unfeasible schedulingKey.
-				sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey] = jctx
-			}
+		if !jctx.IsEvicted {
+			// Keep the first jctx for each unfeasible schedulingKey.
+ sch.schedulingContext.UnfeasibleSchedulingKeys[jctx.Job.SchedulingKey()] = jctx } } diff --git a/internal/scheduler/scheduling/gang_scheduler_test.go b/internal/scheduler/scheduling/gang_scheduler_test.go index f5b0d6a303b..b00d689b93f 100644 --- a/internal/scheduler/scheduling/gang_scheduler_test.go +++ b/internal/scheduler/scheduling/gang_scheduler_test.go @@ -576,10 +576,7 @@ func TestGangScheduler(t *testing.T) { jctxs := context.JobSchedulingContextsFromJobs(gang) require.Equal(t, 1, len(jctxs), fmt.Sprintf("gangs with cardinality greater than 1 don't have a single scheduling key: %v", gang)) jctx := jctxs[0] - key, _ := jctx.SchedulingKey() - require.NotEqual(t, key, schedulerobjects.EmptySchedulingKey, "expected unfeasible scheduling key cannot be the empty key") - - expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, key) + expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, jctx.Job.SchedulingKey()) } nodesById := make(map[string]*schedulerobjects.Node, len(tc.Nodes))
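Taken together, the series leaves `QueuedJobsIterator` responsible for both the queue lookback limit and the unfeasible-scheduling-key check, stripping both concerns out of `QueuedGangIterator` and `GangScheduler`. Below is a minimal standalone sketch of the resulting iterator semantics, using simplified stand-in types (`job`, `queuedJobsIterator`, and a plain `map[string]bool` in place of `jobdb.Job`, the jobdb iterator, and `sctx.UnfeasibleSchedulingKeys`); it illustrates the behaviour the patches converge on, not the production code:

```go
package main

import (
	"fmt"
	"math"
	"slices"
)

// job stands in for jobdb.Job: an ID, the pools it may run in, and its scheduling key.
type job struct {
	id            string
	pools         []string
	schedulingKey string
}

// queuedJobsIterator models the post-series scheduling.QueuedJobsIterator:
// it caps how many candidate jobs are examined and skips jobs whose
// scheduling key is already known to be unfeasible.
type queuedJobsIterator struct {
	jobs        []job
	pos         int
	pool        string
	maxLookback uint
	jobsSeen    uint
	unfeasible  map[string]bool // stands in for sctx.UnfeasibleSchedulingKeys
}

func newQueuedJobsIterator(jobs []job, pool string, maxLookback uint, unfeasible map[string]bool) *queuedJobsIterator {
	if maxLookback == 0 {
		maxLookback = math.MaxUint // as in patch 2: zero means "no limit"
	}
	return &queuedJobsIterator{jobs: jobs, pool: pool, maxLookback: maxLookback, unfeasible: unfeasible}
}

// next returns the next schedulable job ID, or "" when the iterator is done.
func (it *queuedJobsIterator) next() string {
	for {
		// Stop once maxLookback jobs have been seen (>=, the patch-2 fix).
		if it.jobsSeen >= it.maxLookback || it.pos >= len(it.jobs) {
			return ""
		}
		j := it.jobs[it.pos]
		it.pos++

		// Jobs for other pools are skipped without counting towards the limit.
		if !slices.Contains(j.pools, it.pool) {
			continue
		}
		it.jobsSeen++

		// Patch 3: the unfeasible-key check now lives here, so known-unschedulable
		// jobs are filtered out before gang grouping. (The real iterator also
		// records the unschedulable reason on the scheduling context.)
		if it.unfeasible[j.schedulingKey] {
			continue
		}
		return j.id
	}
}

func main() {
	jobs := []job{
		{"job-1", []string{"cpu"}, "small"},
		{"job-2", []string{"gpu"}, "small"}, // wrong pool: skipped, not counted
		{"job-3", []string{"cpu"}, "huge"},  // known unfeasible key: counted, not yielded
		{"job-4", []string{"cpu"}, "small"},
		{"job-5", []string{"cpu"}, "small"}, // never examined: beyond maxLookback
	}
	it := newQueuedJobsIterator(jobs, "cpu", 3, map[string]bool{"huge": true})
	for id := it.next(); id != ""; id = it.next() {
		fmt.Println(id) // job-1, job-4
	}
}
```

Note the ordering mirrors patch 3: `jobsSeen` is incremented only for pool-matching jobs, but before the unfeasible-key check, so jobs skipped as unfeasible still count towards the lookback; and treating `maxLookback == 0` as unlimited preserves the convention the old `QueuedGangIterator.hitLookbackLimit` used before the limit moved.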