Skip to content

Commit

Permalink
When topic is terminated. Client must not retry connecting. Pending m…
Browse files Browse the repository at this point in the history
…essages should be failed
  • Loading branch information
Prashant Kumar committed Nov 13, 2023
1 parent bea85d4 commit 21d9b7b
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,27 @@ func (p *partitionProducer) reconnectToBroker() {
break
}

if strings.Contains(errMsg, "TopicTerminatedError") {
p.log.Info("Topic was terminated, failing pending messages, will not reconnect")
pendingItems := p.pendingQueue.ReadableSlice()
for _, item := range pendingItems {
pi := item.(*pendingItem)
if pi != nil {
pi.Lock()
requests := pi.sendRequests
for _, req := range requests {
sr := req.(*sendRequest)
if sr != nil {
sr.callback(nil, sr.msg, newError(TopicTerminated, err.Error()))
}
}
pi.Unlock()
}
}
p.setProducerState(producerClosing)
break
}

if maxRetry > 0 {
maxRetry--
}
Expand Down

0 comments on commit 21d9b7b

Please sign in to comment.