diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9c7c1c09d8..3e493d1860 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -439,6 +439,27 @@ func (p *partitionProducer) reconnectToBroker() { break } + if strings.Contains(errMsg, "TopicTerminatedError") { + p.log.Info("Topic was terminated, failing pending messages, will not reconnect") + pendingItems := p.pendingQueue.ReadableSlice() + for _, item := range pendingItems { + pi := item.(*pendingItem) + if pi != nil { + pi.Lock() + requests := pi.sendRequests + for _, req := range requests { + sr := req.(*sendRequest) + if sr != nil { + sr.done(nil, newError(TopicTerminated, err.Error())) + } + } + pi.Unlock() + } + } + p.setProducerState(producerClosing) + break + } + if maxRetry > 0 { maxRetry-- } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 49e225f3e1..f30ae65fcf 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1158,6 +1158,62 @@ func TestFailedSchemaEncode(t *testing.T) { wg.Wait() } +func TestTopicTermination(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "send_timeout_sub", + }) + assert.Nil(t, err) + defer consumer.Close() // subscribe but do nothing + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + defer producer.Close() + + afterCh := time.After(5 * time.Second) + terminatedChan := make(chan bool) + go func() { + for { + _, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + }) + if err != nil { + e := err.(*Error) + if e.result == TopicTerminated { + terminatedChan <- true + } else { + terminatedChan <- false + } + } + time.Sleep(1 * time.Millisecond) + } + }() + + terminateURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/terminate" + log.Info(terminateURL) + makeHTTPCall(t, http.MethodPost, terminateURL, "") + + for { + select { + case d := <-terminatedChan: + assert.Equal(t, d, true) + return + case <-afterCh: + assert.Fail(t, "Time is up. Topic should have been terminated by now") + } + } +} + func TestSendTimeout(t *testing.T) { quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota" quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`