Commit

[release-0.33] controller-runtime, use cached/uncached kube clients (#356)

* controller-runtime, Move to use client.Client

Moving to the controller-runtime client.Client instead of the direct
kubernetes clientSet client. This allows for code unity when
introducing the controller-runtime cached client in specific cases.

Signed-off-by: Ram Lavi <[email protected]>
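
A minimal sketch (not part of the commit) of the call-site shape this move
implies; the helper getConfigMap and the variables c, ns and name are
illustrative, not taken from the diff:

package example

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// getConfigMap reads a ConfigMap through a controller-runtime client.Client.
// The same call works whether c is backed by a cache or talks to the API
// server directly, which is what enables the cached/uncached split below.
func getConfigMap(ctx context.Context, c client.Client, ns, name string) (*corev1.ConfigMap, error) {
    cm := &corev1.ConfigMap{}
    if err := c.Get(ctx, client.ObjectKey{Namespace: ns, Name: name}, cm); err != nil {
        return nil, err
    }
    return cm, nil
}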

* controller-runtime, use cached/uncached clients

Introducing a cached controller-runtime client in order to reduce
repeated reads of Namespaces and the MutatingWebhookConfiguration during
initialization, which slow down the startup of the kubemacpool manager.

Signed-off-by: Ram Lavi <[email protected]>
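
A rough sketch of the intent (again not code from this commit; the helper
readStartupObjects and its arguments are illustrative): objects that are read
repeatedly during startup, such as Namespaces and the
MutatingWebhookConfiguration, are fetched through the cache-backed client so
only the first read hits the API server, while writes and one-off reads keep
using the plain uncached client.

package example

import (
    "context"

    admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// readStartupObjects fetches the objects kubemacpool re-reads during
// initialization through a cached client.Client; each Get is served from the
// informer cache after the initial sync.
func readStartupObjects(ctx context.Context, cachedClient client.Client, namespaceName, webhookConfigName string) error {
    ns := corev1.Namespace{}
    if err := cachedClient.Get(ctx, client.ObjectKey{Name: namespaceName}, &ns); err != nil {
        return err
    }

    mwc := admissionregistrationv1.MutatingWebhookConfiguration{}
    return cachedClient.Get(ctx, client.ObjectKey{Name: webhookConfigName}, &mwc)
}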
RamLavi authored Mar 20, 2022
1 parent decb692 commit 887b8a2
Showing 219 changed files with 859 additions and 17,144 deletions.
2 changes: 2 additions & 0 deletions config/default/rbac/rbac_role.yaml
@@ -97,3 +97,5 @@ rules:
- namespaces
verbs:
- get
- list
- watch
21 changes: 18 additions & 3 deletions config/release/kubemacpool.yaml
@@ -183,6 +183,8 @@ rules:
- namespaces
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
@@ -368,9 +370,6 @@ spec:
- containerPort: 8000
name: webhook-server
protocol: TCP
- containerPort: 8080
name: metrics
protocol: TCP
readinessProbe:
httpGet:
httpHeaders:
@@ -389,6 +388,22 @@ spec:
- mountPath: /tmp/k8s-webhook-server/serving-certs/
name: tls-key-pair
readOnly: true
- args:
- --logtostderr
- --secure-listen-address=:8443
- --upstream=http://127.0.0.1:8080
image: quay.io/openshift/origin-kube-rbac-proxy:4.10.0
imagePullPolicy: IfNotPresent
name: kube-rbac-proxy
ports:
- containerPort: 8443
name: metrics
protocol: TCP
resources:
requests:
cpu: 10m
memory: 20Mi
terminationMessagePolicy: FallbackToLogsOnError
priorityClassName: system-cluster-critical
restartPolicy: Always
terminationGracePeriodSeconds: 5
21 changes: 18 additions & 3 deletions config/test/kubemacpool.yaml
@@ -184,6 +184,8 @@ rules:
- namespaces
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
@@ -369,9 +371,6 @@ spec:
- containerPort: 8000
name: webhook-server
protocol: TCP
- containerPort: 8080
name: metrics
protocol: TCP
readinessProbe:
httpGet:
httpHeaders:
@@ -390,6 +389,22 @@ spec:
- mountPath: /tmp/k8s-webhook-server/serving-certs/
name: tls-key-pair
readOnly: true
- args:
- --logtostderr
- --secure-listen-address=:8443
- --upstream=http://127.0.0.1:8080
image: quay.io/openshift/origin-kube-rbac-proxy:4.10.0
imagePullPolicy: IfNotPresent
name: kube-rbac-proxy
ports:
- containerPort: 8443
name: metrics
protocol: TCP
resources:
requests:
cpu: 10m
memory: 20Mi
terminationMessagePolicy: FallbackToLogsOnError
priorityClassName: system-cluster-critical
restartPolicy: Always
terminationGracePeriodSeconds: 5
41 changes: 40 additions & 1 deletion pkg/manager/manager.go
@@ -17,6 +17,7 @@ package manager

import (
"context"
"fmt"
"net"
"os"
"os/signal"
@@ -27,7 +28,10 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kubevirt_api "kubevirt.io/client-go/api/v1"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/cluster"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

@@ -93,7 +97,42 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
}

isKubevirtInstalled := checkForKubevirt(k.clientset)
poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, k.podNamespace, isKubevirtInstalled, k.waitingTime)

log.Info("Constructing cache")
cache, err := cache.New(k.config, cache.Options{
Scheme: k.runtimeManager.GetScheme(),
Mapper: k.runtimeManager.GetRESTMapper(),
})
if err != nil {
return errors.Wrap(err, "failed constructing pool manager cache")
}
log.Info("Starting cache")
go func() {
if err = cache.Start(context.TODO()); err != nil {
panic(errors.Wrap(err, "failed staring pool manager cache"))
}
}()
log.Info("Waiting for cache sync")
ok := cache.WaitForCacheSync(context.TODO())
if !ok {
return fmt.Errorf("cannot wait for controller-runtime manager cache sync")
}
log.Info("Building client")
cachedClient, err := cluster.NewClientBuilder().Build(cache, k.config, client.Options{
Scheme: k.runtimeManager.GetScheme(),
Mapper: k.runtimeManager.GetRESTMapper(),
})
if err != nil {
return errors.Wrap(err, "failed creating pool manager client")
}
client, err := client.New(k.config, client.Options{
Scheme: k.runtimeManager.GetScheme(),
Mapper: k.runtimeManager.GetRESTMapper(),
})
if err != nil {
return errors.Wrap(err, "failed creating pool manager client")
}
poolManager, err := poolmanager.NewPoolManager(client, cachedClient, rangeStart, rangeEnd, k.podNamespace, isKubevirtInstalled, k.waitingTime)
if err != nil {
return errors.Wrap(err, "unable to create pool manager")
}
13 changes: 11 additions & 2 deletions pkg/pool-manager/migratelegacyconfigmap.go
@@ -5,8 +5,10 @@ import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/k8snetworkplumbingwg/kubemacpool/pkg/names"
)
@@ -78,7 +80,8 @@ func (m *macMap) createOrUpdateDummyEntryWithTimestamp(macAddress string, timest

// getVmMacWaitMap return a config map that contains mac address and the allocation time.
func (p *PoolManager) getVmMacWaitMap() (map[string]string, error) {
configMap, err := p.kubeClient.CoreV1().ConfigMaps(p.managerNamespace).Get(context.TODO(), names.WAITING_VMS_CONFIGMAP, metav1.GetOptions{})
configMap := corev1.ConfigMap{}
err := p.kubeClient.Get(context.TODO(), client.ObjectKey{Namespace: p.managerNamespace, Name: names.WAITING_VMS_CONFIGMAP}, &configMap)
if err != nil {
return nil, err
}
@@ -87,5 +90,11 @@ func (p *PoolManager) getVmMacWaitMap() (map[string]string, error) {
}

func (p *PoolManager) deleteVmMacWaitConfigMap() error {
return p.kubeClient.CoreV1().ConfigMaps(p.managerNamespace).Delete(context.TODO(), names.WAITING_VMS_CONFIGMAP, metav1.DeleteOptions{})
configMap := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: p.managerNamespace,
Name: names.WAITING_VMS_CONFIGMAP,
},
}
return p.kubeClient.Delete(context.TODO(), &configMap)
}
15 changes: 9 additions & 6 deletions pkg/pool-manager/migratelegacyconfigmap_test.go
@@ -13,22 +13,22 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/k8snetworkplumbingwg/kubemacpool/pkg/names"
)

var _ = Describe("migrate legacy vm configMap", func() {
legacyVmConfigMap := v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: testManagerNamespace, Name: names.WAITING_VMS_CONFIGMAP}}
waitTimeSeconds := 10

createPoolManager := func(startMacAddr, endMacAddr string, fakeObjectsForClient ...runtime.Object) *PoolManager {
fakeClient := fake.NewSimpleClientset(fakeObjectsForClient...)
fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(fakeObjectsForClient...).Build()
startPoolRangeEnv, err := net.ParseMAC(startMacAddr)
Expect(err).ToNot(HaveOccurred(), "should successfully parse starting mac address range")
endPoolRangeEnv, err := net.ParseMAC(endMacAddr)
Expect(err).ToNot(HaveOccurred(), "should successfully parse ending mac address range")
poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, testManagerNamespace, false, waitTimeSeconds)
poolManager, err := NewPoolManager(fakeClient, fakeClient, startPoolRangeEnv, endPoolRangeEnv, testManagerNamespace, false, waitTimeSeconds)
Expect(err).ToNot(HaveOccurred(), "should successfully initialize poolManager")
err = poolManager.Start()
Expect(err).ToNot(HaveOccurred(), "should successfully start poolManager routines")
@@ -64,8 +64,11 @@ var _ = Describe("migrate legacy vm configMap", func() {
table.DescribeTable("and running initMacMapFromLegacyConfigMapParams",
func(i *initMacMapFromLegacyConfigMapParams) {
By("updating configMap entries")
legacyVmConfigMap.Data = i.configMapEntries
_, err := poolManager.kubeClient.CoreV1().ConfigMaps(testManagerNamespace).Create(context.Background(), &legacyVmConfigMap, metav1.CreateOptions{})
legacyVmConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: testManagerNamespace, Name: names.WAITING_VMS_CONFIGMAP},
Data: i.configMapEntries,
}
err := poolManager.kubeClient.Create(context.Background(), &legacyVmConfigMap)
Expect(err).ToNot(HaveOccurred(), "should succeed updating the configMap")
By("initiating the macPoolMap")
Expect(poolManager.initMacMapFromLegacyConfigMap()).To(Succeed(), "should not fail migration if configMap does not exist")
10 changes: 8 additions & 2 deletions pkg/pool-manager/pod_pool.go
@@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const tempPodName = "tempPodName"
@@ -196,12 +197,17 @@ func (p *PoolManager) allocatePodFromPool(network *multus.NetworkSelectionElemen
func (p *PoolManager) paginatePodsWithLimit(limit int64, f func(pods *corev1.PodList) error) error {
continueFlag := ""
for {
pods, err := p.kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{Limit: limit, Continue: continueFlag})
pods := corev1.PodList{}
err := p.kubeClient.List(context.TODO(), &pods, &client.ListOptions{
Namespace: metav1.NamespaceAll,
Limit: limit,
Continue: continueFlag,
})
if err != nil {
return err
}

err = f(pods)
err = f(&pods)
if err != nil {
return err
}
26 changes: 15 additions & 11 deletions pkg/pool-manager/pool.go
@@ -26,9 +26,10 @@ import (

"github.com/pkg/errors"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/k8snetworkplumbingwg/kubemacpool/pkg/utils"
@@ -52,10 +53,11 @@ var log = logf.Log.WithName("PoolManager")
var now = func() time.Time { return time.Now() }

type PoolManager struct {
kubeClient kubernetes.Interface // kubernetes client
rangeStart net.HardwareAddr // fist mac in range
rangeEnd net.HardwareAddr // last mac in range
currentMac net.HardwareAddr // last given mac
cachedKubeClient client.Client
kubeClient client.Client
rangeStart net.HardwareAddr // fist mac in range
rangeEnd net.HardwareAddr // last mac in range
currentMac net.HardwareAddr // last given mac
managerNamespace string
macPoolMap macMap // allocated mac map and macEntry
podToMacPoolMap map[string]map[string]string // map allocated mac address by networkname and namespace/podname: {"namespace/podname: {"network name": "mac address"}}
@@ -79,7 +81,7 @@ type macEntry struct {

type macMap map[string]macEntry

func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, managerNamespace string, kubevirtExist bool, waitTime int) (*PoolManager, error) {
func NewPoolManager(kubeClient, cachedKubeClient client.Client, rangeStart, rangeEnd net.HardwareAddr, managerNamespace string, kubevirtExist bool, waitTime int) (*PoolManager, error) {
err := checkRange(rangeStart, rangeEnd)
if err != nil {
return nil, err
@@ -95,8 +97,9 @@ func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.Ha

currentMac := make(net.HardwareAddr, len(rangeStart))
copy(currentMac, rangeStart)

poolManger := &PoolManager{kubeClient: kubeClient,
poolManger := &PoolManager{
cachedKubeClient: cachedKubeClient,
kubeClient: kubeClient,
isKubevirt: kubevirtExist,
rangeEnd: rangeEnd,
rangeStart: rangeStart,
@@ -232,8 +235,8 @@ func (p *PoolManager) isNamespaceSelectorCompatibleWithOptModeLabel(namespaceNam
if err != nil {
return false, errors.Wrap(err, "Failed to check if namespaces are managed by default by opt-mode")
}

ns, err := p.kubeClient.CoreV1().Namespaces().Get(context.TODO(), namespaceName, metav1.GetOptions{})
ns := v1.Namespace{}
err = p.cachedKubeClient.Get(context.TODO(), client.ObjectKey{Name: namespaceName}, &ns)
if err != nil {
return false, errors.Wrap(err, "Failed to get Namespace")
}
@@ -257,7 +260,8 @@
}

func (p *PoolManager) lookupWebhookInMutatingWebhookConfig(mutatingWebhookConfigName, webhookName string) (*admissionregistrationv1.MutatingWebhook, error) {
mutatingWebhookConfiguration, err := p.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), mutatingWebhookConfigName, metav1.GetOptions{})
mutatingWebhookConfiguration := admissionregistrationv1.MutatingWebhookConfiguration{}
err := p.cachedKubeClient.Get(context.TODO(), client.ObjectKey{Name: mutatingWebhookConfigName}, &mutatingWebhookConfiguration)
if err != nil {
return nil, errors.Wrap(err, "Failed to get mutatingWebhookConfig")
}
