Skip to content

Commit

Permalink
add utils for send server messages to controller
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush committed Feb 11, 2025
1 parent a9145c8 commit 1b2f531
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
13 changes: 13 additions & 0 deletions scheduler/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,19 @@ func createServerStatusResponse(s *store.ServerSnapshot) *pb.ServerStatusRespons
return resp
}

func createServerScaleResponse(s *store.ServerSnapshot, expectedReplicas uint32) *pb.ServerStatusResponse {

Check failure on line 511 in scheduler/pkg/server/server.go

View workflow job for this annotation

GitHub Actions / lint

func `createServerScaleResponse` is unused (unused)
// we dont care about populating the other fields as they should not be used by the controller, reconsider if this changes

resp := &pb.ServerStatusResponse{
Type: pb.ServerStatusResponse_ScalingRequest,
ServerName: s.Name,
ExpectedReplicas: int32(expectedReplicas),
KubernetesMeta: s.KubernetesMeta,
}

return resp
}

func (s *SchedulerServer) StartExperiment(ctx context.Context, req *pb.StartExperimentRequest) (*pb.StartExperimentResponse, error) {
err := s.experimentServer.StartExperiment(experiment.CreateExperimentFromRequest(req.Experiment))
if err != nil {
Expand Down
36 changes: 25 additions & 11 deletions scheduler/pkg/server/server_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"
)

func (s *SchedulerServer) SubscribeModelStatus(req *pb.ModelSubscriptionRequest, stream pb.Scheduler_SubscribeModelStatusServer) error {
Expand Down Expand Up @@ -247,20 +248,33 @@ func (s *SchedulerServer) sendServerStatus() {
continue
}
ssr := createServerStatusResponse(server)
s.sendServerResponse(ssr)
}

for stream, subscription := range s.serverEventStream.streams {
hasExpired, err := sendWithTimeout(func() error { return stream.Send(ssr) }, s.timeout)
if hasExpired {
// this should trigger a reconnect from the client
close(subscription.fin)
delete(s.serverEventStream.streams, stream)
}
if err != nil {
logger.WithError(err).Errorf("Failed to send server status event to %s", subscription.name)
}
}

func (s *SchedulerServer) sendServerScale(server *store.ServerSnapshot, expectedReplicas uint32) {

Check failure on line 256 in scheduler/pkg/server/server_status.go

View workflow job for this annotation

GitHub Actions / lint

func `(*SchedulerServer).sendServerScale` is unused (unused)
// TODO: should there be some sort of velocity check ?
logger := s.logger.WithField("func", "sendServerScale")
logger.Debugf("will attempt to scale servers to %d for %v", expectedReplicas, server.Name)

ssr := createServerScaleResponse(server, expectedReplicas)
s.sendServerResponse(ssr)
}

func (s *SchedulerServer) sendServerResponse(ssr *pb.ServerStatusResponse) {
logger := s.logger.WithField("func", "sendServerStatusResponse")
for stream, subscription := range s.serverEventStream.streams {
hasExpired, err := sendWithTimeout(func() error { return stream.Send(ssr) }, s.timeout)
if hasExpired {
// this should trigger a reconnect from the client
close(subscription.fin)
delete(s.serverEventStream.streams, stream)
}
if err != nil {
logger.WithError(err).Errorf("Failed to send server status response to %s", subscription.name)
}
}

}

// initial send of server statuses to a new controller
Expand Down

0 comments on commit 1b2f531

Please sign in to comment.