This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

Fix Node Selector Resource Plugin #10

Open: wants to merge 3 commits into base: main
Changes from all commits
go.mod: 2 changes (1 addition, 1 deletion)
@@ -11,7 +11,6 @@ require (
k8s.io/apimachinery v0.29.1
k8s.io/client-go v0.29.1
k8s.io/klog/v2 v2.110.1
k8s.io/kubectl v0.29.1
sigs.k8s.io/controller-runtime v0.17.0
)

@@ -67,6 +66,7 @@ require (
k8s.io/apiextensions-apiserver v0.29.0 // indirect
k8s.io/component-base v0.29.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/kubectl v0.29.1 // indirect
k8s.io/metrics v0.29.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
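Note on the go.mod change: k8s.io/kubectl moves from the direct to the indirect require block, consistent with removing the resourcehelper import (k8s.io/kubectl/pkg/util/resource) and the extractPodResources helper in grpc_server.go below. For reference, a minimal sketch of the kind of call being dropped; the package, function name, and pod value here are hypothetical and used only for illustration:

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	resourcehelper "k8s.io/kubectl/pkg/util/resource"
)

// podRequests mirrors what the removed extractPodResources did: aggregate a pod's
// resource requests and limits via the kubectl helper, keeping only the requests.
func podRequests(pod *corev1.Pod) corev1.ResourceList {
	requests, _ := resourcehelper.PodRequestsAndLimits(pod)
	return requests
}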
pkg/node-labels-resources/grpc_server.go: 87 changes (1 addition, 86 deletions)
@@ -13,7 +13,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -22,7 +21,6 @@ import (
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
resourcehelper "k8s.io/kubectl/pkg/util/resource"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/liqotech/liqo/pkg/consts"
@@ -34,7 +32,6 @@ import (
type NodeDetails struct {
Schedulable bool
Allocatable corev1.ResourceList
Pods map[string]corev1.ResourceList
}

type nodeLabelsMonitor struct {
@@ -43,7 +40,6 @@ type nodeLabelsMonitor struct {
subscribers sync.Map
nodeLabels map[string]string
k8sNodeClient v1.NodeInterface
k8sPodClient v1.PodInterface
allocatable corev1.ResourceList
nodeMutex sync.RWMutex
ctx context.Context
@@ -74,28 +70,16 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
options.LabelSelector = labelSelector.String()
}

// this function is used to filter and ignore shadow pods at informer level.
var noShadowPodsFilter = func(options *metav1.ListOptions) {
req, err := labels.NewRequirement(consts.LocalPodLabelKey, selection.NotEquals, []string{consts.LocalPodLabelValue})
utilruntime.Must(err)
options.LabelSelector = labels.NewSelector().Add(*req).String()
options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String()
}

nodeFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, resyncPeriod, informers.WithTweakListOptions(noVirtualNodesFilter),
)
nodeInformer := nodeFactory.Core().V1().Nodes().Informer()
podFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter),
)
podInformer := podFactory.Core().V1().Pods().Informer()

s := nodeLabelsMonitor{
Server: grpc.NewServer(),
nodeLabels: nodeLabels,
allocatable: corev1.ResourceList{},
k8sNodeClient: clientset.CoreV1().Nodes(),
k8sPodClient: clientset.CoreV1().Pods(corev1.NamespaceAll),
ctx: ctx,
resourceLists: map[string]NodeDetails{},
}
@@ -105,16 +89,8 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
DeleteFunc: s.onNodeDelete,
})
utilruntime.Must(err)
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onPodAdd,
// We do not care about update events, since resources are immutable.
DeleteFunc: s.onPodDelete,
})
utilruntime.Must(err)
nodeFactory.Start(ctx.Done())
nodeFactory.WaitForCacheSync(ctx.Done())
podFactory.Start(ctx.Done())
podFactory.WaitForCacheSync(ctx.Done())

resourcemonitors.RegisterResourceReaderServer(s.Server, &s)
if err := s.Server.Serve(lis); err != nil {
@@ -131,17 +107,8 @@ func (nlm *nodeLabelsMonitor) onNodeAdd(obj interface{}) {
klog.V(4).Infof("Adding Node %s", node.Name)
nlm.resourceLists[node.Name] = NodeDetails{
Allocatable: *toAdd,
Pods: make(map[string]corev1.ResourceList),
Schedulable: utils.IsNodeReady(node) && !node.Spec.Unschedulable,
}
pods, err := nlm.k8sPodClient.List(nlm.ctx, metav1.ListOptions{FieldSelector: "spec.nodeName=" + node.Name})
if err != nil {
klog.Errorf("Failed to list pods for node %s: %v", node.Name, err)
return
}
for i := range pods.Items {
nlm.onPodAdd(&pods.Items[i])
}
nlm.writeClusterResources()
}

@@ -156,7 +123,6 @@ func (nlm *nodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
if !ok {
nlm.resourceLists[newNode.Name] = NodeDetails{
Allocatable: newNodeResources,
Pods: make(map[string]corev1.ResourceList),
Schedulable: true,
}
} else {
@@ -189,60 +155,14 @@ func (nlm *nodeLabelsMonitor) onNodeDelete(obj interface{}) {
nlm.writeClusterResources()
}

func (nlm *nodeLabelsMonitor) onPodAdd(obj interface{}) {
// Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running.
podAdded, ok := obj.(*corev1.Pod)
if !ok {
klog.Error("OnPodAdd: Failed to cast to *corev1.Pod type")
return
}
podResources := extractPodResources(podAdded)
podNodeName := podAdded.Spec.NodeName
nodeDetail, ok := nlm.resourceLists[podNodeName]
if ok {
_, podOk := nodeDetail.Pods[podAdded.Name]
if !podOk {
nodeDetail.Pods[podAdded.Name] = podResources
nlm.resourceLists[podNodeName] = nodeDetail
}
} else {
klog.V(4).Infof("OnPodAdd: Failed to find node %s in resourceLists", podNodeName)
}
nlm.writeClusterResources()
}

func (nlm *nodeLabelsMonitor) onPodDelete(obj interface{}) {
// Thanks to the filters at the informer level, delete events are received only when
// pods previously running on a physical node are no longer running.
podDeleted, ok := obj.(*corev1.Pod)
if !ok {
klog.Errorf("OnPodDelete: Failed to cast to *corev1.Pod type")
return
}
podNodeName := podDeleted.Spec.NodeName
nodeDetail, ok := nlm.resourceLists[podNodeName]
if ok {
delete(nodeDetail.Pods, podDeleted.Name)
nlm.resourceLists[podNodeName] = nodeDetail
} else {
klog.V(4).Infof("OnPodDelete: Failed to find node %s in resourceLists", podNodeName)
}
nlm.writeClusterResources()
}

func (nlm *nodeLabelsMonitor) writeClusterResources() {
podResourceUsage := corev1.ResourceList{}
nodeAllocatable := corev1.ResourceList{}
for _, nodeDetail := range nlm.resourceLists {
if !nodeDetail.Schedulable {
continue
}
addResources(nodeAllocatable, nodeDetail.Allocatable)
for _, podResource := range nodeDetail.Pods {
addResources(podResourceUsage, podResource)
}
}
subResources(nodeAllocatable, podResourceUsage)
nlm.nodeMutex.Lock()
nlm.allocatable = nodeAllocatable.DeepCopy()
klog.V(4).Infof("Cluster resources: %v", nlm.allocatable)
@@ -253,11 +173,6 @@
}
}
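The addResources and subResources helpers referenced in writeClusterResources above are not part of this diff. A minimal sketch of their assumed behavior (accumulating into, or subtracting from, the destination list in place), included only to make the aggregation readable; the real implementations elsewhere in the package may differ:

package sketch

import (
	corev1 "k8s.io/api/core/v1"
)

// addResources accumulates every quantity from src into dst (assumed behavior).
func addResources(dst, src corev1.ResourceList) {
	for name, qty := range src {
		if current, ok := dst[name]; ok {
			current.Add(qty)
			dst[name] = current // Quantity is a value type, so re-store after mutating
		} else {
			dst[name] = qty.DeepCopy()
		}
	}
}

// subResources subtracts every quantity in src from the matching entry in dst (assumed behavior).
func subResources(dst, src corev1.ResourceList) {
	for name, qty := range src {
		if current, ok := dst[name]; ok {
			current.Sub(qty)
			dst[name] = current
		}
	}
}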

func extractPodResources(podToExtract *corev1.Pod) corev1.ResourceList {
resourcesToExtract, _ := resourcehelper.PodRequestsAndLimits(podToExtract)
return resourcesToExtract
}

// ReadResources receives a clusterID and returns the resources for that specific clusterID. In this version of the resource plugin
// the clusterID is ignored and the same resources are returned for every clusterID received. Since this method could be called multiple
// times it has to be idempotent.
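The body of ReadResources is collapsed in this diff view. As a rough sketch of the idempotent read described in the comment above, the handler only needs to return the cached allocatable under the read lock; the gRPC request and response types from the resourcemonitors package are not shown here, so the wrapping into the response message is omitted and the helper name below is hypothetical:

// currentAllocatable is a hypothetical helper a ReadResources implementation could call:
// it takes the read lock and returns a deep copy of the cached cluster-wide allocatable,
// so repeated calls (for any clusterID) are side-effect free and never mutate the cache.
func (nlm *nodeLabelsMonitor) currentAllocatable() corev1.ResourceList {
	nlm.nodeMutex.RLock()
	defer nlm.nodeMutex.RUnlock()
	return nlm.allocatable.DeepCopy()
}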