diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go index c43465547a..8b4132c813 100644 --- a/scheduler/pkg/scheduler/scheduler.go +++ b/scheduler/pkg/scheduler/scheduler.go @@ -43,7 +43,7 @@ func DefaultSchedulerConfig(store store.ModelStore) SchedulerConfig { return SchedulerConfig{ serverFilters: []filters.ServerFilter{filters.ServerReplicaFilter{}, filters.SharingServerFilter{}, filters.DeletedServerFilter{}, filters.ServerRequirementFilter{}}, replicaFilters: []filters.ReplicaFilter{filters.AvailableMemoryReplicaFilter{}, filters.ExplainerFilter{}, filters.ReplicaDrainingFilter{}}, - serverSorts: []sorters.ServerSorter{}, + serverSorts: []sorters.ServerSorter{sorters.ModelAlreadyLoadedOnServerSorter{}}, replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemorySorter{}, sorters.ModelAlreadyLoadedSorter{}}, } } @@ -84,6 +84,11 @@ func (s *SimpleScheduler) ScheduleFailedModels() ([]string, error) { return updatedModels, nil } +// Get failed models +// Currently this includes: +// - models that have failed to schedule +// - models that have failed to load +// - models that have loaded but not all replicas are available (e.g. 
min replicas is met but not desired replicas) func (s *SimpleScheduler) getFailedModels() ([]string, error) { models, err := s.store.GetModels() if err != nil { @@ -94,7 +99,8 @@ func (s *SimpleScheduler) getFailedModels() ([]string, error) { version := model.GetLatest() if version != nil { versionState := version.ModelState() - if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed { + if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed || + (versionState.State == store.ModelAvailable && versionState.AvailableReplicas < version.GetDeploymentSpec().GetReplicas()) { failedModels = append(failedModels, model.Name) } } @@ -160,13 +166,52 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { return errors.New(msg) } + desiredReplicas := latestModel.DesiredReplicas() + minReplicas := latestModel.GetDeploymentSpec().GetMinReplicas() + s.sortServers(latestModel, filteredServers) logger. WithField("candidate_servers", filteredServers). - WithField("desired_replicas", latestModel.DesiredReplicas()). + WithField("desired_replicas", desiredReplicas). Debug("Identified candidate servers for model") + // The main logic of trying to find a server for the model is as follows: + // 1. If there are enough replicas on a server, schedule the model + // 2. If there are not enough replicas on a server, try to schedule with min replicas. 
In this case we actually should get + // the models loaded on all the replicas of the servers (assuming min replicas is less than the number of replicas on the server) + // we also mark the model in this case as failed to schedule so that if the infra changes in the future we can try to reschedule + // For each server filter and sort replicas and attempt schedule if enough replicas + ok := s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, desiredReplicas) + // Try to schedule with min replicas if not enough replicas + okWithMinReplicas := false + if !ok && minReplicas > 0 { + okWithMinReplicas = s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, int(minReplicas)) + if okWithMinReplicas { + msg := "Failed to schedule model as no matching server had enough suitable replicas, managed to schedule with min replicas" + logger.Warn(msg) + } + } + + if !ok && !okWithMinReplicas { + msg := "Failed to schedule model as no matching server had enough suitable replicas" + logger.Debug(msg) + // we do not want to reset the server if it has live replicas or loading replicas + // in the case of loading replicas, we need to make sure that we can unload them later. + // for example in the case that a model is just marked as loading on a particular server replica + // then it gets a delete request (before it is marked as loaded or available) we need to make sure + // that we can unload it from the server + s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer()) + return errors.New(msg) + } + + //TODO Cleanup previous version if needed? 
+ return nil +} + +func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.ServerSnapshot, latestModel *store.ModelVersion, desiredReplicas, minReplicas int) bool { + modelName := latestModel.GetMeta().GetName() + logger := s.logger.WithField("func", "findAndUpdateToServers").WithField("model", modelName) ok := false for _, candidateServer := range filteredServers { logger.WithField("server", candidateServer.Name).Debug("Checking compatibility with candidate server") @@ -176,11 +221,13 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { // without the store being reflected and hence sorting on stale values s.muSortAndUpdate.Lock() candidateReplicas = s.filterReplicas(latestModel, candidateServer) - if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() { + numServerReplicas := len(candidateReplicas.ChosenReplicas) + if numServerReplicas < minReplicas { logger. WithField("server", candidateServer.Name). - WithField("available_replicas", len(candidateReplicas.ChosenReplicas)). - WithField("desired_replicas", latestModel.DesiredReplicas()). + WithField("available_replicas", numServerReplicas). + WithField("desired_replicas", desiredReplicas). + WithField("min_replicas", minReplicas). 
Debug("Skipping server due to insufficient suitable replicas") s.muSortAndUpdate.Unlock() @@ -188,11 +235,15 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { } s.sortReplicas(candidateReplicas) - err = s.store.UpdateLoadedModels( + numReplicas := minReplicas + if minReplicas != desiredReplicas { + numReplicas = min(numServerReplicas, desiredReplicas) // we have more replicas for the server than min, so we can use all of them + } + err := s.store.UpdateLoadedModels( modelName, latestModel.GetVersion(), candidateServer.Name, - candidateReplicas.ChosenReplicas[0:latestModel.DesiredReplicas()], + candidateReplicas.ChosenReplicas[0:numReplicas], ) s.muSortAndUpdate.Unlock() @@ -204,21 +255,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { break } } - - if !ok { - msg := "Failed to schedule model as no matching server had enough suitable replicas" - logger.Debug(msg) - // we do not want to reset the server if it has live replicas or loading replicas - // in the case of loading replicas, we need to make sure that we can unload them later. - // for example in the case that a model is just marked as loading on a particular server replica - // then it gets a delete request (before it is marked as loaded or available) we need to make sure - // that we can unload it from the server - s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer()) - return errors.New(msg) - } - - //TODO Cleanup previous version if needed? 
- return nil + return ok } func showServerSlice(servers []*store.ServerSnapshot) string { diff --git a/scheduler/pkg/scheduler/scheduler_test.go b/scheduler/pkg/scheduler/scheduler_test.go index e7e4ee7609..5e053c4b50 100644 --- a/scheduler/pkg/scheduler/scheduler_test.go +++ b/scheduler/pkg/scheduler/scheduler_test.go @@ -126,8 +126,8 @@ func TestScheduler(t *testing.T) { logger := log.New() g := NewGomegaWithT(t) - newTestModel := func(name string, requiredMemory uint64, requirements []string, server *string, replicas uint32, loadedModels []int, deleted bool, scheduledServer string, drainedModels []int) *store.ModelSnapshot { - config := &pb.Model{ModelSpec: &pb.ModelSpec{MemoryBytes: &requiredMemory, Requirements: requirements, Server: server}, DeploymentSpec: &pb.DeploymentSpec{Replicas: replicas}} + newTestModel := func(name string, requiredMemory uint64, requirements []string, server *string, replicas, minReplicas uint32, loadedModels []int, deleted bool, scheduledServer string, drainedModels []int) *store.ModelSnapshot { + config := &pb.Model{ModelSpec: &pb.ModelSpec{MemoryBytes: &requiredMemory, Requirements: requirements, Server: server}, DeploymentSpec: &pb.DeploymentSpec{Replicas: replicas, MinReplicas: minReplicas}} rmap := make(map[int]store.ReplicaStatus) for _, ridx := range loadedModels { rmap[ridx] = store.ReplicaStatus{State: store.Loaded} @@ -162,7 +162,7 @@ func TestScheduler(t *testing.T) { tests := []test{ { name: "SmokeTest", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -177,7 +177,7 @@ func TestScheduler(t *testing.T) { }, { name: "ReplicasTwo", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{}, false, "", nil), servers: 
[]*store.ServerSnapshot{ { Name: "server1", @@ -201,7 +201,7 @@ }, { name: "NotEnoughReplicas", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -221,9 +221,33 @@ }, scheduled: false, }, + { + name: "NotEnoughReplicas - schedule min replicas", + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, 2, []int{}, false, "", nil), + servers: []*store.ServerSnapshot{ + { + Name: "server1", + Replicas: map[int]*store.ServerReplica{0: gsr(0, 200, []string{"sklearn"}, "server1", true, false)}, + Shared: true, + ExpectedReplicas: -1, + }, + { + Name: "server2", + Replicas: map[int]*store.ServerReplica{ + 0: gsr(0, 200, []string{"sklearn"}, "server2", true, false), // expect schedule here + 1: gsr(1, 200, []string{"sklearn"}, "server2", true, false), // expect schedule here + }, + Shared: true, + ExpectedReplicas: -1, + }, + }, + scheduled: true, // note here that we are still trying to mark the model as Available + scheduledServer: "server2", + scheduledReplicas: []int{0, 1}, + }, { name: "MemoryOneServer", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -246,7 +270,7 @@ }, { name: "ModelsLoaded", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -270,7 +294,7 @@ }, { name: "ModelUnLoaded", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 
[]int{1}, true, "server2", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, true, "server2", nil), servers: []*store.ServerSnapshot{ { Name: "server2", @@ -288,7 +312,7 @@ func TestScheduler(t *testing.T) { }, { name: "DeletedServer", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -312,7 +336,7 @@ func TestScheduler(t *testing.T) { }, { name: "Reschedule", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{0}, false, "server1", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{0}, false, "server1", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -336,7 +360,7 @@ func TestScheduler(t *testing.T) { }, { name: "DeletedServerFail", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -349,7 +373,7 @@ func TestScheduler(t *testing.T) { }, { name: "Available memory sorting", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server2", @@ -367,7 +391,7 @@ func TestScheduler(t *testing.T) { }, { name: "Available memory sorting with multiple replicas", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, false, "", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, false, "", nil), servers: []*store.ServerSnapshot{ { Name: "server2", @@ -386,7 +410,7 @@ func TestScheduler(t *testing.T) { }, { name: "Scale up", - model: newTestModel("model1", 100, 
[]string{"sklearn"}, nil, 3, []int{1, 2}, false, "server1", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, 0, []int{1, 2}, false, "server1", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -406,7 +430,7 @@ }, { name: "Scale down", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1, 2}, false, "server1", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1, 2}, false, "server1", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -424,9 +448,29 @@ scheduledServer: "server1", scheduledReplicas: []int{1}, }, + { + name: "Scale up - not enough replicas use max of the server", + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 5, 3, []int{1, 2}, false, "server1", nil), + servers: []*store.ServerSnapshot{ + { + Name: "server1", + Replicas: map[int]*store.ServerReplica{ + 0: gsr(0, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here + 1: gsr(1, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here - nop + 2: gsr(2, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here - nop + 3: gsr(3, 100, []string{"sklearn"}, "server1", true, false), // expect schedule here + }, + Shared: true, + ExpectedReplicas: -1, + }, + }, + scheduled: true, // note that we are still trying to mark the model as Available + scheduledServer: "server1", + scheduledReplicas: []int{0, 1, 2, 3}, // used all replicas + }, { name: "Scale up - no capacity on loaded replica servers, should still go there", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, []int{1, 2}, false, "server1", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 3, 0, []int{1, 2}, false, "server1", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -446,7 +490,7 @@ }, { name: "Scale down - no capacity on loaded 
replica servers, should still go there", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, []int{1, 2}, false, "server1", nil), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 1, 0, []int{1, 2}, false, "server1", nil), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -466,7 +510,7 @@ func TestScheduler(t *testing.T) { }, { name: "Drain", - model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, []int{1}, false, "server1", []int{2}), + model: newTestModel("model1", 100, []string{"sklearn"}, nil, 2, 0, []int{1}, false, "server1", []int{2}), servers: []*store.ServerSnapshot{ { Name: "server1", @@ -502,12 +546,14 @@ func TestScheduler(t *testing.T) { err := scheduler.Schedule(test.model.Name) if test.scheduled { g.Expect(err).To(BeNil()) + } else { + g.Expect(err).ToNot(BeNil()) + } + if test.scheduledServer != "" { g.Expect(test.scheduledServer).To(Equal(mockStore.scheduledServer)) sort.Ints(test.scheduledReplicas) sort.Ints(mockStore.scheduledReplicas) g.Expect(test.scheduledReplicas).To(Equal(mockStore.scheduledReplicas)) - } else { - g.Expect(err).ToNot(BeNil()) } }) } @@ -517,12 +563,23 @@ func TestFailedModels(t *testing.T) { logger := log.New() g := NewGomegaWithT(t) - newMockStore := func(models map[string]store.ModelState) *mockStore { + type modelStateWithMetadata struct { + state store.ModelState + deploymentSpec *pb.DeploymentSpec + availableReplicas uint32 + } + + newMockStore := func(models map[string]modelStateWithMetadata) *mockStore { snapshots := map[string]*store.ModelSnapshot{} for name, state := range models { + mv := store.NewModelVersion(&pb.Model{DeploymentSpec: state.deploymentSpec}, 1, "", map[int]store.ReplicaStatus{}, false, state.state) + mv.SetModelState(store.ModelStatus{ + State: state.state, + AvailableReplicas: state.availableReplicas, + }) snapshot := &store.ModelSnapshot{ Name: name, - Versions: []*store.ModelVersion{store.NewModelVersion(&pb.Model{}, 1, "", map[int]store.ReplicaStatus{}, 
false, state)}, + Versions: []*store.ModelVersion{mv}, } snapshots[name] = snapshot } @@ -533,24 +590,25 @@ func TestFailedModels(t *testing.T) { type test struct { name string - models map[string]store.ModelState + models map[string]modelStateWithMetadata expectedFailedModels []string } tests := []test{ { name: "SmokeTest", - models: map[string]store.ModelState{ - "model1": store.ScheduleFailed, - "model2": store.ModelFailed, - "model3": store.ModelAvailable, + models: map[string]modelStateWithMetadata{ + "model1": {store.ScheduleFailed, &pb.DeploymentSpec{Replicas: 1}, 0}, + "model2": {store.ModelFailed, &pb.DeploymentSpec{Replicas: 1}, 0}, + "model3": {store.ModelAvailable, &pb.DeploymentSpec{Replicas: 1}, 1}, + "model4": {store.ModelAvailable, &pb.DeploymentSpec{Replicas: 2, MinReplicas: 1}, 1}, // retry models that have not reached desired replicas }, - expectedFailedModels: []string{"model1", "model2"}, + expectedFailedModels: []string{"model1", "model2", "model4"}, }, { name: "SmokeTest", - models: map[string]store.ModelState{ - "model3": store.ModelAvailable, + models: map[string]modelStateWithMetadata{ + "model3": {store.ModelAvailable, &pb.DeploymentSpec{Replicas: 1}, 1}, }, expectedFailedModels: nil, }, diff --git a/scheduler/pkg/scheduler/sorters/loaded.go b/scheduler/pkg/scheduler/sorters/loaded.go index 4f9b413956..0991ca522c 100644 --- a/scheduler/pkg/scheduler/sorters/loaded.go +++ b/scheduler/pkg/scheduler/sorters/loaded.go @@ -20,3 +20,15 @@ func (m ModelAlreadyLoadedSorter) IsLess(i *CandidateReplica, j *CandidateReplic jIsLoading := j.Model.IsLoadingOrLoaded(j.Server.Name, j.Replica.GetReplicaIdx()) return iIsLoading && !jIsLoading } + +// This sorter favours servers that have the models already loaded on them, this is useful to minimise ping-pong of models between servers +// which can be expensive in terms of model loading time. 
+type ModelAlreadyLoadedOnServerSorter struct{} + +func (m ModelAlreadyLoadedOnServerSorter) Name() string { + return "ModelAlreadyLoadedOnServerSorter" +} + +func (m ModelAlreadyLoadedOnServerSorter) IsLess(i *CandidateServer, j *CandidateServer) bool { + return i.Model.Server() == i.Server.Name +} diff --git a/scheduler/pkg/scheduler/sorters/loaded_test.go b/scheduler/pkg/scheduler/sorters/loaded_test.go index 34978c2da5..153194aba0 100644 --- a/scheduler/pkg/scheduler/sorters/loaded_test.go +++ b/scheduler/pkg/scheduler/sorters/loaded_test.go @@ -74,3 +74,60 @@ func TestModelAlreadyLoadedSort(t *testing.T) { }) } } + +func TestModelAlreadyLoadedOnServerSort(t *testing.T) { + g := NewGomegaWithT(t) + + type test struct { + name string + servers []*CandidateServer + ordering []string + } + + modelServer1 := store.NewModelVersion( + nil, + 1, + "server1", + map[int]store.ReplicaStatus{}, + false, + store.ModelAvailable) + + modelNoServer := store.NewModelVersion( + nil, + 1, + "", + map[int]store.ReplicaStatus{}, + false, + store.ModelStateUnknown) + + tests := []test{ + { + name: "LoadedOnOneServer", + servers: []*CandidateServer{ + {Model: modelServer1, Server: &store.ServerSnapshot{Name: "server3"}}, + {Model: modelServer1, Server: &store.ServerSnapshot{Name: "server2"}}, + {Model: modelServer1, Server: &store.ServerSnapshot{Name: "server1"}}, + }, + ordering: []string{"server1", "server3", "server2"}, + }, + { + name: "Not", + servers: []*CandidateServer{ + {Model: modelNoServer, Server: &store.ServerSnapshot{Name: "server3"}}, + {Model: modelNoServer, Server: &store.ServerSnapshot{Name: "server2"}}, + {Model: modelNoServer, Server: &store.ServerSnapshot{Name: "server1"}}, + }, + ordering: []string{"server3", "server2", "server1"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sorter := ModelAlreadyLoadedOnServerSorter{} + sort.SliceStable(test.servers, func(i, j int) bool { return sorter.IsLess(test.servers[i], 
test.servers[j]) }) + for idx, expected := range test.ordering { + g.Expect(test.servers[idx].Server.Name).To(Equal(expected)) + } + }) + } +} diff --git a/scheduler/pkg/store/memory.go b/scheduler/pkg/store/memory.go index a3ea242221..356a7fac3c 100644 --- a/scheduler/pkg/store/memory.go +++ b/scheduler/pkg/store/memory.go @@ -375,7 +375,14 @@ func (m *MemoryStore) updateLoadedModelsImpl( // this could be in the cases where we are scaling down a model and the new replica count can be all deployed // and always send an update for deleted models, so the operator will remove them from k8s // also send an update for progressing models so the operator can update the status in the case of a network glitch where the model generation has been updated - if replicaStateUpdated || modelVersion.state.State == ScheduleFailed || model.IsDeleted() || modelVersion.state.State == ModelProgressing { + // also send an update if the model is not yet at desired replicas, if we have partial scheduling + + // note that we use len(modelVersion.GetAssignment()) to calculate the number of replicas as the status of the model at this point might not reflect the actual number of replicas + // in modelVersion.state.AvailableReplicas (we call updateModelStatus later) + + // TODO: the conditions here keep growing, refactor or consider a simpler check. 
+ if replicaStateUpdated || modelVersion.state.State == ScheduleFailed || model.IsDeleted() || modelVersion.state.State == ModelProgressing || + (modelVersion.state.State == ModelAvailable && len(modelVersion.GetAssignment()) < modelVersion.DesiredReplicas()) { logger.Debugf("Updating model status for model %s server %s", modelKey, serverKey) modelVersion.server = serverKey m.updateModelStatus(true, model.IsDeleted(), modelVersion, model.GetLastAvailableModelVersion()) diff --git a/scheduler/pkg/store/memory_status.go b/scheduler/pkg/store/memory_status.go index d7e3dc6bfb..0c0187782d 100644 --- a/scheduler/pkg/store/memory_status.go +++ b/scheduler/pkg/store/memory_status.go @@ -88,7 +88,8 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio modelReason = stats.lastFailedReason modelTimestamp = stats.lastFailedStateTime } else if (modelVersion.GetDeploymentSpec() != nil && stats.replicasAvailable == modelVersion.GetDeploymentSpec().Replicas) || // equal to desired replicas - (stats.replicasAvailable > 0 && prevModelVersion != nil && modelVersion != prevModelVersion && prevModelVersion.state.State == ModelAvailable) { // TODO In future check if available replicas is > minReplicas + (modelVersion.GetDeploymentSpec() != nil && stats.replicasAvailable >= modelVersion.GetDeploymentSpec().MinReplicas && modelVersion.GetDeploymentSpec().MinReplicas > 0) || // min replicas is set and available replicas are greater than or equal to min replicas + (stats.replicasAvailable > 0 && prevModelVersion != nil && modelVersion != prevModelVersion && prevModelVersion.state.State == ModelAvailable) { modelState = ModelAvailable } else { modelState = ModelProgressing @@ -105,12 +106,14 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio } func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) { + availableReplicas := modelVersion.state.AvailableReplicas + modelVersion.state = 
ModelStatus{ State: ScheduleFailed, Reason: reason, Timestamp: time.Now(), - AvailableReplicas: modelVersion.state.AvailableReplicas, - UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - modelVersion.state.AvailableReplicas, + AvailableReplicas: availableReplicas, + UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - availableReplicas, } // make sure we reset server but only if there are no available replicas if reset { diff --git a/scheduler/pkg/store/memory_status_test.go b/scheduler/pkg/store/memory_status_test.go index 113e293048..07d3d194ab 100644 --- a/scheduler/pkg/store/memory_status_test.go +++ b/scheduler/pkg/store/memory_status_test.go @@ -96,6 +96,69 @@ func TestUpdateStatus(t *testing.T) { prevVersion: nil, expectedModelStatus: ModelAvailable, }, + { + name: "Available - Min replicas", + store: &LocalSchedulerStore{ + models: map[string]*Model{ + "model": { + versions: []*ModelVersion{ + { + version: 1, + modelDefn: &pb.Model{ + Meta: &pb.MetaData{ + Name: "model", + }, + ModelSpec: &pb.ModelSpec{}, + DeploymentSpec: &pb.DeploymentSpec{ + Replicas: 1, + }, + }, + server: "server2", + replicas: map[int]ReplicaStatus{ + 0: {State: Loaded}, + }, + }, + { + version: 2, + modelDefn: &pb.Model{ + Meta: &pb.MetaData{ + Name: "model", + }, + ModelSpec: &pb.ModelSpec{}, + DeploymentSpec: &pb.DeploymentSpec{ + Replicas: 2, + MinReplicas: 1, + }, + }, + server: "server1", + replicas: map[int]ReplicaStatus{ + 0: {State: Available}, + }, + }, + }, + }, + }, + servers: map[string]*Server{ + "server1": { + name: "server1", + replicas: map[int]*ServerReplica{ + 0: {}, + }, + }, + "server2": { + name: "server2", + replicas: map[int]*ServerReplica{ + 0: {}, + }, + }, + }, + }, + modelName: "model", + serverName: "server2", + version: 2, + prevVersion: nil, + expectedModelStatus: ModelAvailable, + }, { name: "NotEnoughReplicasButPreviousAvailable", store: &LocalSchedulerStore{ diff --git 
a/scheduler/pkg/store/memory_test.go b/scheduler/pkg/store/memory_test.go index 76d4027aed..3823392bf3 100644 --- a/scheduler/pkg/store/memory_test.go +++ b/scheduler/pkg/store/memory_test.go @@ -725,6 +725,42 @@ func TestUpdateLoadedModels(t *testing.T) { expectedStates: map[int]ReplicaStatus{0: {State: Available}, 1: {State: Unloaded}}, expectedModelState: &ModelStatus{State: ModelAvailable}, }, + { + name: "PartiallyAvailableModels", + store: &LocalSchedulerStore{ + models: map[string]*Model{"model": { + versions: []*ModelVersion{ + { + modelDefn: &pb.Model{ModelSpec: &pb.ModelSpec{MemoryBytes: &memBytes}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 3, MinReplicas: 2}}, + server: "server", + version: 1, + replicas: map[int]ReplicaStatus{ + 0: {State: Available}, + 1: {State: Available}, + }, + state: ModelStatus{State: ModelProgressing}, + }, + }, + }}, + servers: map[string]*Server{ + "server": { + name: "server", + replicas: map[int]*ServerReplica{ + 0: {}, + 1: {}, + }, + }, + }, + }, + modelKey: "model", + version: 1, + serverKey: "server", + replicas: []*ServerReplica{ + {replicaIdx: 0}, {replicaIdx: 1}, + }, + expectedStates: map[int]ReplicaStatus{0: {State: Available}, 1: {State: Available}}, + expectedModelState: &ModelStatus{State: ModelAvailable}, + }, } for _, test := range tests {