From 920d0ce1552b1403e81065a87d8e2205f0697887 Mon Sep 17 00:00:00 2001
From: Tero Saarni
Date: Mon, 20 Jan 2025 20:10:02 +0200
Subject: [PATCH] Prevent LoadBalancer updates on follower.

Follower instances of Contour do not run loadBalancerStatusWriter and
therefore never read from the channel that receives status updates.
LoadBalancer status updates were still watched and sent to that
channel, blocking the sending goroutine. Updates piled up and consumed
memory, eventually causing an out-of-memory condition that killed the
Contour process.

Signed-off-by: Tero Saarni
---
 changelogs/unreleased/6872-tsaarni-small.md |  1 +
 cmd/contour/serve.go                        |  9 ++++---
 internal/k8s/statusaddress.go               | 10 +++++++-
 internal/k8s/statusaddress_test.go          | 28 +++++++++++++++++++++
 4 files changed, 44 insertions(+), 4 deletions(-)
 create mode 100644 changelogs/unreleased/6872-tsaarni-small.md

diff --git a/changelogs/unreleased/6872-tsaarni-small.md b/changelogs/unreleased/6872-tsaarni-small.md
new file mode 100644
index 00000000000..321ccd1329b
--- /dev/null
+++ b/changelogs/unreleased/6872-tsaarni-small.md
@@ -0,0 +1 @@
+Fixed a memory leak in Contour follower instance due to unprocessed LoadBalancer status updates.
diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go
index 1c48f9fce88..976bfb334a8 100644
--- a/cmd/contour/serve.go
+++ b/cmd/contour/serve.go
@@ -703,6 +703,10 @@ func (s *Server) doServe() error {
 		return err
 	}
 
+	notifier := &leadership.Notifier{
+		ToNotify: []leadership.NeedLeaderElectionNotification{contourHandler, observer},
+	}
+
 	// Register an informer to watch envoy's service if we haven't been given static details.
 	if lbAddress := contourConfiguration.Ingress.StatusAddress; len(lbAddress) > 0 {
 		s.log.WithField("loadbalancer-address", lbAddress).Info("Using supplied information for Ingress status")
@@ -723,6 +727,8 @@ func (s *Server) doServe() error {
 			s.log.WithError(err).WithField("resource", "services").Fatal("failed to create informer")
 		}
 
+		notifier.ToNotify = append(notifier.ToNotify, serviceHandler)
+
 		s.log.WithField("envoy-service-name", contourConfiguration.Envoy.Service.Name).
 			WithField("envoy-service-namespace", contourConfiguration.Envoy.Service.Namespace).
 			Info("Watching Service for Ingress status")
@@ -740,9 +746,6 @@ func (s *Server) doServe() error {
 		return err
 	}
 
-	notifier := &leadership.Notifier{
-		ToNotify: []leadership.NeedLeaderElectionNotification{contourHandler, observer},
-	}
 	if err := s.mgr.Add(notifier); err != nil {
 		return err
 	}
diff --git a/internal/k8s/statusaddress.go b/internal/k8s/statusaddress.go
index 0cb5cc1daa2..e323e8c310f 100644
--- a/internal/k8s/statusaddress.go
+++ b/internal/k8s/statusaddress.go
@@ -16,6 +16,7 @@ package k8s
 import (
 	"fmt"
 	"sync"
+	"sync/atomic"
 
 	"github.com/sirupsen/logrus"
 	core_v1 "k8s.io/api/core/v1"
@@ -186,6 +187,7 @@ type ServiceStatusLoadBalancerWatcher struct {
 	ServiceName string
 	LBStatus    chan core_v1.LoadBalancerStatus
 	Log         logrus.FieldLogger
+	leader      atomic.Bool
 }
 
 func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj any, _ bool) {
@@ -234,8 +236,14 @@ func (s *ServiceStatusLoadBalancerWatcher) OnDelete(obj any) {
 	})
 }
 
+func (s *ServiceStatusLoadBalancerWatcher) OnElectedLeader() {
+	s.leader.Store(true)
+}
+
 func (s *ServiceStatusLoadBalancerWatcher) notify(lbstatus core_v1.LoadBalancerStatus) {
-	s.LBStatus <- lbstatus
+	if s.leader.Load() {
+		s.LBStatus <- lbstatus
+	}
 }
 
 func coreToNetworkingLBStatus(lbs core_v1.LoadBalancerStatus) networking_v1.IngressLoadBalancerStatus {
diff --git a/internal/k8s/statusaddress_test.go b/internal/k8s/statusaddress_test.go
index e8bbf88cf18..477188179af 100644
--- a/internal/k8s/statusaddress_test.go
+++ b/internal/k8s/statusaddress_test.go
@@ -41,6 +41,7 @@ func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) {
 		LBStatus:    lbstatus,
 		Log:         fixture.NewTestLogger(t),
 	}
+	sw.OnElectedLeader()
 
 	recv := func() (core_v1.LoadBalancerStatus, bool) {
 		select {
@@ -89,6 +90,7 @@ func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) {
 		LBStatus:    lbstatus,
 		Log:         fixture.NewTestLogger(t),
 	}
+	sw.OnElectedLeader()
 
 	recv := func() (core_v1.LoadBalancerStatus, bool) {
 		select {
@@ -139,6 +141,7 @@ func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) {
 		LBStatus:    lbstatus,
 		Log:         fixture.NewTestLogger(t),
 	}
+	sw.OnElectedLeader()
 
 	recv := func() (core_v1.LoadBalancerStatus, bool) {
 		select {
@@ -179,6 +182,31 @@ func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) {
 	assert.Equal(t, want, got)
 }
 
+func TestServiceStatusLoadBalancerWatcherNoNotificationsOnFollower(t *testing.T) {
+	lbstatus := make(chan core_v1.LoadBalancerStatus, 1)
+
+	sw := ServiceStatusLoadBalancerWatcher{
+		ServiceName: "envoy",
+		LBStatus:    lbstatus,
+		Log:         fixture.NewTestLogger(t),
+	}
+
+	recv := func() bool {
+		select {
+		case <-sw.LBStatus:
+			return true
+		default:
+			return false
+		}
+	}
+
+	// assert that when not elected leader, no notifications are sent.
+	var svc core_v1.Service
+	svc.Name = "envoy"
+	sw.OnAdd(&svc, false)
+	assert.False(t, recv())
+}
+
 func TestStatusAddressUpdater(t *testing.T) {
 	const objName = "someobjfoo"