From b8bb2c58cd0829d58d66718d8d4dd6c925ca68b6 Mon Sep 17 00:00:00 2001 From: Jonathan Stevens Date: Thu, 10 Nov 2022 16:08:42 -0700 Subject: [PATCH] fix(consumer): remove old nsqd connections if addresses change --- consumer.go | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/consumer.go b/consumer.go index 895c89a5..d810e6af 100644 --- a/consumer.go +++ b/consumer.go @@ -33,9 +33,9 @@ type Handler interface { // HandlerFunc is a convenience type to avoid having to declare a struct // to implement the Handler interface, it can be used like this: // -// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error { -// // handle the message -// })) +// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error { +// // handle the message +// })) type HandlerFunc func(message *Message) error // HandleMessage implements the Handler interface @@ -220,8 +220,7 @@ func (r *Consumer) conns() []*Conn { // The logger parameter is an interface that requires the following // method to be implemented (such as the the stdlib log.Logger): // -// Output(calldepth int, s string) error -// +// Output(calldepth int, s string) error func (r *Consumer) SetLogger(l logger, lvl LogLevel) { r.logGuard.Lock() defer r.logGuard.Unlock() @@ -266,8 +265,7 @@ func (r *Consumer) getLogLevel() LogLevel { // of the following interfaces that modify the behavior // of the `Consumer`: // -// DiscoveryFilter -// +// DiscoveryFilter func (r *Consumer) SetBehaviorDelegate(cb interface{}) { matched := false @@ -312,7 +310,7 @@ func (r *Consumer) getMaxInFlight() int32 { // ChangeMaxInFlight sets a new maximum number of messages this comsumer instance // will allow in-flight, and updates all existing connections as appropriate. // -// For example, ChangeMaxInFlight(0) would pause message flow +// # For example, ChangeMaxInFlight(0) would pause message flow // // If already connected, it updates the reader RDY state for each connection. func (r *Consumer) ChangeMaxInFlight(maxInFlight int) { @@ -513,13 +511,33 @@ retry: if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok { nsqdAddrs = discoveryFilter.Filter(nsqdAddrs) } + + var successfulNsqdAddrs []string for _, addr := range nsqdAddrs { err = r.ConnectToNSQD(addr) if err != nil && err != ErrAlreadyConnected { r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err) continue } + successfulNsqdAddrs = append(successfulNsqdAddrs, addr) } + + // in the event that there are new nsqd addresses, remove the old connections from the connections map + for addr := range r.connections { + if !inAddrs(successfulNsqdAddrs, addr) { + delete(r.connections, addr) + } + } +} + +func inAddrs(addrs []string, addr string) bool { + for _, a := range addrs { + if addr == a { + return true + } + } + + return false } // ConnectToNSQDs takes multiple nsqd addresses to connect directly to. @@ -1109,7 +1127,7 @@ func (r *Consumer) stopHandlers() { // AddHandler sets the Handler for messages received by this Consumer. This can be called // multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines. // -// This panics if called after connecting to NSQD or NSQ Lookupd +// # This panics if called after connecting to NSQD or NSQ Lookupd // // (see Handler or HandlerFunc for details on implementing this interface) func (r *Consumer) AddHandler(handler Handler) { @@ -1120,7 +1138,7 @@ func (r *Consumer) AddHandler(handler Handler) { // takes a second argument which indicates the number of goroutines to spawn for // message handling. // -// This panics if called after connecting to NSQD or NSQ Lookupd +// # This panics if called after connecting to NSQD or NSQ Lookupd // // (see Handler or HandlerFunc for details on implementing this interface) func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {