From 439c4bd08b70f4e6bf4919fbd71b58d0ce0b4de6 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 3 Jul 2024 18:56:03 +0800 Subject: [PATCH] fix 1233 --- pulsar/consumer_partition.go | 2 +- pulsar/consumer_test.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 794f892816..d8001dc122 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -730,7 +730,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon var ackReq *ackRequest if withResponse { - ackReq := pc.sendCumulativeAck(msgIDToAck) + ackReq = pc.sendCumulativeAck(msgIDToAck) <-ackReq.doneCh } else { pc.ackGroupingTracker.addCumulative(msgIDToAck) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 54c91ec097..2ecdf8eb31 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1010,11 +1010,13 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { if i == N-1 { // cumulative ack the first half of messages - c1.AckCumulative(msg) + err := c1.AckCumulative(msg) + assert.Nil(t, err) } else if i == N { // the N+1 msg is in the second batch // cumulative ack it to test if the first batch can be acked - c2.AckCumulative(msg) + err := c2.AckCumulative(msg) + assert.Nil(t, err) } } @@ -3950,7 +3952,8 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o // Acknowledge half of the messages if cumulative { msgID := msgIds[BatchingMaxSize/2-1] - consumer.AckIDCumulative(msgID) + err := consumer.AckIDCumulative(msgID) + assert.Nil(t, err) log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx()) } else { for i := 0; i < BatchingMaxSize; i++ { @@ -3985,7 +3988,8 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o } if cumulative { msgID := msgIds[BatchingMaxSize-1] - consumer.AckIDCumulative(msgID) + err := consumer.AckIDCumulative(msgID) + assert.Nil(t, err) log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx()) } consumer.Close()