feat(scheduler): add partial scheduling based on min replicas #6221

Merged: 20 commits, Jan 28, 2025
83 changes: 60 additions & 23 deletions scheduler/pkg/scheduler/scheduler.go
@@ -43,7 +43,7 @@ func DefaultSchedulerConfig(store store.ModelStore) SchedulerConfig {
return SchedulerConfig{
serverFilters: []filters.ServerFilter{filters.ServerReplicaFilter{}, filters.SharingServerFilter{}, filters.DeletedServerFilter{}, filters.ServerRequirementFilter{}},
replicaFilters: []filters.ReplicaFilter{filters.AvailableMemoryReplicaFilter{}, filters.ExplainerFilter{}, filters.ReplicaDrainingFilter{}},
serverSorts: []sorters.ServerSorter{},
serverSorts: []sorters.ServerSorter{sorters.ModelAlreadyLoadedOnServerSorter{}},
replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemorySorter{}, sorters.ModelAlreadyLoadedSorter{}},
}
}
@@ -84,6 +84,11 @@ func (s *SimpleScheduler) ScheduleFailedModels() ([]string, error) {
return updatedModels, nil
}

// Get failed models
// Currently this includes:
// - models that have failed to schedule
// - models that have failed to load
// - models that have loaded but not all replicas are available (e.g. min replicas is met but not desired replicas)
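// Note (illustrative, numbers are hypothetical): the third case is what drives scale-up of partially
// scheduled models. A model that is Available on only 2 of its 3 desired replicas (because it was
// scheduled with min replicas) is reported here, so that ScheduleFailedModels can try to bring it up
// to the desired replica count once capacity allows.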
func (s *SimpleScheduler) getFailedModels() ([]string, error) {
models, err := s.store.GetModels()
if err != nil {
@@ -94,7 +99,8 @@ func (s *SimpleScheduler) getFailedModels() ([]string, error) {
version := model.GetLatest()
if version != nil {
versionState := version.ModelState()
if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed {
if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed ||
(versionState.State == store.ModelAvailable && versionState.AvailableReplicas < version.GetDeploymentSpec().GetReplicas()) {
failedModels = append(failedModels, model.Name)
}
}
@@ -160,13 +166,52 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
return errors.New(msg)
}

desiredReplicas := latestModel.DesiredReplicas()
minReplicas := latestModel.GetDeploymentSpec().GetMinReplicas()

s.sortServers(latestModel, filteredServers)
logger.
WithField("candidate_servers", filteredServers).
WithField("desired_replicas", latestModel.DesiredReplicas()).
WithField("desired_replicas", desiredReplicas).
Debug("Identified candidate servers for model")

// The main logic for finding a server for the model is as follows:
// 1. If a server has enough suitable replicas, schedule the model on it.
// 2. If no server has enough replicas, try to schedule with min replicas. In this case the model should end up
//    loaded on all of the server's suitable replicas (assuming min replicas is less than the number of replicas
//    on the server). We also mark the model as failed to schedule, so that if the infrastructure changes in the
//    future we can try to reschedule it up to the desired number of replicas.
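// For illustration (hypothetical numbers): with desired replicas 4, min replicas 2 and a best candidate
// server exposing only 3 suitable replicas, the first attempt (requiring 4) fails, the second attempt
// (requiring 2) succeeds and loads the model onto all 3 replicas; the model then stays below its desired
// replica count and is picked up again by getFailedModels when capacity changes.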

// For each server, filter and sort replicas and attempt to schedule if there are enough replicas
ok := s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, desiredReplicas)
// Try to schedule with min replicas if there were not enough replicas
okWithMinReplicas := false
if !ok && minReplicas > 0 {
okWithMinReplicas = s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, int(minReplicas))
if okWithMinReplicas {
msg := "Failed to schedule model as no matching server had enough suitable replicas, managed to schedule with min replicas"
logger.Warn(msg)
}
}

if !ok && !okWithMinReplicas {
msg := "Failed to schedule model as no matching server had enough suitable replicas"
logger.Debug(msg)
// We do not want to reset the server if the model has live replicas or loading replicas.
// In the case of loading replicas, we need to make sure that we can unload them later.
// For example, if a model is marked as loading on a particular server replica and then gets a delete
// request (before it is marked as loaded or available), we still need to be able to unload it from
// that server.
s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer())
return errors.New(msg)
}

//TODO Cleanup previous version if needed?
return nil
}

func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.ServerSnapshot, latestModel *store.ModelVersion, desiredReplicas, minReplicas int) bool {
modelName := latestModel.GetMeta().GetName()
logger := s.logger.WithField("func", "findAndUpdateToServers").WithField("model", modelName)
ok := false
for _, candidateServer := range filteredServers {
logger.WithField("server", candidateServer.Name).Debug("Checking compatibility with candidate server")
@@ -176,23 +221,29 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
// without the store being reflected and hence sorting on stale values
s.muSortAndUpdate.Lock()
candidateReplicas = s.filterReplicas(latestModel, candidateServer)
if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() {
numServerReplicas := len(candidateReplicas.ChosenReplicas)
if numServerReplicas < minReplicas {
logger.
WithField("server", candidateServer.Name).
WithField("available_replicas", len(candidateReplicas.ChosenReplicas)).
WithField("desired_replicas", latestModel.DesiredReplicas()).
WithField("available_replicas", numServerReplicas).
WithField("desired_replicas", desiredReplicas).
WithField("min_replicas", minReplicas).
Debug("Skipping server due to insufficient suitable replicas")

s.muSortAndUpdate.Unlock()
continue
}

s.sortReplicas(candidateReplicas)
err = s.store.UpdateLoadedModels(
numReplicas := minReplicas
if minReplicas != desiredReplicas {
numReplicas = min(numServerReplicas, desiredReplicas) // the server has at least min replicas, so use as many as it has, capped at desired
}
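// For illustration (hypothetical numbers): desiredReplicas=5, minReplicas=2, numServerReplicas=3 -> numReplicas=3.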
err := s.store.UpdateLoadedModels(
modelName,
latestModel.GetVersion(),
candidateServer.Name,
candidateReplicas.ChosenReplicas[0:latestModel.DesiredReplicas()],
candidateReplicas.ChosenReplicas[0:numReplicas],
)
s.muSortAndUpdate.Unlock()

@@ -204,21 +255,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
break
}
}

if !ok {
msg := "Failed to schedule model as no matching server had enough suitable replicas"
logger.Debug(msg)
// we do not want to reset the server if it has live replicas or loading replicas
// in the case of loading replicas, we need to make sure that we can unload them later.
// for example in the case that a model is just marked as loading on a particular server replica
// then it gets a delete request (before it is marked as loaded or available) we need to make sure
// that we can unload it from the server
s.store.FailedScheduling(latestModel, msg, !latestModel.HasLiveReplicas() && !latestModel.IsLoadingOrLoadedOnServer())
return errors.New(msg)
}

//TODO Cleanup previous version if needed?
return nil
return ok
}

func showServerSlice(servers []*store.ServerSnapshot) string {
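For readers skimming the diff, here is a minimal, self-contained sketch (not code from this PR) of the replica-count decision the change introduces. The function name replicasToSchedule and its signature are invented for illustration; in the real scheduler this logic lives in scheduleToServer and findAndUpdateToServers and goes through the store, filters and sorters shown above.

package main

import "fmt"

// replicasToSchedule decides how many replicas of a model to place on a server
// that has `suitable` suitable replicas, mirroring the two-pass logic above:
// first require the full desired count, then fall back to min replicas if set.
func replicasToSchedule(desired, minReplicas, suitable int) (n int, partial bool, ok bool) {
	// Pass 1: the server can hold the full desired count.
	if suitable >= desired {
		return desired, false, true
	}
	// Pass 2: only attempted when min replicas is configured (> 0). Accept the
	// server if it can hold at least minReplicas, and use as many of its suitable
	// replicas as possible, capped at the desired count. The model then remains
	// below its desired replica count, so getFailedModels keeps returning it and
	// the scheduler can scale it up later.
	if minReplicas > 0 && suitable >= minReplicas {
		return min(suitable, desired), true, true
	}
	// Neither pass succeeded: scheduling fails for this server.
	return 0, false, false
}

func main() {
	fmt.Println(replicasToSchedule(4, 2, 3)) // 3 true true   (partial schedule on 3 of 4)
	fmt.Println(replicasToSchedule(4, 0, 3)) // 0 false false (no min replicas set -> fail)
	fmt.Println(replicasToSchedule(2, 2, 5)) // 2 false true  (full schedule)
}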