Move scheduling key check into the QueuedJobsIterator #4004

Open: wants to merge 6 commits into base: master
10 changes: 0 additions & 10 deletions internal/scheduler/scheduling/context/job.go
@@ -83,16 +83,6 @@ func (jctx *JobSchedulingContext) String() string {
return sb.String()
}

// SchedulingKey returns the scheduling key of the embedded job.
// If the jctx contains additional node selectors or tolerations,
// the key is invalid and the second return value is false.
func (jctx *JobSchedulingContext) SchedulingKey() (schedulerobjects.SchedulingKey, bool) {
if len(jctx.AdditionalNodeSelectors) != 0 || len(jctx.AdditionalTolerations) != 0 {
return schedulerobjects.EmptySchedulingKey, false
}
return jctx.Job.SchedulingKey(), true
}

func (jctx *JobSchedulingContext) IsSuccessful() bool {
return jctx.UnschedulableReason == ""
}
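
For context on the deleted helper: its doc comment says the cached key is only meaningful when the jctx carries no extra node selectors or tolerations. Below is a minimal, self-contained sketch of that invariant, using toy stand-in types rather than the real Armada ones (`jobSchedulingContext`, `baseKey` and so on are illustrative names only):

```go
package main

import "fmt"

// Toy stand-ins: the real scheduling key is derived from the job's own pod
// requirements, so two jobs share a key only if those requirements match.
type jobSchedulingContext struct {
	baseKey                 string
	additionalNodeSelectors map[string]string
	additionalTolerations   []string
}

// schedulingKey mirrors the removed helper: the key is only valid when nothing
// has been layered on top of the job's own requirements.
func (jctx *jobSchedulingContext) schedulingKey() (string, bool) {
	if len(jctx.additionalNodeSelectors) != 0 || len(jctx.additionalTolerations) != 0 {
		return "", false
	}
	return jctx.baseKey, true
}

func main() {
	queued := &jobSchedulingContext{baseKey: "cpu-8"}
	pinned := &jobSchedulingContext{
		baseKey:                 "cpu-8",
		additionalNodeSelectors: map[string]string{"kubernetes.io/hostname": "node-1"},
	}
	for _, jctx := range []*jobSchedulingContext{queued, pinned} {
		if key, ok := jctx.schedulingKey(); ok {
			fmt.Println("cacheable key:", key)
		} else {
			// Pinned to one node, so its feasibility differs from other cpu-8
			// jobs and it must not share their cache entry.
			fmt.Println("key not cacheable")
		}
	}
}
```

The PR replaces this per-jctx validity check with the `!jctx.IsEvicted` guard in gang_scheduler.go below, in line with the new comment there that evicted jobs no longer have a valid scheduling key.
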
4 changes: 0 additions & 4 deletions internal/scheduler/scheduling/context/scheduling.go
@@ -84,10 +84,6 @@ func NewSchedulingContext(
}
}

func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() {
sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext)
}

func (sctx *SchedulingContext) AddQueueSchedulingContext(
queue string, weight float64,
initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string],
32 changes: 12 additions & 20 deletions internal/scheduler/scheduling/gang_scheduler.go
@@ -10,7 +10,6 @@ import (
"github.com/armadaproject/armada/internal/scheduler/floatingresources"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)
@@ -21,23 +20,19 @@ type GangScheduler struct {
floatingResourceTypes *floatingresources.FloatingResourceTypes
schedulingContext *context.SchedulingContext
nodeDb *nodedb.NodeDb
// If true, the unsuccessfulSchedulingKeys check is omitted.
skipUnsuccessfulSchedulingKeyCheck bool
}

func NewGangScheduler(
sctx *context.SchedulingContext,
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
skipUnsuccessfulSchedulingKeyCheck bool,
) (*GangScheduler, error) {
return &GangScheduler{
constraints: constraints,
floatingResourceTypes: floatingResourceTypes,
schedulingContext: sctx,
nodeDb: nodeDb,
skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck,
constraints: constraints,
floatingResourceTypes: floatingResourceTypes,
schedulingContext: sctx,
nodeDb: nodeDb,
}, nil
}

@@ -80,18 +75,15 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *context.Gan

globallyUnschedulable := schedulerconstraints.UnschedulableReasonIsPropertyOfGang(unschedulableReason)

// Register globally unfeasible scheduling keys.
//
// Only record unfeasible scheduling keys for single-job gangs.
// Since a gang may be unschedulable even if all its members are individually schedulable.
if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 && globallyUnschedulable {
// Register globally unfeasible scheduling keys. This allows us to discard subsequent jobs with the same
// key as we know they cannot be scheduled. We only do this for:
// * Queued jobs, as evicted jobs no longer have a valid scheduling key
// * Single jobs as we have no concept of the scheduling key for a gang
if globallyUnschedulable && gctx.Cardinality() == 1 {
jctx := gctx.JobSchedulingContexts[0]
schedulingKey, ok := jctx.SchedulingKey()
if ok && schedulingKey != schedulerobjects.EmptySchedulingKey {
if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok {
// Keep the first jctx for each unfeasible schedulingKey.
sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey] = jctx
}
if !jctx.IsEvicted {
// Keep the first jctx for each unfeasible schedulingKey.
sch.schedulingContext.UnfeasibleSchedulingKeys[jctx.Job.SchedulingKey()] = jctx
}
}

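
As a rough illustration of the registration rule the new comment describes, here is a self-contained sketch with simplified stand-in types (`jobCtx`, `registerUnfeasible` and the string keys are illustrative, not Armada APIs): only a non-evicted, single-job gang whose failure reason is gang-wide writes its key into the cache, and the first recorded jctx per key is the one kept.

```go
package main

import "fmt"

// jobCtx is a toy stand-in for context.JobSchedulingContext.
type jobCtx struct {
	id        string
	key       string // stand-in for job.SchedulingKey()
	isEvicted bool
	reason    string // unschedulable reason
}

// registerUnfeasible sketches the guard in updateGangSchedulingContextOnFailure:
// record a key only for non-evicted, single-job gangs whose unschedulable
// reason applies pool-wide, and keep the first jctx seen for each key.
func registerUnfeasible(cache map[string]*jobCtx, gang []*jobCtx, globallyUnschedulable bool) {
	if !globallyUnschedulable || len(gang) != 1 {
		// A gang can be unschedulable even if each member alone is schedulable,
		// so its failure says nothing about other jobs with the same key.
		return
	}
	jctx := gang[0]
	if jctx.isEvicted {
		// Evicted jobs no longer have a valid scheduling key.
		return
	}
	if _, seen := cache[jctx.key]; !seen {
		cache[jctx.key] = jctx
	}
}

func main() {
	cache := map[string]*jobCtx{}
	registerUnfeasible(cache, []*jobCtx{{id: "q1", key: "cpu-8", reason: "too large for any node"}}, true)
	registerUnfeasible(cache, []*jobCtx{{id: "e1", key: "cpu-4", isEvicted: true}}, true)              // ignored: evicted
	registerUnfeasible(cache, []*jobCtx{{id: "g1", key: "cpu-2"}, {id: "g2", key: "cpu-2"}}, true)     // ignored: gang
	fmt.Println(len(cache), "cached key(s); recorded jctx:", cache["cpu-8"].id, "-", cache["cpu-8"].reason)
}
```
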
7 changes: 2 additions & 5 deletions internal/scheduler/scheduling/gang_scheduler_test.go
@@ -576,10 +576,7 @@ func TestGangScheduler(t *testing.T) {
jctxs := context.JobSchedulingContextsFromJobs(gang)
require.Equal(t, 1, len(jctxs), fmt.Sprintf("gangs with cardinality greater than 1 don't have a single scheduling key: %v", gang))
jctx := jctxs[0]
key, _ := jctx.SchedulingKey()
require.NotEqual(t, key, schedulerobjects.EmptySchedulingKey, "expected unfeasible scheduling key cannot be the empty key")

expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, key)
expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, jctx.Job.SchedulingKey())
}

nodesById := make(map[string]*schedulerobjects.Node, len(tc.Nodes))
@@ -645,7 +642,7 @@ func TestGangScheduler(t *testing.T) {
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil)
floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources)
require.NoError(t, err)
sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false)
sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
require.NoError(t, err)

var actualScheduledIndices []int
56 changes: 47 additions & 9 deletions internal/scheduler/scheduling/jobiteration.go
@@ -1,6 +1,7 @@
package scheduling

import (
"math"
"sync"

"golang.org/x/exp/slices"
@@ -110,16 +111,25 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterat

// QueuedJobsIterator is an iterator over all jobs in a queue.
type QueuedJobsIterator struct {
jobIter jobdb.JobIterator
pool string
ctx *armadacontext.Context
jobIter jobdb.JobIterator
pool string
maxLookback uint
jobsSeen uint
schedulingContext *schedulercontext.SchedulingContext
ctx *armadacontext.Context
}

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, repo JobRepository) *QueuedJobsIterator {
func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, schedulingContext *schedulercontext.SchedulingContext, repo JobRepository) *QueuedJobsIterator {
if maxLookback == 0 {
maxLookback = math.MaxUint
}
return &QueuedJobsIterator{
jobIter: repo.QueuedJobs(queue),
pool: pool,
ctx: ctx,
jobIter: repo.QueuedJobs(queue),
pool: pool,
maxLookback: maxLookback,
jobsSeen: 0,
schedulingContext: schedulingContext,
ctx: ctx,
}
}

@@ -129,12 +139,40 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er
case <-it.ctx.Done():
return nil, it.ctx.Err()
default:
// If we're beyond the max lookback then return done
if it.jobsSeen >= it.maxLookback {
return nil, nil
}

// If there are no more jobs then return done
job, _ := it.jobIter.Next()
if job == nil {
return nil, nil
}
if slices.Contains(job.Pools(), it.pool) {
return schedulercontext.JobSchedulingContextFromJob(job), nil

// If the job isn't for this pool then skip it
if !slices.Contains(job.Pools(), it.pool) {
continue
}

jctx := schedulercontext.JobSchedulingContextFromJob(job)
schedulingKey := job.SchedulingKey()
it.jobsSeen++

// If the job is known to be unschedulable then it can be skipped
unsuccessfulJctx, jobUnschedulable := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]

if jobUnschedulable {
// Since jctx would fail to schedule for the same reason as unsuccessfulJctx,
// set the unschedulable reason and pctx equal to that of unsuccessfulJctx.
jctx.UnschedulableReason = unsuccessfulJctx.UnschedulableReason
jctx.PodSchedulingContext = unsuccessfulJctx.PodSchedulingContext
_, err := it.schedulingContext.AddJobSchedulingContext(jctx)
if err != nil {
return nil, err
}
} else {
return jctx, nil
}
}
}
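
To make the new control flow in Next() easier to follow, here is a self-contained sketch of the same loop with toy types (the slice-backed iterator, string keys and `newQueuedJobsIterator` below are stand-ins, not the real jobdb/armadacontext APIs): stop once the lookback budget is spent, skip jobs that are not eligible for this pool, and short-circuit jobs whose scheduling key is already cached as unfeasible.

```go
package main

import (
	"fmt"
	"math"
)

type job struct {
	id    string
	pools []string
	key   string // stand-in for job.SchedulingKey()
}

type queuedJobsIterator struct {
	jobs           []job // stand-in for jobdb.JobIterator
	pos            int
	pool           string
	maxLookback    uint
	jobsSeen       uint
	unfeasibleKeys map[string]string // key -> cached unschedulable reason
}

func newQueuedJobsIterator(jobs []job, pool string, maxLookback uint, unfeasibleKeys map[string]string) *queuedJobsIterator {
	if maxLookback == 0 {
		maxLookback = math.MaxUint // as in the PR, zero means "no limit"
	}
	return &queuedJobsIterator{jobs: jobs, pool: pool, maxLookback: maxLookback, unfeasibleKeys: unfeasibleKeys}
}

// next returns the id of the next schedulable candidate, or "" when done.
func (it *queuedJobsIterator) next() string {
	for {
		if it.jobsSeen >= it.maxLookback {
			return "" // past the lookback limit
		}
		if it.pos >= len(it.jobs) {
			return "" // queue exhausted
		}
		j := it.jobs[it.pos]
		it.pos++

		inPool := false
		for _, p := range j.pools {
			if p == it.pool {
				inPool = true
				break
			}
		}
		if !inPool {
			continue // not for this pool; does not count towards the lookback
		}

		it.jobsSeen++
		if reason, unfeasible := it.unfeasibleKeys[j.key]; unfeasible {
			// The real iterator records the jctx on the scheduling context with
			// the cached reason; here we just report the skip.
			fmt.Printf("%s skipped: %s\n", j.id, reason)
			continue
		}
		return j.id
	}
}

func main() {
	it := newQueuedJobsIterator(
		[]job{
			{id: "a", pools: []string{"gpu"}, key: "k1"},
			{id: "b", pools: []string{"cpu"}, key: "k2"},
			{id: "c", pools: []string{"cpu"}, key: "k3"},
		},
		"cpu",
		0, // no limit
		map[string]string{"k2": "no node with enough memory"},
	)
	for id := it.next(); id != ""; id = it.next() {
		fmt.Println("candidate:", id)
	}
}
```

Since the check now runs before a job ever reaches GangScheduler, this lines up with the removal of the skipUnsuccessfulSchedulingKeyCheck flag and the ClearUnfeasibleSchedulingKeys helper elsewhere in this PR.
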
17 changes: 9 additions & 8 deletions internal/scheduler/scheduling/jobiteration_test.go
@@ -2,6 +2,7 @@ package scheduling

import (
"context"
"math"
"testing"
"time"

@@ -64,7 +65,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
ctx := armadacontext.Background()
its := make([]JobContextIterator, 3)
for i, queue := range []string{"A", "B", "C"} {
it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
its[i] = it
}
it := NewMultiJobsIterator(its...)
@@ -93,7 +94,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) {
expected = append(expected, job.Id())
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
@@ -115,7 +116,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) {
expected = append(expected, job.Id())
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
@@ -137,7 +138,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) {
expected = append(expected, job.Id())
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
@@ -164,7 +165,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) {
repo.Enqueue(job)
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
@@ -187,7 +188,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond)
time.Sleep(20 * time.Millisecond)
defer cancel()
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
job, err := it.Next()
assert.Nil(t, job)
assert.ErrorIs(t, err, context.DeadlineExceeded)
@@ -205,7 +206,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) {
repo.Enqueue(job)
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
for job, err := it.Next(); job != nil; job, err = it.Next() {
require.NoError(t, err)
}
@@ -266,7 +267,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
}

func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator {
return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo)
return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, &schedulercontext.SchedulingContext{}, repo)
}

func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job {
9 changes: 1 addition & 8 deletions internal/scheduler/scheduling/preempting_queue_scheduler.go
@@ -467,10 +467,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
gangItByQueue := make(map[string]*QueuedGangIterator)
for _, qctx := range sctx.QueueSchedulingContexts {
gangItByQueue[qctx.Queue] = NewQueuedGangIterator(
sctx,
inMemoryJobRepo.GetJobIterator(qctx.Queue),
0,
false,
)
}
qr := NewMinimalQueueRepositoryFromSchedulingContext(sctx)
@@ -511,21 +508,17 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
jobIteratorByQueue[qctx.Queue] = evictedIt
} else {
queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, jobRepo)
queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, sch.constraints.GetMaxQueueLookBack(), sch.schedulingContext, jobRepo)
jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt)
}
}

// Reset the scheduling keys cache after evicting jobs.
sch.schedulingContext.ClearUnfeasibleSchedulingKeys()

sched, err := NewQueueScheduler(
sch.schedulingContext,
sch.constraints,
sch.floatingResourceTypes,
sch.nodeDb,
jobIteratorByQueue,
skipUnsuccessfulSchedulingKeyCheck,
considerPriorityCLassPriority,
)
if err != nil {