perf: Cache requirements for pods alongside requests #1950

Open · wants to merge 1 commit into base: main
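Summary of the change, as reflected in the diffs below: previously the scheduler cached only each pod's resource requests (cachedPodRequests) and rebuilt the pod's scheduling requirements (and, when a preferred node affinity was present, its strict requirements) inside every ExistingNode.Add and NodeClaim.Add attempt. This PR bundles requests, requirements, and strict requirements into a cached PodData entry that is computed once per pod and refreshed only after preference relaxation. The sketch below is illustrative only, not code from this diff; PodData and Candidate are simplified stand-ins for the real types.

package sketch

import corev1 "k8s.io/api/core/v1"

// PodData stands in for the cached per-pod bundle added by this PR
// (requests, requirements, strict requirements); most fields are elided here.
type PodData struct {
	Requests corev1.ResourceList
}

// Candidate stands in for an existing node or an in-flight NodeClaim.
type Candidate interface {
	Add(pod *corev1.Pod, data *PodData) error
}

// scheduleAll shows the fan-out that motivates the cache: every pod is tried
// against many candidates, so the per-pod data is computed once and handed to
// each Add attempt instead of being rebuilt inside every attempt.
func scheduleAll(pods []*corev1.Pod, candidates []Candidate, compute func(*corev1.Pod) *PodData) {
	for _, pod := range pods {
		data := compute(pod)
		for _, c := range candidates {
			if c.Add(pod, data) == nil {
				break
			}
		}
	}
}

The file-by-file diffs below mostly just thread *PodData through the existing call sites.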
18 changes: 5 additions & 13 deletions pkg/controllers/provisioning/scheduling/existingnode.go
@@ -65,7 +65,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,
return node
}

func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podRequests v1.ResourceList) error {
func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
// Check Taints
if err := scheduling.Taints(n.cachedTaints).Tolerates(pod); err != nil {
return err
@@ -86,29 +86,21 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
requests := resources.Merge(n.requests, podRequests)
requests := resources.Merge(n.requests, podData.Requests)

if !resources.Fits(requests, n.cachedAvailable) {
return fmt.Errorf("exceeds node resources")
}

nodeRequirements := scheduling.NewRequirements(n.requirements.Values()...)
podRequirements := scheduling.NewPodRequirements(pod)
// Check NodeClaim Affinity Requirements
if err = nodeRequirements.Compatible(podRequirements); err != nil {
if err = nodeRequirements.Compatible(podData.Requirements); err != nil {
return err
}
nodeRequirements.Add(podRequirements.Values()...)

strictPodRequirements := podRequirements
if scheduling.HasPreferredNodeAffinity(pod) {
// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
// preferred node affinity. Only required node affinities can actually reduce pod domains.
strictPodRequirements = scheduling.NewStrictPodRequirements(pod)
}
nodeRequirements.Add(podData.Requirements.Values()...)

// Check Topology Requirements
topologyRequirements, err := n.topology.AddRequirements(strictPodRequirements, nodeRequirements, pod)
topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeRequirements, pod)
if err != nil {
return err
}
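ExistingNode.Add above folds the pod's cached Requests into the node's running total and bails out early if the sum no longer fits the node's available capacity. Below is a minimal stand-in for that merge-then-fit check; the real code uses Karpenter's resources.Merge and resources.Fits helpers, and the two functions here are simplified sketches of that behavior, not the library's implementations.

package sketch

import corev1 "k8s.io/api/core/v1"

// merge sums quantities per resource name across the given lists, mimicking
// how the pod's cached requests are combined with what the node already holds.
func merge(lists ...corev1.ResourceList) corev1.ResourceList {
	out := corev1.ResourceList{}
	for _, list := range lists {
		for name, q := range list {
			cur := out[name]
			cur.Add(q)
			out[name] = cur
		}
	}
	return out
}

// fits reports whether every requested quantity is covered by the available
// capacity; a resource missing from available counts as zero.
func fits(requested, available corev1.ResourceList) bool {
	for name, q := range requested {
		avail := available[name]
		if q.Cmp(avail) > 0 {
			return false
		}
	}
	return true
}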
19 changes: 6 additions & 13 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem
}
}

func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
// Check Taints
if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
return err
@@ -76,22 +76,15 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
return fmt.Errorf("checking host port usage, %w", err)
}
nodeClaimRequirements := scheduling.NewRequirements(n.Requirements.Values()...)
podRequirements := scheduling.NewPodRequirements(pod)

// Check NodeClaim Affinity Requirements
if err := nodeClaimRequirements.Compatible(podRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
if err := nodeClaimRequirements.Compatible(podData.Requirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
return fmt.Errorf("incompatible requirements, %w", err)
}
nodeClaimRequirements.Add(podRequirements.Values()...)
nodeClaimRequirements.Add(podData.Requirements.Values()...)

strictPodRequirements := podRequirements
if scheduling.HasPreferredNodeAffinity(pod) {
// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
// preferred node affinity. Only required node affinities can actually reduce pod domains.
strictPodRequirements = scheduling.NewStrictPodRequirements(pod)
}
// Check Topology Requirements
topologyRequirements, err := n.topology.AddRequirements(strictPodRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
if err != nil {
return err
}
@@ -101,9 +94,9 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
nodeClaimRequirements.Add(topologyRequirements.Values()...)

// Check instance type combinations
requests := resources.Merge(n.Spec.Resources.Requests, podRequests)
requests := resources.Merge(n.Spec.Resources.Requests, podData.Requests)

remaining, err := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, podRequests, n.daemonResources, requests)
remaining, err := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, podData.Requests, n.daemonResources, requests)
if err != nil {
// We avoid wrapping this err because calling String() on InstanceTypeFilterError is an expensive operation
// due to calls to resources.Merge and stringifying the nodeClaimRequirements
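The comment removed from NodeClaim.Add (and ExistingNode.Add) explains why a separate strict requirement set exists at all: a preferred node affinity must not shrink the set of topology domains a pod could still land in, because only required node affinities genuinely restrict the pod. With this PR that strict/non-strict choice is made once in updateCachedPodData rather than on every Add call. The toy example below uses stand-in types, not the scheduling package, to show the difference: intersecting the zone domains with the required-only requirements keeps both zones, while folding the preferred term in would wrongly narrow them to one.

package sketch

// allowedValues reduces a requirement to label -> permitted values for the
// purpose of this illustration.
type allowedValues map[string][]string

// intersect keeps only the domains permitted by allowed, mimicking how pod
// requirements can narrow topology domains.
func intersect(domains, allowed []string) []string {
	permitted := map[string]struct{}{}
	for _, v := range allowed {
		permitted[v] = struct{}{}
	}
	var out []string
	for _, d := range domains {
		if _, ok := permitted[d]; ok {
			out = append(out, d)
		}
	}
	return out
}

// Example: the required affinity allows zone-a or zone-b, the preferred term
// only zone-a. Topology must be driven by the strict (required-only) set.
var (
	zones             = []string{"zone-a", "zone-b"}
	strictReqs        = allowedValues{"topology.kubernetes.io/zone": {"zone-a", "zone-b"}}
	preferredFoldedIn = allowedValues{"topology.kubernetes.io/zone": {"zone-a"}}

	domainsFromStrict    = intersect(zones, strictReqs["topology.kubernetes.io/zone"])        // zone-a, zone-b
	domainsFromPreferred = intersect(zones, preferredFoldedIn["topology.kubernetes.io/zone"]) // zone-a only
)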
10 changes: 5 additions & 5 deletions pkg/controllers/provisioning/scheduling/queue.go
@@ -34,8 +34,8 @@ type Queue struct {
}

// NewQueue constructs a new queue given the input pods, sorting them to optimize for bin-packing into nodes.
func NewQueue(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) *Queue {
sort.Slice(pods, byCPUAndMemoryDescending(pods, podRequests))
func NewQueue(pods []*v1.Pod, podData map[types.UID]*PodData) *Queue {
sort.Slice(pods, byCPUAndMemoryDescending(pods, podData))
return &Queue{
pods: pods,
lastLen: map[types.UID]int{},
@@ -73,13 +73,13 @@ func (q *Queue) List() []*v1.Pod {
return q.pods
}

func byCPUAndMemoryDescending(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) func(i int, j int) bool {
func byCPUAndMemoryDescending(pods []*v1.Pod, podData map[types.UID]*PodData) func(i int, j int) bool {
return func(i, j int) bool {
lhsPod := pods[i]
rhsPod := pods[j]

lhs := podRequests[lhsPod.UID]
rhs := podRequests[rhsPod.UID]
lhs := podData[lhsPod.UID].Requests
rhs := podData[rhsPod.UID].Requests

cpuCmp := resources.Cmp(lhs[v1.ResourceCPU], rhs[v1.ResourceCPU])
if cpuCmp < 0 {
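NewQueue above now sorts using the cached PodData.Requests instead of a separate per-pod requests map, keeping the bin-packing order (largest CPU request first, then largest memory) while reading from the same cache used everywhere else. A self-contained sketch of that ordering, assuming a plain map of cached requests keyed by pod UID rather than the real PodData type:

package sketch

import (
	"sort"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// sortForBinPacking orders pods by their cached CPU request, then memory,
// largest first, mirroring byCPUAndMemoryDescending over PodData.Requests.
func sortForBinPacking(pods []*corev1.Pod, cachedRequests map[types.UID]corev1.ResourceList) {
	sort.Slice(pods, func(i, j int) bool {
		lhs, rhs := cachedRequests[pods[i].UID], cachedRequests[pods[j].UID]

		lhsCPU, rhsCPU := lhs[corev1.ResourceCPU], rhs[corev1.ResourceCPU]
		if c := lhsCPU.Cmp(rhsCPU); c != 0 {
			return c > 0 // bigger CPU request schedules earlier
		}

		lhsMem, rhsMem := lhs[corev1.ResourceMemory], rhs[corev1.ResourceMemory]
		return lhsMem.Cmp(rhsMem) > 0 // tie-break on memory, still descending
	})
}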
37 changes: 30 additions & 7 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -77,7 +77,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
topology: topology,
cluster: cluster,
daemonOverhead: getDaemonOverhead(templates, daemonSetPods),
cachedPodRequests: map[types.UID]corev1.ResourceList{}, // cache pod requests to avoid having to continually recompute this total
cachedPodData: map[types.UID]*PodData{}, // cache pod data to avoid having to continually recompute it
recorder: recorder,
preferences: &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule},
remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) {
@@ -89,14 +89,20 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
return s
}

type PodData struct {
Requests corev1.ResourceList
Requirements scheduling.Requirements
StrictRequirements scheduling.Requirements
}

type Scheduler struct {
id types.UID // Unique UUID attached to this scheduling loop
newNodeClaims []*NodeClaim
existingNodes []*ExistingNode
nodeClaimTemplates []*NodeClaimTemplate
remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool
daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList
cachedPodRequests map[types.UID]corev1.ResourceList // (Pod Namespace/Name) -> calculated resource requests for the pod
cachedPodData map[types.UID]*PodData // (Pod Namespace/Name) -> pre-computed data for pods to avoid re-computation and memory usage
preferences *Preferences
topology *Topology
cluster *state.Cluster
@@ -217,9 +223,9 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
UnschedulablePodsCount.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
QueueDepth.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
for _, p := range pods {
s.cachedPodRequests[p.UID] = resources.RequestsForPods(p)
s.updateCachedPodData(p)
}
q := NewQueue(pods, s.cachedPodRequests)
q := NewQueue(pods, s.cachedPodData)

startTime := s.clock.Now()
lastLogTime := s.clock.Now()
@@ -251,6 +257,8 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
if err := s.topology.Update(ctx, pod); err != nil {
log.FromContext(ctx).Error(err, "failed updating topology")
}
// Update the cached podData since the pod was relaxed and it could have changed its requirement set
s.updateCachedPodData(pod)
}
}
UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)})
@@ -265,10 +273,25 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
}
}

func (s *Scheduler) updateCachedPodData(p *corev1.Pod) {
requirements := scheduling.NewPodRequirements(p)
strictRequirements := requirements
if scheduling.HasPreferredNodeAffinity(p) {
// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
// preferred node affinity. Only required node affinities can actually reduce pod domains.
strictRequirements = scheduling.NewStrictPodRequirements(p)
}
s.cachedPodData[p.UID] = &PodData{
Requests: resources.RequestsForPods(p),
Requirements: requirements,
StrictRequirements: strictRequirements,
}
}

func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID]); err == nil {
if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID]); err == nil {
return nil
}
}
@@ -278,7 +301,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {

// Pick existing node that we are about to create
for _, nodeClaim := range s.newNodeClaims {
if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil {
if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err == nil {
return nil
}
}
@@ -299,7 +322,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
}
}
nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err != nil {
if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err != nil {
nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim
errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w",
nodeClaimTemplate.NodePoolName,
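One subtle part of the caching shown in scheduler.go is invalidation: when a popped pod fails to schedule it gets relaxed (a preference such as a preferred affinity term is dropped), which can change the requirements that were cached for it, so the Solve loop calls updateCachedPodData again after relaxation. A rough sketch of that lifecycle, with the scheduler and its helpers reduced to function fields; these are assumed shapes for illustration, not the PR's code.

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// PodData stands in for the cached requests/requirements bundle.
type PodData struct{}

// scheduler models only what the cache lifecycle needs.
type scheduler struct {
	cachedPodData map[types.UID]*PodData
	compute       func(*corev1.Pod) *PodData // plays the role of updateCachedPodData
	relax         func(*corev1.Pod) bool     // drops a preference, reports whether the pod changed
	place         func(*corev1.Pod, *PodData) error
}

// solve fills the cache once up front, reads it on every placement attempt,
// and recomputes an entry only when relaxation has mutated the pod.
func (s *scheduler) solve(pods []*corev1.Pod) {
	for _, p := range pods {
		s.cachedPodData[p.UID] = s.compute(p)
	}
	for _, p := range pods { // the real loop pops from a retry queue instead
		if err := s.place(p, s.cachedPodData[p.UID]); err != nil {
			if s.relax(p) {
				s.cachedPodData[p.UID] = s.compute(p) // stale requirements refreshed
			}
		}
	}
}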