Make sure dry runs see all the resources a normal run would (#1526)
* generic resource handling, so that a dry run has all the expected resource types and objects (see the sketch below)

* simpler code and better names

* fix imports
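
The sketch below shows, roughly, how the new informerResources helper is meant to be wired up. It is not code from this commit: it is a minimal in-package example inferred from the diff, with an illustrative function name (dryRunClients) and only the essential error handling; the real wiring lives in newDescheduler and runDeschedulerLoop further down.

// Minimal sketch (not part of the commit): declare the resources a real run
// reads, then mirror them into a fake client and factory for a dry run.
func dryRunClients(realFactory informers.SharedInformerFactory) (*fakeclientset.Clientset, informers.SharedInformerFactory, error) {
	ir := newInformerResources(realFactory)
	// Register every GroupVersionResource the descheduler (or its plugins) lists.
	if err := ir.Uses(
		v1.SchemeGroupVersion.WithResource("pods"),
		v1.SchemeGroupVersion.WithResource("nodes"),
	); err != nil {
		return nil, nil, err
	}

	// Dry run: a fake clientset plus a fake informer factory, seeded with the
	// same objects the real informers have cached.
	fakeClient := fakeclientset.NewSimpleClientset()
	fakeFactory := informers.NewSharedInformerFactory(fakeClient, 0)
	if err := ir.CopyTo(fakeClient, fakeFactory); err != nil {
		return nil, nil, err
	}
	return fakeClient, fakeFactory, nil
}
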
john7doe authored Oct 4, 2024
1 parent e1e537d commit 22d9230
Showing 1 changed file with 62 additions and 74 deletions.
136 changes: 62 additions & 74 deletions pkg/descheduler/descheduler.go
@@ -23,6 +23,9 @@ import (
"strconv"
"time"

schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/api/meta"
@@ -34,15 +37,12 @@ import (

v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
listersv1 "k8s.io/client-go/listers/core/v1"
schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
core "k8s.io/client-go/testing"

"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -71,10 +71,7 @@ type profileRunner struct {

type descheduler struct {
rs *options.DeschedulerServer
podLister listersv1.PodLister
nodeLister listersv1.NodeLister
namespaceLister listersv1.NamespaceLister
priorityClassLister schedulingv1.PriorityClassLister
ir *informerResources
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
deschedulerPolicy *api.DeschedulerPolicy
@@ -83,12 +80,60 @@ type descheduler struct {
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
}

type informerResources struct {
sharedInformerFactory informers.SharedInformerFactory
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
}

func newInformerResources(sharedInformerFactory informers.SharedInformerFactory) *informerResources {
return &informerResources{
sharedInformerFactory: sharedInformerFactory,
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
}
}

func (ir *informerResources) Uses(resources ...schema.GroupVersionResource) error {
for _, resource := range resources {
informer, err := ir.sharedInformerFactory.ForResource(resource)
if err != nil {
return err
}

ir.resourceToInformer[resource] = informer
}
return nil
}

// CopyTo copies informer subscriptions to the new factory and objects to the fake client so that the backing caches are populated when listers are used.
func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFactory informers.SharedInformerFactory) error {
for resource, informer := range ir.resourceToInformer {
_, err := newFactory.ForResource(resource)
if err != nil {
return fmt.Errorf("error getting resource %s: %w", resource, err)
}

objects, err := informer.Lister().List(labels.Everything())
if err != nil {
return fmt.Errorf("error listing %s: %w", resource, err)
}

for _, object := range objects {
fakeClient.Tracker().Add(object)
}
}
return nil
}

func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podLister := sharedInformerFactory.Core().V1().Pods().Lister()
nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()

ir := newInformerResources(sharedInformerFactory)
ir.Uses(v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay
// consistent with the real runs without having to keep the list here in sync.
v1.SchemeGroupVersion.WithResource("namespaces"), // Used by the defaultevictor plugin
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses")) // Used by the defaultevictor plugin

getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
@@ -109,10 +154,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche

return &descheduler{
rs: rs,
podLister: podLister,
nodeLister: nodeLister,
namespaceLister: namespaceLister,
priorityClassLister: priorityClassLister,
ir: ir,
getPodsAssignedToNode: getPodsAssignedToNode,
sharedInformerFactory: sharedInformerFactory,
deschedulerPolicy: deschedulerPolicy,
@@ -146,13 +188,14 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
fakeClient := fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
err := cachedClient(d.rs.Client, fakeClient, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)

err := d.ir.CopyTo(fakeClient, fakeSharedInformerFactory)
if err != nil {
return err
}

// create a new instance of the shared informer factory from the cached client
fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
// register the pod informer, otherwise it will not start running
d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
if err != nil {
@@ -336,62 +379,6 @@ func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action cor
}
}

func cachedClient(
realClient clientset.Interface,
fakeClient *fakeclientset.Clientset,
podLister listersv1.PodLister,
nodeLister listersv1.NodeLister,
namespaceLister listersv1.NamespaceLister,
priorityClassLister schedulingv1.PriorityClassLister,
) error {
klog.V(3).Infof("Pulling resources for the cached client from the cluster")
pods, err := podLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("unable to list pods: %v", err)
}

for _, item := range pods {
if _, err := fakeClient.CoreV1().Pods(item.Namespace).Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to copy pod: %v", err)
}
}

nodes, err := nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("unable to list nodes: %v", err)
}

for _, item := range nodes {
if _, err := fakeClient.CoreV1().Nodes().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to copy node: %v", err)
}
}

namespaces, err := namespaceLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("unable to list namespaces: %v", err)
}

for _, item := range namespaces {
if _, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to copy namespace: %v", err)
}
}

priorityClasses, err := priorityClassLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("unable to list priorityclasses: %v", err)
}

for _, item := range priorityClasses {
if _, err := fakeClient.SchedulingV1().PriorityClasses().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to copy priorityclass: %v", err)
}
}

return nil
}

func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
@@ -428,7 +415,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
// A next context is created here intentionally to avoid nesting the spans via context.
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
defer sSpan.End()
nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.nodeLister, nodeSelector)

nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.sharedInformerFactory.Core().V1().Nodes().Lister(), nodeSelector)
if err != nil {
sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
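
For readers less familiar with the client-go machinery that CopyTo builds on: SharedInformerFactory.ForResource returns a GenericInformer for any GroupVersionResource the factory knows about, and the fake clientset exposes an object tracker that can be pre-seeded with objects. Below is a condensed sketch of that mechanism, not code from this commit; variable names are illustrative, and it assumes the real factory has already been started and its caches have synced.

// Condensed sketch of the generic copy step (mirrors what CopyTo in the diff does).
gvr := v1.SchemeGroupVersion.WithResource("pods")

// A GenericInformer works for any registered GroupVersionResource.
genericInformer, err := realFactory.ForResource(gvr)
if err != nil {
	return err
}

// The generic lister returns runtime.Objects straight from the informer cache.
objects, err := genericInformer.Lister().List(labels.Everything())
if err != nil {
	return err
}

// Seed the fake clientset's tracker so informers and listers built on the fake
// factory observe the same objects during the dry run.
for _, obj := range objects {
	if err := fakeClient.Tracker().Add(obj); err != nil {
		return err
	}
}
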
