Skip to content

Commit

Permalink
[Fix][Producer] handle TopicNotFound/TopicTerminated/ProducerBlockedQ…
Browse files Browse the repository at this point in the history
…uotaExceededException/ProducerFenced when reconnecting (#1134)

Master Issue: #1128

### Motivation

In Java client, when we get TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, we should failPendingMessages, and close producer. But in Go client, we forget to handle ProducerBlockedQuotaExceededException/ProducerFenced, and in #1128, we just call sr.done(), actually we should call failPendingMessages().

https://github.com/apache/pulsar-client-go/pull/1128/files
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1663

### Modifications
1. rename `errMsgTopicNotFount` to `errMsgTopicNotFound`
2. handle TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, call `failPendingMessages()`;

---------

Co-authored-by: gunli <[email protected]>
  • Loading branch information
gunli and gunli authored Dec 8, 2023
1 parent 72aed95 commit bd11581
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
pc.log.Warn("Topic Not Found.")
break
Expand Down
6 changes: 6 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ const (
TransactionNoFoundError
// ClientMemoryBufferIsFull client limit buffer is full
ClientMemoryBufferIsFull
// ProducerFenced When a producer asks and fail to get exclusive producer access,
// or loses the exclusive status after a reconnection, the broker will
// use this error to indicate that this producer is now permanently
// fenced. Applications are now supposed to close it and create a
// new producer
ProducerFenced
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down
74 changes: 45 additions & 29 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ var (
sendRequestPool *sync.Pool
)

var errTopicNotFount = "TopicNotFound"
const (
errMsgTopicNotFound = "TopicNotFound"
errMsgTopicTerminated = "TopicTerminatedError"
errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"
errMsgProducerFenced = "ProducerFenced"
)

func init() {
sendRequestPool = &sync.Pool{
Expand Down Expand Up @@ -441,30 +446,28 @@ func (p *partitionProducer) reconnectToBroker() {
}
p.log.WithError(err).Error("Failed to create producer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic Not Found.")
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(newError(TopicNotFound, err.Error()))
break
}

if strings.Contains(errMsg, "TopicTerminatedError") {
p.log.Info("Topic was terminated, failing pending messages, will not reconnect")
pendingItems := p.pendingQueue.ReadableSlice()
for _, item := range pendingItems {
pi := item.(*pendingItem)
if pi != nil {
pi.Lock()
requests := pi.sendRequests
for _, req := range requests {
sr := req.(*sendRequest)
if sr != nil {
sr.done(nil, newError(TopicTerminated, err.Error()))
}
}
pi.Unlock()
}
}
p.setProducerState(producerClosing)
if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(newError(TopicTerminated, err.Error()))
break
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
break
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(newError(ProducerFenced, err.Error()))
break
}

Expand All @@ -481,10 +484,18 @@ func (p *partitionProducer) reconnectToBroker() {
func (p *partitionProducer) runEventsLoop() {
for {
select {
case data := <-p.dataChan:
case data, ok := <-p.dataChan:
// when doClose() is call, p.dataChan will be closed, data will be nil
if !ok {
return
}
p.internalSend(data)
case i := <-p.cmdChan:
switch v := i.(type) {
case cmd, ok := <-p.cmdChan:
// when doClose() is call, p.dataChan will be closed, cmd will be nil
if !ok {
return
}
switch v := cmd.(type) {
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
Expand Down Expand Up @@ -1321,13 +1332,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)

func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)

p.doClose(errProducerClosed)
}

func (p *partitionProducer) doClose(reason error) {
if !p.casProducerState(producerReady, producerClosing) {
return
}

p.log.Info("Closing producer")
defer close(p.dataChan)
defer close(p.cmdChan)
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
Expand All @@ -1340,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
} else {
p.log.Info("Closed producer")
}
p.failPendingMessages()
p.failPendingMessages(reason)

if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
Expand All @@ -1353,7 +1369,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.batchFlushTicker.Stop()
}

func (p *partitionProducer) failPendingMessages() {
func (p *partitionProducer) failPendingMessages(err error) {
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
Expand All @@ -1378,11 +1394,11 @@ func (p *partitionProducer) failPendingMessages() {

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
sr.done(nil, errProducerClosed)
sr.done(nil, err)
}

// flag the sending has completed with error, flush make no effect
pi.done(errProducerClosed)
pi.done(err)
pi.Unlock()

// finally reached the last view item, current iteration ends
Expand Down
13 changes: 8 additions & 5 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/crypto"
plog "github.com/apache/pulsar-client-go/pulsar/log"
log "github.com/sirupsen/logrus"
)

func TestInvalidURL(t *testing.T) {
Expand Down Expand Up @@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) {
topicName := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "send_timeout_sub",
SubscriptionName: "topic_terminated_sub",
})
assert.Nil(t, err)
defer consumer.Close() // subscribe but do nothing
Expand All @@ -1189,7 +1191,7 @@ func TestTopicTermination(t *testing.T) {
})
if err != nil {
e := err.(*Error)
if e.result == TopicTerminated {
if e.result == TopicTerminated || err == errProducerClosed {
terminatedChan <- true
} else {
terminatedChan <- false
Expand All @@ -1210,6 +1212,7 @@ func TestTopicTermination(t *testing.T) {
return
case <-afterCh:
assert.Fail(t, "Time is up. Topic should have been terminated by now")
return
}
}
}
Expand Down

0 comments on commit bd11581

Please sign in to comment.