From da2e81120f7336d9633a98523e05d91f5750434f Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Tue, 14 Jan 2025 19:32:59 +0400 Subject: [PATCH] fix: add informer resync period for node status watcher Also use a constant everywhere in informers. Add some debug logs. Might fix #9991 Signed-off-by: Andrey Smirnov --- .../app/machined/pkg/controllers/k8s/endpoint.go | 2 +- .../controllers/k8s/internal/nodewatch/nodewatch.go | 12 ++++++++++-- .../app/machined/pkg/controllers/k8s/node_status.go | 4 ++-- internal/integration/base/k8s.go | 10 +++++++--- internal/pkg/discovery/registry/kubernetes.go | 3 +-- pkg/machinery/constants/constants.go | 3 +++ 6 files changed, 24 insertions(+), 10 deletions(-) diff --git a/internal/app/machined/pkg/controllers/k8s/endpoint.go b/internal/app/machined/pkg/controllers/k8s/endpoint.go index fbf1450e01..9add5d3f1b 100644 --- a/internal/app/machined/pkg/controllers/k8s/endpoint.go +++ b/internal/app/machined/pkg/controllers/k8s/endpoint.go @@ -269,7 +269,7 @@ func (ctrl *EndpointController) watchKubernetesEndpoint(ctx context.Context, r c func kubernetesEndpointWatcher(ctx context.Context, logger *zap.Logger, client *kubernetes.Client) (chan *corev1.Endpoints, func(), error) { informerFactory := informers.NewSharedInformerFactoryWithOptions( - client.Clientset, 30*time.Second, + client.Clientset, constants.KubernetesInformerDefaultResyncPeriod, informers.WithNamespace(corev1.NamespaceDefault), informers.WithTweakListOptions(func(options *v1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("metadata.name", "kubernetes").String() diff --git a/internal/app/machined/pkg/controllers/k8s/internal/nodewatch/nodewatch.go b/internal/app/machined/pkg/controllers/k8s/internal/nodewatch/nodewatch.go index 71672749e2..9c0d042ecf 100644 --- a/internal/app/machined/pkg/controllers/k8s/internal/nodewatch/nodewatch.go +++ b/internal/app/machined/pkg/controllers/k8s/internal/nodewatch/nodewatch.go @@ -9,6 +9,7 @@ import ( "context" "fmt" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -17,6 +18,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/siderolabs/talos/pkg/kubernetes" + "github.com/siderolabs/talos/pkg/machinery/constants" ) // NodeWatcher defines a NodeWatcher-based node watcher. @@ -46,10 +48,12 @@ func (r *NodeWatcher) Get() (*corev1.Node, error) { } // Watch starts watching Node state and notifies on updates via notify channel. -func (r *NodeWatcher) Watch(ctx context.Context) (<-chan struct{}, <-chan error, func(), error) { +func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan struct{}, <-chan error, func(), error) { + logger.Debug("starting node watcher", zap.String("nodename", r.nodename)) + informerFactory := informers.NewSharedInformerFactoryWithOptions( r.client.Clientset, - 0, + constants.KubernetesInformerDefaultResyncPeriod, informers.WithTweakListOptions( func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, r.nodename).String() @@ -88,7 +92,11 @@ func (r *NodeWatcher) Watch(ctx context.Context) (<-chan struct{}, <-chan error, informerFactory.Start(ctx.Done()) + logger.Debug("waiting for node cache sync") + informerFactory.WaitForCacheSync(ctx.Done()) + logger.Debug("node cache sync done") + return notifyCh, watchErrCh, informerFactory.Shutdown, nil } diff --git a/internal/app/machined/pkg/controllers/k8s/node_status.go b/internal/app/machined/pkg/controllers/k8s/node_status.go index 985abbc4db..02bb00817b 100644 --- a/internal/app/machined/pkg/controllers/k8s/node_status.go +++ b/internal/app/machined/pkg/controllers/k8s/node_status.go @@ -158,9 +158,9 @@ func (ctrl *NodeStatusController) Run(ctx context.Context, r controller.Runtime, var watchCtx context.Context watchCtx, watchCtxCancel = context.WithCancel(ctx) //nolint:govet - notifyCh, watchErrCh, notifyCloser, err = nodewatcher.Watch(watchCtx) + notifyCh, watchErrCh, notifyCloser, err = nodewatcher.Watch(watchCtx, logger) if err != nil { - return fmt.Errorf("error setting up registry watcher: %w", err) //nolint:govet + return fmt.Errorf("error setting up node watcher: %w", err) //nolint:govet } } diff --git a/internal/integration/base/k8s.go b/internal/integration/base/k8s.go index a984d86db4..184ef0bcdd 100644 --- a/internal/integration/base/k8s.go +++ b/internal/integration/base/k8s.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubectl/pkg/scheme" taloskubernetes "github.com/siderolabs/talos/pkg/kubernetes" + "github.com/siderolabs/talos/pkg/machinery/constants" ) // K8sSuite is a base suite for K8s tests. @@ -813,9 +814,12 @@ func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string watchCh := make(chan *corev1.Node) - informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sSuite.Clientset, 30*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = metadataKeyName + nodeName - })) + informerFactory := informers.NewSharedInformerFactoryWithOptions( + k8sSuite.Clientset, constants.KubernetesInformerDefaultResyncPeriod, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = metadataKeyName + nodeName + }), + ) nodeInformer := informerFactory.Core().V1().Nodes().Informer() _, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/internal/pkg/discovery/registry/kubernetes.go b/internal/pkg/discovery/registry/kubernetes.go index 13a4ffccfe..ecf1d05910 100644 --- a/internal/pkg/discovery/registry/kubernetes.go +++ b/internal/pkg/discovery/registry/kubernetes.go @@ -12,7 +12,6 @@ import ( "net/netip" "strconv" "strings" - "time" "github.com/siderolabs/gen/value" "github.com/siderolabs/gen/xslices" @@ -265,7 +264,7 @@ func (r *Kubernetes) List(localNodeName string) ([]*cluster.AffiliateSpec, error // Watch starts watching Node state and notifies on updates via notify channel. func (r *Kubernetes) Watch(ctx context.Context, logger *zap.Logger) (<-chan struct{}, func(), error) { - informerFactory := informers.NewSharedInformerFactory(r.client.Clientset, 30*time.Second) + informerFactory := informers.NewSharedInformerFactory(r.client.Clientset, constants.KubernetesInformerDefaultResyncPeriod) notifyCh := make(chan struct{}, 1) diff --git a/pkg/machinery/constants/constants.go b/pkg/machinery/constants/constants.go index 14dd527582..166dd5c819 100644 --- a/pkg/machinery/constants/constants.go +++ b/pkg/machinery/constants/constants.go @@ -1238,6 +1238,9 @@ const ( // RegistrydListenAddress is the address to listen on for the registryd service. RegistrydListenAddress = "127.0.0.1:3172" + + // KubernetesInformerDefaultResyncPeriod is the default resync period for Kubernetes informers. + KubernetesInformerDefaultResyncPeriod = 30 * time.Second ) // See https://linux.die.net/man/3/klogctl