diff --git a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go
index fd9decdca2..1518db6ab2 100644
--- a/pkg/coscheduling/core/core.go
+++ b/pkg/coscheduling/core/core.go
@@ -28,8 +28,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	informerv1 "k8s.io/client-go/informers/core/v1"
 	listerv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -54,10 +56,11 @@ const (
 type Manager interface {
 	PreFilter(context.Context, *corev1.Pod) error
 	Permit(context.Context, *corev1.Pod) Status
+	Unreserve(context.Context, *corev1.Pod)
 	GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
+	GetAssignedPodCount(string, string) int
 	GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
 	DeletePermittedPodGroup(string)
-	CalculateAssignedPods(string, string) int
 	ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
 	BackoffPodGroup(string, time.Duration)
 }
@@ -77,9 +80,16 @@ type PodGroupManager struct {
 	backedOffPG *gochache.Cache
 	// podLister is pod lister
 	podLister listerv1.PodLister
	// podgroups stores the permitted pods of each pod group, keyed by the pod group's full name.
+	podgroups map[string]PodGroupInfo
 	sync.RWMutex
 }
 
+// PodGroupInfo holds the coscheduling bookkeeping of a single pod group.
+type PodGroupInfo struct {
+	assigned sets.Set[string]
+}
+
 // NewPodGroupManager creates a new operation object.
 func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
 	pgMgr := &PodGroupManager{
@@ -89,10 +99,40 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
 		podLister:   podInformer.Lister(),
 		permittedPG: gochache.New(3*time.Second, 3*time.Second),
 		backedOffPG: gochache.New(10*time.Second, 10*time.Second),
+		podgroups:   map[string]PodGroupInfo{},
 	}
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		DeleteFunc: func(obj interface{}) {
+			switch t := obj.(type) {
+			case *corev1.Pod:
+				pod := t
+				pgMgr.Unreserve(context.Background(), pod)
+				return
+			case cache.DeletedFinalStateUnknown:
+				pod, ok := t.Obj.(*corev1.Pod)
+				if !ok {
+					return
+				}
+				pgMgr.Unreserve(context.Background(), pod)
+				return
+			default:
+				return
+			}
+		},
+	})
 	return pgMgr
 }
 
+// GetAssignedPodCount returns the number of pods of the given pod group that have been permitted so far.
+func (pgMgr *PodGroupManager) GetAssignedPodCount(namespace, name string) int {
+	pgMgr.RLock()
+	defer pgMgr.RUnlock()
+	if pgInfo, exist := pgMgr.podgroups[fmt.Sprintf("%v/%v", namespace, name)]; exist {
+		return len(pgInfo.assigned)
+	}
+	return 0
+}
+
 func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
 	if backoff == time.Duration(0) {
 		return
@@ -203,15 +243,39 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Statu
 		return PodGroupNotFound
 	}
 
-	assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
-	// The number of pods that have been assigned nodes is calculated from the snapshot.
-	// The current pod in not included in the snapshot during the current scheduling cycle.
+	pgMgr.Lock()
+	defer pgMgr.Unlock()
+	pgInfo, exist := pgMgr.podgroups[pgFullName]
+	if !exist {
+		pgInfo = PodGroupInfo{assigned: sets.Set[string]{}}
+		pgMgr.podgroups[pgFullName] = pgInfo
+	}
+	pgInfo.assigned.Insert(pod.Name)
+	// The pod group becomes schedulable once the number of permitted pods, including the current one, reaches MinMember.
-	if int32(assigned)+1 >= pg.Spec.MinMember {
+	if len(pgInfo.assigned) >= int(pg.Spec.MinMember) {
 		return Success
 	}
 	return Wait
 }
 
+// Unreserve removes the pod from its pod group's bookkeeping so that it no longer counts towards MinMember.
+func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, pod *corev1.Pod) {
+	pgFullName, _ := pgMgr.GetPodGroup(ctx, pod)
+	if pgFullName == "" {
+		return
+	}
+
+	pgMgr.Lock()
+	defer pgMgr.Unlock()
+	pgInfo, exist := pgMgr.podgroups[pgFullName]
+	if exist {
+		pgInfo.assigned.Delete(pod.Name)
+		if len(pgInfo.assigned) == 0 {
+			delete(pgMgr.podgroups, pgFullName)
+		}
+	}
+}
+
 // GetCreationTimestamp returns the creation time of a podGroup or a pod.
 func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
 	pgName := util.GetPodGroupLabel(pod)
@@ -243,26 +307,6 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
 	return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
 }
 
-// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
-func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
-	nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
-	if err != nil {
-		klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle")
-		return 0
-	}
-	var count int
-	for _, nodeInfo := range nodeInfos {
-		for _, podInfo := range nodeInfo.Pods {
-			pod := podInfo.Pod
-			if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" {
-				count++
-			}
-		}
-	}
-
-	return count
-}
-
 // CheckClusterResource checks if resource capacity of the cluster can satisfy <resourceRequest>.
 // It returns an error detailing the resource gap if not satisfied; otherwise returns nil.
 func CheckClusterResource(nodeList []*framework.NodeInfo, resourceRequest corev1.ResourceList, desiredPodGroupName string) error {
diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go
index 8c61c60d57..c60ba19dd2 100644
--- a/pkg/coscheduling/coscheduling.go
+++ b/pkg/coscheduling/coscheduling.go
@@ -159,7 +159,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt
 
 	// This indicates there are already enough Pods satisfying the PodGroup,
 	// so don't bother to reject the whole PodGroup.
-	assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace)
+	assigned := cs.pgMgr.GetAssignedPodCount(pod.Namespace, pg.Name)
 	if assigned >= int(pg.Spec.MinMember) {
 		klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
 		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
@@ -247,6 +247,7 @@ func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleSta
 	if pg == nil {
 		return
 	}
+	cs.pgMgr.Unreserve(ctx, pod)
 	cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
 		if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name {
 			klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg))
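For review context, here is a minimal, self-contained sketch (not part of the patch) of the bookkeeping pattern the change adopts: each pod group keeps a set of permitted pod names, Permit inserts the current pod and compares the set size against MinMember, and Unreserve deletes the pod and drops empty entries. The `tracker`, `permit`, and `unreserve` names are illustrative only, and the example assumes `k8s.io/apimachinery/pkg/util/sets` is available.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// podGroupInfo mirrors the patch's per-group state: the names of permitted pods.
type podGroupInfo struct {
	assigned sets.Set[string]
}

// tracker keys its state by the pod group's full name ("namespace/name"),
// matching the key format produced by GetPodGroup in the patch.
type tracker struct {
	podgroups map[string]podGroupInfo
}

// permit records the pod and reports whether MinMember is now reached.
func (t *tracker) permit(pgFullName, podName string, minMember int) bool {
	info, ok := t.podgroups[pgFullName]
	if !ok {
		info = podGroupInfo{assigned: sets.New[string]()}
		t.podgroups[pgFullName] = info // store the new entry back into the map
	}
	info.assigned.Insert(podName)
	return info.assigned.Len() >= minMember
}

// unreserve forgets the pod and removes the pod group entry once it is empty.
func (t *tracker) unreserve(pgFullName, podName string) {
	if info, ok := t.podgroups[pgFullName]; ok {
		info.assigned.Delete(podName)
		if info.assigned.Len() == 0 {
			delete(t.podgroups, pgFullName)
		}
	}
}

func main() {
	tr := &tracker{podgroups: map[string]podGroupInfo{}}
	fmt.Println(tr.permit("ns1/pg1", "pod-a", 2)) // false: 1 of 2 permitted
	fmt.Println(tr.permit("ns1/pg1", "pod-b", 2)) // true: MinMember reached
	tr.unreserve("ns1/pg1", "pod-a")
	fmt.Println(tr.podgroups["ns1/pg1"].assigned.Len()) // 1
}
```

Because the map is keyed by the full "namespace/name" string, callers such as PostFilter must supply both the namespace and the PodGroup name, which is why GetAssignedPodCount takes two arguments above.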