diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 922d89dbb9..3040826acd 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -421,6 +421,27 @@ func (p *partitionProducer) reconnectToBroker() { break } + if strings.Contains(errMsg, "TopicTerminatedError") { + p.log.Info("Topic was terminated, failing pending messages, will not reconnect") + pendingItems := p.pendingQueue.ReadableSlice() + for _, item := range pendingItems { + pi := item.(*pendingItem) + if pi != nil { + pi.Lock() + requests := pi.sendRequests + for _, req := range requests { + sr := req.(*sendRequest) + if sr != nil { + sr.callback(nil, sr.msg, newError(TopicTerminated, err.Error())) + } + } + pi.Unlock() + } + } + p.setProducerState(producerClosing) + break + } + if maxRetry > 0 { maxRetry-- }