- pkg/scheduler/algorithmprovider/defaults/defaults.go
//注册调度策略元数据工厂方法、注册当前支持的所有预选、优选算法到全局变量fitPredicateMap、priorityFunctionMap中(见pkg/scheduler/factory/plugins.go),注册默认预选、优选策略集Provider到全局变量algorithmProviderMap中(见pkg/scheduler/factory/plugins.go)
func init() {
//注册预选、优选计算时所需元数据的获取工厂方法
// Register functions that extract metadata used by predicates and priorities computations.
factory.RegisterPredicateMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.PredicateMetadataProducer {
return predicates.NewPredicateMetadataFactory(args.PodLister)
})
factory.RegisterPriorityMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.PriorityMetadataProducer {
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
})
//注册默认的预选、优选算法集Provider(DefaultProvider、ClusterAutoscalerProvider)
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
//注册一些默认未启用的预选优选算法,但是可在用户自定义预选、优选调度算法集Provider配置中挑选应用
// Registers predicates and priorities that are not enabled by default, but user can pick when creating their
// own set of priorities/predicates.
// PodFitsPorts has been replaced by PodFitsHostPorts for better user understanding.
// For backwards compatibility with 1.0, PodFitsPorts is registered as well.
factory.RegisterFitPredicate("PodFitsPorts", predicates.PodFitsHostPorts)
// Fit is defined based on the absence of port conflicts.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
// Fit is determined by the presence of the Host parameter and a string match
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
// Fit is determined by node selector query.
factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
// the number of pods (belonging to the same service) on the same node.
// Register the factory so that it's available, but do not include it as part of the default priorities
// Largely replaced by "SelectorSpreadPriority", but registered for backward compatibility with 1.0
factory.RegisterPriorityConfigFactory(
"ServiceSpreadingPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
},
Weight: 1,
},
)
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
// Register the priority function so that its available
// but do not include it as part of the default priorities
factory.RegisterPriorityFunction2("EqualPriority", core.EqualPriorityMap, nil, 1)
// ImageLocalityPriority prioritizes nodes based on locality of images requested by a pod. Nodes with larger size
// of already-installed packages required by the pod will be preferred over nodes with no already-installed
// packages required by the pod or a small total size of already-installed packages required by the pod.
factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1)
// Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
factory.RegisterPriorityFunction2(
"RequestedToCapacityRatioPriority",
priorities.RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap,
nil,
1)
}
//默认Provider的预选算法集
func defaultPredicates() sets.String {
return sets.NewString(
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
predicates.NoVolumeZoneConflictPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
),
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxEBSVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxGCEPDVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxAzureDiskVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinityPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},
),
// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),
// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
factory.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates),
// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodeMemoryPressurePred, predicates.CheckNodeMemoryPressurePredicate),
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodeDiskPressurePred, predicates.CheckNodeDiskPressurePredicate),
// Fit is determined by node pid pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodePIDPressurePred, predicates.CheckNodePIDPressurePredicate),
// Fit is determined by node conditions: not ready, network unavailable or out of disk.
factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate),
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints),
// Fit is determined by volume topology requirements.
factory.RegisterFitPredicateFactory(
predicates.CheckVolumeBindingPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
},
),
)
}
//默认Provider的优选算法集
func defaultPriorities() sets.String {
return sets.NewString(
// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
factory.RegisterPriorityConfigFactory(
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,
},
),
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
factory.RegisterPriorityConfigFactory(
"InterPodAffinityPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},
),
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
// Prioritizes nodes to help achieve balanced resource usage
factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
// Set this weight large enough to override all other priority functions.
// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
// Prioritizes nodes that have labels matching NodeAffinity
factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
// Prioritizes nodes that marked with taint which pod can tolerate.
factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
)
}
- pkg/scheduler/algorithm/predicates/predicates.go GeneralPredicates的预选主要包含两步:
- 对non-critical类型的pod需要满足的预选检查
- 所有pod都需要满足的基本预选检查
// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
// that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need
func GeneralPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := noncriticalPredicates(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
fit, reasons, err = EssentialPredicates(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
noncriticalPredicates预选检查主要检查当前候选节点是否有足够的各种资源来run这个pod,如当前节点pod数是否已经达到允许的上限,当前节点剩余可请求的CPU资源能否满足pod请求的CPU配额,同理还有内存、临时存储、ScalarRsource。
// noncriticalPredicates are the predicates that only non-critical pods need
func noncriticalPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
var predicateFails []algorithm.PredicateFailureReason
allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
}
// No extended resources should be ignored by default.
ignoredExtendedResources := sets.NewString()
var podRequest *schedulercache.Resource
if predicateMeta, ok := meta.(*predicateMetadata); ok {
podRequest = predicateMeta.podRequest
if predicateMeta.ignoredExtendedResources != nil {
ignoredExtendedResources = predicateMeta.ignoredExtendedResources
}
} else {
// We couldn't parse metadata - fallback to computing it.
podRequest = GetResourceRequest(pod)
}
if podRequest.MilliCPU == 0 &&
podRequest.Memory == 0 &&
podRequest.EphemeralStorage == 0 &&
len(podRequest.ScalarResources) == 0 {
return len(predicateFails) == 0, predicateFails, nil
}
allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
}
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
}
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
}
for rName, rQuant := range podRequest.ScalarResources {
if v1helper.IsExtendedResourceName(rName) {
// If this resource is one of the extended resources that should be
// ignored, we will skip checking it.
if ignoredExtendedResources.Has(string(rName)) {
continue
}
}
if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
}
}
return len(predicateFails) == 0, predicateFails, nil
}
EssentialPredicates预选检查主要包含三个步骤:
- 检查当前节点名是否匹配pod指定的node name
- 检查pod要求的hostPort是否在当前Node上被占用
- 检查节点的标签label是否满足pod的节点亲和性node selector
// EssentialPredicates are the predicates that all pods, including critical pods, need
func EssentialPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := PodFitsHost(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
// TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
// preempt pods to free up host ports too
fit, reasons, err = PodFitsHostPorts(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
fit, reasons, err = PodMatchNodeSelector(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
//检查当前节点名是否匹配pod指定的node name
// PodFitsHost checks if a pod spec node name matches the current node.
func PodFitsHost(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if len(pod.Spec.NodeName) == 0 {
return true, nil, nil
}
node := nodeInfo.Node()
if pod.Spec.NodeName == node.Name {
return true, nil, nil
}
return false, []algorithm.PredicateFailureReason{ErrPodNotMatchHostName}, nil
}
//检查pod要求的hostPort是否在当前Node上被占用
// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var wantPorts []*v1.ContainerPort
if predicateMeta, ok := meta.(*predicateMetadata); ok {
wantPorts = predicateMeta.podPorts
} else {
// We couldn't parse metadata - fallback to computing it.
wantPorts = schedutil.GetContainerPorts(pod)
}
if len(wantPorts) == 0 {
return true, nil, nil
}
existingPorts := nodeInfo.UsedPorts()
// try to see whether existingPorts and wantPorts will conflict or not
if portsConflict(existingPorts, wantPorts) {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
return true, nil, nil
}
//检查节点的标签label是否满足pod的节点亲和性node selector
// PodMatchNodeSelector checks if a pod node selector matches the node label.
func PodMatchNodeSelector(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if podMatchesNodeSelectorAndAffinityTerms(pod, node) {
return true, nil, nil
}
return false, []algorithm.PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
}
// podMatchesNodeSelectorAndAffinityTerms checks whether the pod is schedulable onto nodes according to
// the requirements in both NodeAffinity and nodeSelector.
func podMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
// Check if node.Labels match pod.Spec.NodeSelector.
if len(pod.Spec.NodeSelector) > 0 {
selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(node.Labels)) {
return false
}
}
// 1. nil NodeSelector matches all nodes (i.e. does not filter out any nodes)
// 2. nil []NodeSelectorTerm (equivalent to non-nil empty NodeSelector) matches no nodes
// 3. zero-length non-nil []NodeSelectorTerm matches no nodes also, just for simplicity
// 4. nil []NodeSelectorRequirement (equivalent to non-nil empty NodeSelectorTerm) matches no nodes
// 5. zero-length non-nil []NodeSelectorRequirement matches no nodes also, just for simplicity
// 6. non-nil empty NodeSelectorRequirement is not allowed
nodeAffinityMatches := true
affinity := pod.Spec.Affinity
if affinity != nil && affinity.NodeAffinity != nil {
nodeAffinity := affinity.NodeAffinity
// if no required NodeAffinity requirements, will do no-op, means select all nodes.
// TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution.
if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
// if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution == nil && nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
return true
}
// Match node selector for requiredDuringSchedulingRequiredDuringExecution.
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
// if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil {
// nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution.NodeSelectorTerms
// glog.V(10).Infof("Match for RequiredDuringSchedulingRequiredDuringExecution node selector terms %+v", nodeSelectorTerms)
// nodeAffinityMatches = nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
// }
// Match node selector for requiredDuringSchedulingIgnoredDuringExecution.
if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", nodeSelectorTerms)
nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
}
}
return nodeAffinityMatches
}
优选策略实现的定义如下: 这里可以看到策略中定义了一个map方法,一个reduce方法,一个计算权重。一般来说map方法先对每个节点根据某些调度条件分别积分,然后reduce方法对所有节点的积分情况应用某个算法统筹计算一个最终得分,该得分范围为0~10,最终优先级还会根据计算权重进行加权。
// PriorityFunctionFactory2 produces map & reduce priority functions
// from a given args.
type PriorityFunctionFactory2 func(PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction)
// PriorityConfigFactory produces a PriorityConfig from the given function and weight
type PriorityConfigFactory struct {
Function PriorityFunctionFactory //老版本实现、即将废弃
MapReduceFunction PriorityFunctionFactory2
Weight int
}
SelectorSpreadPriority优选策略是根据对属于同一services、RCs、RSs、StatefulSet的pod在节点部署的分散程度来打分选取最佳部署节点的策略(默认各pod实例在集群各节点部署越分散越好),其定义和实现如下: 可以看到通过构建一个SelectorSpread类型对象来实现对spread优先级的计算
factory.RegisterPriorityConfigFactory(
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,
},
),
// NewSelectorSpreadPriority creates a SelectorSpread.
func NewSelectorSpreadPriority(
serviceLister algorithm.ServiceLister,
controllerLister algorithm.ControllerLister,
replicaSetLister algorithm.ReplicaSetLister,
statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
selectorSpread := &SelectorSpread{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
}
接下来重点关注SelectorSpread的两个成员方法CalculateSpreadPriorityMap和CalculateSpreadPriorityReduce其对Spread优先级的计算方法实现,首先由map方法对每个节点上该Pod的Spread情况分别积分,然后由reduce根据所有节点对该Pod的Spread积分情况,应用某个算法,统筹打分,最终每个节点的得分为0~10分。
// CalculateSpreadPriorityMap spreads pods across hosts, considering pods
// belonging to the same service,RC,RS or StatefulSet.
// When a pod is scheduled, it looks for services, RCs,RSs and StatefulSets that match the pod,
// then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC,RSs or StatefulSets selectors as the pod being scheduled.
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
var selectors []labels.Selector
node := nodeInfo.Node()
//找到pod所属的所有Services\RCs\RSs\StatefulSets的slectors
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
selectors = priorityMeta.podSelectors
} else {
selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}
if len(selectors) == 0 {
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(0),
}, nil
}
//轮询当前节点下所有pod,如果匹配任何一个上面的selectors则计数+1
count := int(0)
for _, nodePod := range nodeInfo.Pods() {
if pod.Namespace != nodePod.Namespace {
continue
}
for _, selector := range selectors {
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
count++
break
}
}
}
//返回该节点和节点上pod匹配计算得分的映射对信息
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(count),
}, nil
}
// CalculateSpreadPriorityReduce calculates the source of each node
// based on the number of existing matching pods on the node
// where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
countsByZone := make(map[string]int, 10)
maxCountByZone := int(0)
maxCountByNodeName := int(0)
//计算所有节点中最大得分,如果节点有分区设置的话,那么计算所有分区内节点的得分统计结果
for i := range result {
if result[i].Score > maxCountByNodeName {
maxCountByNodeName = result[i].Score
}
zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
if zoneID == "" {
continue
}
countsByZone[zoneID] += result[i].Score
}
//计算所有分区中的最大得分
for zoneID := range countsByZone {
if countsByZone[zoneID] > maxCountByZone {
maxCountByZone = countsByZone[zoneID]
}
}
haveZones := len(countsByZone) != 0
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxPriorityFloat64 := float64(schedulerapi.MaxPriority)
for i := range result {
// initializing to the default/max node score of maxPriority
fScore := MaxPriorityFloat64 //初始满分为10
if maxCountByNodeName > 0 {
//根据节点最大得分,按照满分为10,重新计算最终优先级评分,这里可以看到前面节点得分越高,说明spread越差,那么此时最终优先级评分越低为0,前面节点得分为0,说明spread越好,此时最终优先级评为为满分10
fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
}
// If there is zone information present, incorporate it
if haveZones {
zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
if zoneID != "" {
zoneScore := MaxPriorityFloat64
if maxCountByZone > 0 {
zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
}
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
result[i].Score = int(fScore)
}
return nil
}