diff --git a/consumer.go b/consumer.go index 285ce779..6a4d75cf 100644 --- a/consumer.go +++ b/consumer.go @@ -955,16 +955,22 @@ func (r *Consumer) redistributeRDY() { return } - numConns := int32(len(r.conns())) + // if an external heuristic set needRDYRedistributed we want to wait + // until we can actually redistribute to proceed + conns := r.conns() + if len(conns) == 0 { + return + } + maxInFlight := r.getMaxInFlight() - if numConns > maxInFlight { + if len(conns) > int(maxInFlight) { r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)", - numConns, maxInFlight) + len(conns), maxInFlight) atomic.StoreInt32(&r.needRDYRedistributed, 1) } - if r.inBackoff() && numConns > 1 { - r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", numConns) + if r.inBackoff() && len(conns) > 1 { + r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns)) atomic.StoreInt32(&r.needRDYRedistributed, 1) } @@ -972,7 +978,6 @@ func (r *Consumer) redistributeRDY() { return } - conns := r.conns() possibleConns := make([]*Conn, 0, len(conns)) for _, c := range conns { lastMsgDuration := time.Now().Sub(c.LastMessageTime())