feat(scheduler): add partial scheduling based on min replicas (#6221)
* add sorter to prefer servers already assigned to the model

* add partial scheduling based on min replicas

* simplify condition

* do not reset server if we can schedule at least min replicas

* mark model as available if min replicas are scheduled

* reschedule models not on desired replicas

* treat scheduling with min replicas as good

* send an event for infra changes with partial scheduling models
sakoush authored Jan 28, 2025
1 parent 1120b34 commit c078211
Showing 8 changed files with 330 additions and 57 deletions.
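
At a high level, the new behaviour is a two-step fallback: first try to place the model with its desired replica count, and only if that fails retry with its configured min replicas, keeping the model flagged for rescheduling whenever it runs below desired. The sketch below is a minimal Go illustration of that flow, not the scheduler's actual code; the `model` struct and `tryAssign` helper are placeholders for the real filtering, sorting, and store-update logic shown in the diff.

```go
// Illustrative sketch of the scheduling fallback added in this commit.
// The types and helpers here are placeholders, not the real scheduler API.
package main

import "fmt"

type model struct {
	name            string
	desiredReplicas int
	minReplicas     int
}

// tryAssign stands in for filtering/sorting servers and placing the model on
// one that has at least `required` suitable replicas.
func tryAssign(m model, required int) bool {
	const suitableReplicas = 2 // pretend the best candidate server has 2 suitable replicas
	return suitableReplicas >= required
}

// schedule mirrors the new flow: try the desired replica count first, then
// fall back to min replicas, and fail only if neither can be satisfied.
func schedule(m model) error {
	if tryAssign(m, m.desiredReplicas) {
		return nil
	}
	if m.minReplicas > 0 && tryAssign(m, m.minReplicas) {
		// Scheduled below desired: the model stays flagged so it can be
		// revisited when the infra changes.
		fmt.Printf("model %s scheduled with min replicas only\n", m.name)
		return nil
	}
	return fmt.Errorf("model %s: no server had enough suitable replicas", m.name)
}

func main() {
	fmt.Println(schedule(model{name: "example", desiredReplicas: 3, minReplicas: 2}))
}
```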
83 changes: 60 additions & 23 deletions scheduler/pkg/scheduler/scheduler.go
@@ -43,7 +43,7 @@ func DefaultSchedulerConfig(store store.ModelStore) SchedulerConfig {
return SchedulerConfig{
serverFilters: []filters.ServerFilter{filters.ServerReplicaFilter{}, filters.SharingServerFilter{}, filters.DeletedServerFilter{}, filters.ServerRequirementFilter{}},
replicaFilters: []filters.ReplicaFilter{filters.AvailableMemoryReplicaFilter{}, filters.ExplainerFilter{}, filters.ReplicaDrainingFilter{}},
serverSorts: []sorters.ServerSorter{},
serverSorts: []sorters.ServerSorter{sorters.ModelAlreadyLoadedOnServerSorter{}},
replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemorySorter{}, sorters.ModelAlreadyLoadedSorter{}},
}
}
@@ -84,6 +84,11 @@ func (s *SimpleScheduler) ScheduleFailedModels() ([]string, error) {
return updatedModels, nil
}

// Get failed models
// Currently this includes:
// - models that have failed to schedule
// - models that have failed to load
// - models that have loaded but not all replicas are available (e.g. min replicas is met but not desired replicas)
func (s *SimpleScheduler) getFailedModels() ([]string, error) {
models, err := s.store.GetModels()
if err != nil {
@@ -94,7 +99,8 @@ func (s *SimpleScheduler) getFailedModels() ([]string, error) {
version := model.GetLatest()
if version != nil {
versionState := version.ModelState()
if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed {
if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed ||
(versionState.State == store.ModelAvailable && versionState.AvailableReplicas < version.GetDeploymentSpec().GetReplicas()) {
failedModels = append(failedModels, model.Name)
}
}
@@ -160,13 +166,52 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
return errors.New(msg)
}

desiredReplicas := latestModel.DesiredReplicas()
minReplicas := latestModel.GetDeploymentSpec().GetMinReplicas()

s.sortServers(latestModel, filteredServers)
logger.
WithField("candidate_servers", filteredServers).
WithField("desired_replicas", latestModel.DesiredReplicas()).
WithField("desired_replicas", desiredReplicas).
Debug("Identified candidate servers for model")

// The main logic of trying to find a server for the model is as follows:
// 1. If a server has enough suitable replicas, schedule the model on it
// 2. If no server has enough replicas, try to schedule with min replicas. In this case the model should end up
// loaded on all the suitable replicas of the chosen server (assuming min replicas is less than the number of replicas on the server).
// We also mark the model as failed to schedule so that we can try to reschedule it if the infra changes in the future.

// For each server, filter and sort replicas and attempt to schedule the model if there are enough replicas
ok := s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, desiredReplicas)
// Try to schedule with min replicas if there are not enough replicas for the desired count
okWithMinReplicas := false
if !ok && minReplicas > 0 {
okWithMinReplicas = s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, int(minReplicas))
if okWithMinReplicas {
msg := "Failed to schedule model as no matching server had enough suitable replicas, managed to schedule with min replicas"
logger.Warn(msg)
}
}

if !ok && !okWithMinReplicas {
msg := "Failed to schedule model as no matching server had enough suitable replicas"
logger.Debug(msg)
// we do not want to reset the server if it has live replicas or loading replicas
// in the case of loading replicas, we need to make sure that we can unload them later.
// for example in the case that a model is just marked as loading on a particular server replica
// then it gets a delete request (before it is marked as loaded or available) we need to make sure
// that we can unload it from the server
s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer())
return errors.New(msg)
}

//TODO Cleanup previous version if needed?
return nil
}

func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.ServerSnapshot, latestModel *store.ModelVersion, desiredReplicas, minReplicas int) bool {
modelName := latestModel.GetMeta().GetName()
logger := s.logger.WithField("func", "findAndUpdateToServers").WithField("model", modelName)
ok := false
for _, candidateServer := range filteredServers {
logger.WithField("server", candidateServer.Name).Debug("Checking compatibility with candidate server")
@@ -176,23 +221,29 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
// without the store being reflected and hence sorting on stale values
s.muSortAndUpdate.Lock()
candidateReplicas = s.filterReplicas(latestModel, candidateServer)
if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() {
numServerReplicas := len(candidateReplicas.ChosenReplicas)
if numServerReplicas < minReplicas {
logger.
WithField("server", candidateServer.Name).
WithField("available_replicas", len(candidateReplicas.ChosenReplicas)).
WithField("desired_replicas", latestModel.DesiredReplicas()).
WithField("available_replicas", numServerReplicas).
WithField("desired_replicas", desiredReplicas).
WithField("min_replicas", minReplicas).
Debug("Skipping server due to insufficient suitable replicas")

s.muSortAndUpdate.Unlock()
continue
}

s.sortReplicas(candidateReplicas)
err = s.store.UpdateLoadedModels(
numReplicas := minReplicas
if minReplicas != desiredReplicas {
numReplicas = min(numServerReplicas, desiredReplicas) // the server has more suitable replicas than min, so use as many as possible, up to desired
}
err := s.store.UpdateLoadedModels(
modelName,
latestModel.GetVersion(),
candidateServer.Name,
candidateReplicas.ChosenReplicas[0:latestModel.DesiredReplicas()],
candidateReplicas.ChosenReplicas[0:numReplicas],
)
s.muSortAndUpdate.Unlock()

@@ -204,21 +255,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
break
}
}

if !ok {
msg := "Failed to schedule model as no matching server had enough suitable replicas"
logger.Debug(msg)
// we do not want to reset the server if it has live replicas or loading replicas
// in the case of loading replicas, we need to make sure that we can unload them later.
// for example in the case that a model is just marked as loading on a particular server replica
// then it gets a delete request (before it is marked as loaded or available) we need to make sure
// that we can unload it from the server
s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer())
return errors.New(msg)
}

//TODO Cleanup previous version if needed?
return nil
return ok
}

func showServerSlice(servers []*store.ServerSnapshot) string {
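
The rescheduling side of the change can be pictured the same way: a model that is Available but has fewer replicas than desired is now treated like a failed model by getFailedModels, so the scheduler revisits it when capacity changes. Below is a small self-contained sketch of that check; the state names and struct are simplified stand-ins for the real store package types, not the project's API.

```go
// Illustrative check for which models the scheduler should revisit
// (state names are simplified stand-ins for the real store package).
package main

import "fmt"

type modelState string

const (
	modelFailed    modelState = "ModelFailed"
	scheduleFailed modelState = "ScheduleFailed"
	modelAvailable modelState = "ModelAvailable"
)

type version struct {
	state             modelState
	availableReplicas int
	desiredReplicas   int
}

// needsRescheduling mirrors the widened condition: scheduling/loading failures
// still count, and so does an available model running below its desired replicas.
func needsRescheduling(v version) bool {
	return v.state == modelFailed || v.state == scheduleFailed ||
		(v.state == modelAvailable && v.availableReplicas < v.desiredReplicas)
}

func main() {
	fmt.Println(needsRescheduling(version{state: modelAvailable, availableReplicas: 1, desiredReplicas: 3})) // true
	fmt.Println(needsRescheduling(version{state: modelAvailable, availableReplicas: 3, desiredReplicas: 3})) // false
}
```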
