From 3ad4e5745de8715b5f39d1c26e6075510302ef7e Mon Sep 17 00:00:00 2001
From: Jonathan Innis
Date: Thu, 30 Jan 2025 23:12:34 -0800
Subject: [PATCH] Cache requirements for pods alongside requests

---
 .../provisioning/scheduling/existingnode.go | 18 +++------
 .../provisioning/scheduling/nodeclaim.go    | 19 +++-------
 .../provisioning/scheduling/queue.go        | 10 ++---
 .../provisioning/scheduling/scheduler.go    | 37 +++++++++++++++----
 4 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/pkg/controllers/provisioning/scheduling/existingnode.go b/pkg/controllers/provisioning/scheduling/existingnode.go
index 2deba7ca27..9556be54b1 100644
--- a/pkg/controllers/provisioning/scheduling/existingnode.go
+++ b/pkg/controllers/provisioning/scheduling/existingnode.go
@@ -65,7 +65,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,
 	return node
 }
 
-func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podRequests v1.ResourceList) error {
+func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
 	// Check Taints
 	if err := scheduling.Taints(n.cachedTaints).Tolerates(pod); err != nil {
 		return err
@@ -86,29 +86,21 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
 	// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
 	// node, which at this point can't be increased in size
-	requests := resources.Merge(n.requests, podRequests)
+	requests := resources.Merge(n.requests, podData.Requests)
 
 	if !resources.Fits(requests, n.cachedAvailable) {
 		return fmt.Errorf("exceeds node resources")
 	}
 
 	nodeRequirements := scheduling.NewRequirements(n.requirements.Values()...)
-	podRequirements := scheduling.NewPodRequirements(pod)
 
 	// Check NodeClaim Affinity Requirements
-	if err = nodeRequirements.Compatible(podRequirements); err != nil {
+	if err = nodeRequirements.Compatible(podData.Requirements); err != nil {
 		return err
 	}
-	nodeRequirements.Add(podRequirements.Values()...)
-
-	strictPodRequirements := podRequirements
-	if scheduling.HasPreferredNodeAffinity(pod) {
-		// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
-		// preferred node affinity. Only required node affinities can actually reduce pod domains.
-		strictPodRequirements = scheduling.NewStrictPodRequirements(pod)
-	}
+	nodeRequirements.Add(podData.Requirements.Values()...)
 
 	// Check Topology Requirements
-	topologyRequirements, err := n.topology.AddRequirements(strictPodRequirements, nodeRequirements, pod)
+	topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeRequirements, pod)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/controllers/provisioning/scheduling/nodeclaim.go b/pkg/controllers/provisioning/scheduling/nodeclaim.go
index fdd8a190f8..0e44bfae49 100644
--- a/pkg/controllers/provisioning/scheduling/nodeclaim.go
+++ b/pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem
 	}
 }
 
-func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
+func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
 	// Check Taints
 	if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
 		return err
@@ -76,22 +76,15 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
 		return fmt.Errorf("checking host port usage, %w", err)
 	}
 	nodeClaimRequirements := scheduling.NewRequirements(n.Requirements.Values()...)
-	podRequirements := scheduling.NewPodRequirements(pod)
 
 	// Check NodeClaim Affinity Requirements
-	if err := nodeClaimRequirements.Compatible(podRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
+	if err := nodeClaimRequirements.Compatible(podData.Requirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
 		return fmt.Errorf("incompatible requirements, %w", err)
 	}
-	nodeClaimRequirements.Add(podRequirements.Values()...)
 
-	strictPodRequirements := podRequirements
-	if scheduling.HasPreferredNodeAffinity(pod) {
-		// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
-		// preferred node affinity. Only required node affinities can actually reduce pod domains.
-		strictPodRequirements = scheduling.NewStrictPodRequirements(pod)
-	}
+	nodeClaimRequirements.Add(podData.Requirements.Values()...)
 	// Check Topology Requirements
-	topologyRequirements, err := n.topology.AddRequirements(strictPodRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
+	topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
 	if err != nil {
 		return err
 	}
@@ -101,9 +94,9 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
 	nodeClaimRequirements.Add(topologyRequirements.Values()...)
 
 	// Check instance type combinations
-	requests := resources.Merge(n.Spec.Resources.Requests, podRequests)
+	requests := resources.Merge(n.Spec.Resources.Requests, podData.Requests)
 
-	remaining, err := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, podRequests, n.daemonResources, requests)
+	remaining, err := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, podData.Requests, n.daemonResources, requests)
 	if err != nil {
 		// We avoid wrapping this err because calling String() on InstanceTypeFilterError is an expensive operation
 		// due to calls to resources.Merge and stringifying the nodeClaimRequirements
diff --git a/pkg/controllers/provisioning/scheduling/queue.go b/pkg/controllers/provisioning/scheduling/queue.go
index c5b3001bec..a36db85923 100644
--- a/pkg/controllers/provisioning/scheduling/queue.go
+++ b/pkg/controllers/provisioning/scheduling/queue.go
@@ -34,8 +34,8 @@ type Queue struct {
 }
 
 // NewQueue constructs a new queue given the input pods, sorting them to optimize for bin-packing into nodes.
-func NewQueue(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) *Queue {
-	sort.Slice(pods, byCPUAndMemoryDescending(pods, podRequests))
+func NewQueue(pods []*v1.Pod, podData map[types.UID]*PodData) *Queue {
+	sort.Slice(pods, byCPUAndMemoryDescending(pods, podData))
 	return &Queue{
 		pods:    pods,
 		lastLen: map[types.UID]int{},
@@ -73,13 +73,13 @@ func (q *Queue) List() []*v1.Pod {
 	return q.pods
 }
 
-func byCPUAndMemoryDescending(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) func(i int, j int) bool {
+func byCPUAndMemoryDescending(pods []*v1.Pod, podData map[types.UID]*PodData) func(i int, j int) bool {
 	return func(i, j int) bool {
 		lhsPod := pods[i]
 		rhsPod := pods[j]
 
-		lhs := podRequests[lhsPod.UID]
-		rhs := podRequests[rhsPod.UID]
+		lhs := podData[lhsPod.UID].Requests
+		rhs := podData[rhsPod.UID].Requests
 
 		cpuCmp := resources.Cmp(lhs[v1.ResourceCPU], rhs[v1.ResourceCPU])
 		if cpuCmp < 0 {
diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go
index 181b7b0694..457ac4fa0e 100644
--- a/pkg/controllers/provisioning/scheduling/scheduler.go
+++ b/pkg/controllers/provisioning/scheduling/scheduler.go
@@ -77,7 +77,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
 		topology:          topology,
 		cluster:           cluster,
 		daemonOverhead:    getDaemonOverhead(templates, daemonSetPods),
-		cachedPodRequests: map[types.UID]corev1.ResourceList{}, // cache pod requests to avoid having to continually recompute this total
+		cachedPodData:     map[types.UID]*PodData{}, // cache pod data to avoid having to continually recompute it
 		recorder:          recorder,
 		preferences:       &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule},
 		remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) {
@@ -89,6 +89,12 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
 	return s
 }
 
+type PodData struct {
+	Requests           corev1.ResourceList
+	Requirements       scheduling.Requirements
+	StrictRequirements scheduling.Requirements
+}
+
 type Scheduler struct {
 	id                 types.UID // Unique UUID attached to this scheduling loop
 	newNodeClaims      []*NodeClaim
 	existingNodes      []*ExistingNode
 	nodeClaimTemplates []*NodeClaimTemplate
 	remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool
 	daemonOverhead     map[*NodeClaimTemplate]corev1.ResourceList
-	cachedPodRequests  map[types.UID]corev1.ResourceList // (Pod Namespace/Name) -> calculated resource requests for the pod
+	cachedPodData      map[types.UID]*PodData // (Pod Namespace/Name) -> pre-computed data for pods to avoid re-computation and memory usage
 	preferences        *Preferences
 	topology           *Topology
 	cluster            *state.Cluster
@@ -217,9 +223,9 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
 	UnschedulablePodsCount.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
 	QueueDepth.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
 	for _, p := range pods {
-		s.cachedPodRequests[p.UID] = resources.RequestsForPods(p)
+		s.updateCachedPodData(p)
 	}
-	q := NewQueue(pods, s.cachedPodRequests)
+	q := NewQueue(pods, s.cachedPodData)
 
 	startTime := s.clock.Now()
 	lastLogTime := s.clock.Now()
@@ -251,6 +257,8 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
 			if err := s.topology.Update(ctx, pod); err != nil {
 				log.FromContext(ctx).Error(err, "failed updating topology")
 			}
+			// Update the cached podData since the pod was relaxed and it could have changed its requirement set
+			s.updateCachedPodData(pod)
 		}
 	}
 	UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)})
@@ -265,10 +273,25 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
 	}
 }
 
+func (s *Scheduler) updateCachedPodData(p *corev1.Pod) {
+	requirements := scheduling.NewPodRequirements(p)
+	strictRequirements := requirements
+	if scheduling.HasPreferredNodeAffinity(p) {
+		// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
+		// preferred node affinity. Only required node affinities can actually reduce pod domains.
+		strictRequirements = scheduling.NewStrictPodRequirements(p)
+	}
+	s.cachedPodData[p.UID] = &PodData{
+		Requests:           resources.RequestsForPods(p),
+		Requirements:       requirements,
+		StrictRequirements: strictRequirements,
+	}
+}
+
 func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
 	// first try to schedule against an in-flight real node
 	for _, node := range s.existingNodes {
-		if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID]); err == nil {
+		if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID]); err == nil {
 			return nil
 		}
 	}
@@ -278,7 +301,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
 
 	// Pick existing node that we are about to create
 	for _, nodeClaim := range s.newNodeClaims {
-		if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil {
+		if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err == nil {
 			return nil
 		}
 	}
@@ -299,7 +322,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
 		}
 	}
 	nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
-	if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err != nil {
+	if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err != nil {
 		nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim
 		errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w", nodeClaimTemplate.NodePoolName,
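
The caching pattern this patch introduces can be illustrated outside the diff with a minimal Go sketch: compute each pod's requests, requirements, and strict (required-only) requirements once, key them by UID, and reuse the cached copy for every existing-node and NodeClaim feasibility check, recomputing only after a pod's scheduling terms are relaxed. The types and helpers below (podData, pod, computePodData) are simplified stand-ins for illustration, not Karpenter's actual APIs.

package main

import "fmt"

// podData is a simplified stand-in for the PodData struct added in this patch.
type podData struct {
	Requests           map[string]int64 // e.g. "cpu" -> millicores
	Requirements       []string         // stand-in for scheduling.Requirements (required + preferred)
	StrictRequirements []string         // required-only terms; preferred affinities must not narrow topology domains
}

// pod is a simplified stand-in for *corev1.Pod.
type pod struct {
	UID       string
	Requests  map[string]int64
	Required  []string
	Preferred []string
}

// computePodData plays the role of updateCachedPodData: derive everything once per pod.
func computePodData(p pod) *podData {
	strict := append([]string{}, p.Required...)
	all := append(append([]string{}, p.Required...), p.Preferred...)
	return &podData{Requests: p.Requests, Requirements: all, StrictRequirements: strict}
}

func main() {
	// UID -> cached data, playing the role of Scheduler.cachedPodData.
	cache := map[string]*podData{}

	pods := []pod{
		{UID: "a", Requests: map[string]int64{"cpu": 500}, Required: []string{"zone in [us-west-2a]"}},
		{UID: "b", Requests: map[string]int64{"cpu": 250}, Preferred: []string{"arch in [arm64]"}},
	}
	// Fill the cache once up front, as Solve does before building the queue.
	for _, p := range pods {
		cache[p.UID] = computePodData(p)
	}

	// Every subsequent "does this pod fit?" check reads the cached data instead of
	// recomputing requests and requirements from the pod spec.
	for _, p := range pods {
		d := cache[p.UID]
		fmt.Printf("pod %s: cpu=%dm requirements=%v strict=%v\n",
			p.UID, d.Requests["cpu"], d.Requirements, d.StrictRequirements)
	}
}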