From 21d9b7b0825f9a7ab746c8a95f8f5c927c37817f Mon Sep 17 00:00:00 2001 From: Prashant Kumar Date: Mon, 13 Nov 2023 14:33:31 -0800 Subject: [PATCH] When topic is terminated. Client must not retry connecting. Pending messages should be failed --- pulsar/producer_partition.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 922d89dbb9..3040826acd 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -421,6 +421,27 @@ func (p *partitionProducer) reconnectToBroker() { break } + if strings.Contains(errMsg, "TopicTerminatedError") { + p.log.Info("Topic was terminated, failing pending messages, will not reconnect") + pendingItems := p.pendingQueue.ReadableSlice() + for _, item := range pendingItems { + pi := item.(*pendingItem) + if pi != nil { + pi.Lock() + requests := pi.sendRequests + for _, req := range requests { + sr := req.(*sendRequest) + if sr != nil { + sr.callback(nil, sr.msg, newError(TopicTerminated, err.Error())) + } + } + pi.Unlock() + } + } + p.setProducerState(producerClosing) + break + } + if maxRetry > 0 { maxRetry-- }