diff --git a/changelogs/unreleased/6872-tsaarni-small.md b/changelogs/unreleased/6872-tsaarni-small.md new file mode 100644 index 00000000000..321ccd1329b --- /dev/null +++ b/changelogs/unreleased/6872-tsaarni-small.md @@ -0,0 +1 @@ +Fixed a memory leak in Contour follower instance due to unprocessed LoadBalancer status updates. diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index 1c48f9fce88..976bfb334a8 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -703,6 +703,10 @@ func (s *Server) doServe() error { return err } + notifier := &leadership.Notifier{ + ToNotify: []leadership.NeedLeaderElectionNotification{contourHandler, observer}, + } + // Register an informer to watch envoy's service if we haven't been given static details. if lbAddress := contourConfiguration.Ingress.StatusAddress; len(lbAddress) > 0 { s.log.WithField("loadbalancer-address", lbAddress).Info("Using supplied information for Ingress status") @@ -723,6 +727,8 @@ func (s *Server) doServe() error { s.log.WithError(err).WithField("resource", "services").Fatal("failed to create informer") } + notifier.ToNotify = append(notifier.ToNotify, serviceHandler) + s.log.WithField("envoy-service-name", contourConfiguration.Envoy.Service.Name). WithField("envoy-service-namespace", contourConfiguration.Envoy.Service.Namespace). Info("Watching Service for Ingress status") @@ -740,9 +746,6 @@ func (s *Server) doServe() error { return err } - notifier := &leadership.Notifier{ - ToNotify: []leadership.NeedLeaderElectionNotification{contourHandler, observer}, - } if err := s.mgr.Add(notifier); err != nil { return err } diff --git a/internal/k8s/statusaddress.go b/internal/k8s/statusaddress.go index 0cb5cc1daa2..e323e8c310f 100644 --- a/internal/k8s/statusaddress.go +++ b/internal/k8s/statusaddress.go @@ -16,6 +16,7 @@ package k8s import ( "fmt" "sync" + "sync/atomic" "github.com/sirupsen/logrus" core_v1 "k8s.io/api/core/v1" @@ -186,6 +187,7 @@ type ServiceStatusLoadBalancerWatcher struct { ServiceName string LBStatus chan core_v1.LoadBalancerStatus Log logrus.FieldLogger + leader atomic.Bool } func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj any, _ bool) { @@ -234,8 +236,14 @@ func (s *ServiceStatusLoadBalancerWatcher) OnDelete(obj any) { }) } +func (s *ServiceStatusLoadBalancerWatcher) OnElectedLeader() { + s.leader.Store(true) +} + func (s *ServiceStatusLoadBalancerWatcher) notify(lbstatus core_v1.LoadBalancerStatus) { - s.LBStatus <- lbstatus + if s.leader.Load() { + s.LBStatus <- lbstatus + } } func coreToNetworkingLBStatus(lbs core_v1.LoadBalancerStatus) networking_v1.IngressLoadBalancerStatus { diff --git a/internal/k8s/statusaddress_test.go b/internal/k8s/statusaddress_test.go index e8bbf88cf18..477188179af 100644 --- a/internal/k8s/statusaddress_test.go +++ b/internal/k8s/statusaddress_test.go @@ -41,6 +41,7 @@ func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) { LBStatus: lbstatus, Log: fixture.NewTestLogger(t), } + sw.OnElectedLeader() recv := func() (core_v1.LoadBalancerStatus, bool) { select { @@ -89,6 +90,7 @@ func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) { LBStatus: lbstatus, Log: fixture.NewTestLogger(t), } + sw.OnElectedLeader() recv := func() (core_v1.LoadBalancerStatus, bool) { select { @@ -139,6 +141,7 @@ func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) { LBStatus: lbstatus, Log: fixture.NewTestLogger(t), } + sw.OnElectedLeader() recv := func() (core_v1.LoadBalancerStatus, bool) { select { @@ -179,6 +182,31 @@ func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) { assert.Equal(t, want, got) } +func TestServiceStatusLoadBalancerWatcherNoNotificationsOnFollower(t *testing.T) { + lbstatus := make(chan core_v1.LoadBalancerStatus, 1) + + sw := ServiceStatusLoadBalancerWatcher{ + ServiceName: "envoy", + LBStatus: lbstatus, + Log: fixture.NewTestLogger(t), + } + + recv := func() bool { + select { + case <-sw.LBStatus: + return true + default: + return false + } + } + + // assert that when not elected leader, no notifications are sent. + var svc core_v1.Service + svc.Name = "envoy" + sw.OnAdd(&svc, false) + assert.False(t, recv()) +} + func TestStatusAddressUpdater(t *testing.T) { const objName = "someobjfoo"