Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add informer resync period for node status watcher #10130

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/controllers/k8s/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (ctrl *EndpointController) watchKubernetesEndpoint(ctx context.Context, r c

func kubernetesEndpointWatcher(ctx context.Context, logger *zap.Logger, client *kubernetes.Client) (chan *corev1.Endpoints, func(), error) {
informerFactory := informers.NewSharedInformerFactoryWithOptions(
client.Clientset, 30*time.Second,
client.Clientset, constants.KubernetesInformerDefaultResyncPeriod,
informers.WithNamespace(corev1.NamespaceDefault),
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", "kubernetes").String()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/client-go/tools/cache"

"github.com/siderolabs/talos/pkg/kubernetes"
"github.com/siderolabs/talos/pkg/machinery/constants"
)

// NodeWatcher defines a NodeWatcher-based node watcher.
Expand Down Expand Up @@ -46,10 +48,12 @@ func (r *NodeWatcher) Get() (*corev1.Node, error) {
}

// Watch starts watching Node state and notifies on updates via notify channel.
func (r *NodeWatcher) Watch(ctx context.Context) (<-chan struct{}, <-chan error, func(), error) {
func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan struct{}, <-chan error, func(), error) {
logger.Debug("starting node watcher", zap.String("nodename", r.nodename))

informerFactory := informers.NewSharedInformerFactoryWithOptions(
r.client.Clientset,
0,
constants.KubernetesInformerDefaultResyncPeriod,
informers.WithTweakListOptions(
func(opts *metav1.ListOptions) {
opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, r.nodename).String()
Expand Down Expand Up @@ -88,7 +92,11 @@ func (r *NodeWatcher) Watch(ctx context.Context) (<-chan struct{}, <-chan error,

informerFactory.Start(ctx.Done())

logger.Debug("waiting for node cache sync")

informerFactory.WaitForCacheSync(ctx.Done())

logger.Debug("node cache sync done")

return notifyCh, watchErrCh, informerFactory.Shutdown, nil
}
4 changes: 2 additions & 2 deletions internal/app/machined/pkg/controllers/k8s/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (ctrl *NodeStatusController) Run(ctx context.Context, r controller.Runtime,
var watchCtx context.Context
watchCtx, watchCtxCancel = context.WithCancel(ctx) //nolint:govet

notifyCh, watchErrCh, notifyCloser, err = nodewatcher.Watch(watchCtx)
notifyCh, watchErrCh, notifyCloser, err = nodewatcher.Watch(watchCtx, logger)
if err != nil {
return fmt.Errorf("error setting up registry watcher: %w", err) //nolint:govet
return fmt.Errorf("error setting up node watcher: %w", err) //nolint:govet
}
}

Expand Down
10 changes: 7 additions & 3 deletions internal/integration/base/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"k8s.io/kubectl/pkg/scheme"

taloskubernetes "github.com/siderolabs/talos/pkg/kubernetes"
"github.com/siderolabs/talos/pkg/machinery/constants"
)

// K8sSuite is a base suite for K8s tests.
Expand Down Expand Up @@ -813,9 +814,12 @@ func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string

watchCh := make(chan *corev1.Node)

informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sSuite.Clientset, 30*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = metadataKeyName + nodeName
}))
informerFactory := informers.NewSharedInformerFactoryWithOptions(
k8sSuite.Clientset, constants.KubernetesInformerDefaultResyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = metadataKeyName + nodeName
}),
)

nodeInformer := informerFactory.Core().V1().Nodes().Informer()
_, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/discovery/registry/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/netip"
"strconv"
"strings"
"time"

"github.com/siderolabs/gen/value"
"github.com/siderolabs/gen/xslices"
Expand Down Expand Up @@ -265,7 +264,7 @@ func (r *Kubernetes) List(localNodeName string) ([]*cluster.AffiliateSpec, error

// Watch starts watching Node state and notifies on updates via notify channel.
func (r *Kubernetes) Watch(ctx context.Context, logger *zap.Logger) (<-chan struct{}, func(), error) {
informerFactory := informers.NewSharedInformerFactory(r.client.Clientset, 30*time.Second)
informerFactory := informers.NewSharedInformerFactory(r.client.Clientset, constants.KubernetesInformerDefaultResyncPeriod)

notifyCh := make(chan struct{}, 1)

Expand Down
3 changes: 3 additions & 0 deletions pkg/machinery/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,9 @@ const (

// RegistrydListenAddress is the address to listen on for the registryd service.
RegistrydListenAddress = "127.0.0.1:3172"

// KubernetesInformerDefaultResyncPeriod is the default resync period for Kubernetes informers.
KubernetesInformerDefaultResyncPeriod = 30 * time.Second
)

// See https://linux.die.net/man/3/klogctl
Expand Down
Loading