From ffdc3afec23679e86b41ebb96b25c723f2ec5e05 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 19 Nov 2024 14:40:08 +0800 Subject: [PATCH] Improve the AckIDList performance when there are many topics subscribed (#1305) ### Motivation Currently, when a consumer subscribes to multiple topic-partitions and `AckWithResponse` is true, the `AckIDList` method iterates over all internal consumers **sequentially**. This harms performance, especially when there are many internal consumers. For example, if the connection of one internal consumer is stuck for some reason, message IDs from the other consumers would be blocked for the duration of the operation timeout. ### Modifications In `ackIDListFromMultiTopics`, call `consumer.AckIDList` in goroutines and use a channel to receive all errors from these calls. Add `TestMultiTopicAckIDListTimeout`, which sets a dummy connection instance whose `SendRequest` never invokes the callback, to verify that the `AckIDList` call does not take much longer than the operation timeout to complete. Without this improvement, it would take more than 5 times the operation timeout to fail. 
--- pulsar/consumer_multitopic.go | 21 +++--- pulsar/consumer_multitopic_test.go | 102 ++++++++++++++++++++++++++++- pulsar/consumer_partition.go | 6 +- 3 files changed, 114 insertions(+), 15 deletions(-) diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 26430add00..7f5e46273c 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -189,18 +189,17 @@ func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer f } } - ackError := AckError{} + subErrCh := make(chan error, len(consumerToMsgIDs)) for consumer, ids := range consumerToMsgIDs { - if err := consumer.AckIDList(ids); err != nil { - if topicAckError := err.(AckError); topicAckError != nil { - for id, err := range topicAckError { - ackError[id] = err - } - } else { - // It should not reach here - for _, id := range ids { - ackError[id] = err - } + go func() { + subErrCh <- consumer.AckIDList(ids) + }() + } + ackError := AckError{} + for i := 0; i < len(consumerToMsgIDs); i++ { + if topicAckError, ok := (<-subErrCh).(AckError); ok { + for id, err := range topicAckError { + ackError[id] = err } } } diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go index 4b3ec90855..cd236ecc22 100644 --- a/pulsar/consumer_multitopic_test.go +++ b/pulsar/consumer_multitopic_test.go @@ -18,15 +18,17 @@ package pulsar import ( + "errors" "fmt" "strings" "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/internal" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/stretchr/testify/assert" ) @@ -317,3 +319,101 @@ func runMultiTopicAckIDList(t *testing.T, regex bool) { assert.Fail(t, "AckIDList should return AckError") } } + +type dummyConnection struct { +} + +func (dummyConnection) SendRequest(_ uint64, _ 
*pb.BaseCommand, _ func(*pb.BaseCommand, error)) { +} + +func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error { + return nil +} + +func (dummyConnection) WriteData(_ internal.Buffer) { +} + +func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error { + return nil +} + +func (dummyConnection) UnregisterListener(_ uint64) { +} + +func (dummyConnection) AddConsumeHandler(_ uint64, _ internal.ConsumerHandler) error { + return nil +} + +func (dummyConnection) DeleteConsumeHandler(_ uint64) { +} + +func (dummyConnection) ID() string { + return "cnx" +} + +func (dummyConnection) GetMaxMessageSize() int32 { + return 0 +} + +func (dummyConnection) Close() { +} + +func (dummyConnection) WaitForClose() <-chan struct{} { + return nil +} + +func (dummyConnection) IsProxied() bool { + return false +} + +func TestMultiTopicAckIDListTimeout(t *testing.T) { + topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano()) + assert.NoError(t, createPartitionedTopic(topic, 5)) + + cli, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + OperationTimeout: 3 * time.Second, + }) + assert.Nil(t, err) + defer cli.Close() + + consumer, err := cli.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub", + AckWithResponse: true, + }) + assert.Nil(t, err) + defer consumer.Close() + + const numMessages = 5 + sendMessages(t, cli, topic, 0, numMessages, false) + msgs := receiveMessages(t, consumer, numMessages) + msgIDs := make([]MessageID, len(msgs)) + + var conn internal.Connection + for i := 0; i < len(msgs); i++ { + msgIDs[i] = msgs[i].ID() + pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer) + assert.True(t, ok) + conn = pc._getConn() + pc._setConn(dummyConnection{}) + } + + start := time.Now() + err = consumer.AckIDList(msgIDs) + elapsed := time.Since(start) + t.Logf("AckIDList takes %v ms", elapsed) + assert.True(t, elapsed < 5*time.Second && elapsed >= 3*time.Second) + var ackError 
AckError + if errors.As(err, &ackError) { + for _, err := range ackError { + assert.Equal(t, "request timed out", err.Error()) + } + } + + for i := 0; i < len(msgs); i++ { + pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer) + assert.True(t, ok) + pc._setConn(conn) + } +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 9f0057c35e..ae5e43a1e1 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -141,7 +141,7 @@ type partitionConsumer struct { state uAtomic.Int32 options *partitionConsumerOpts - conn uAtomic.Value + conn atomic.Pointer[internal.Connection] topic string name string @@ -2205,7 +2205,7 @@ func (pc *partitionConsumer) hasMoreMessages() bool { // _setConn sets the internal connection field of this partition consumer atomically. // Note: should only be called by this partition consumer when a new connection is available. func (pc *partitionConsumer) _setConn(conn internal.Connection) { - pc.conn.Store(conn) + pc.conn.Store(&conn) } // _getConn returns internal connection field of this partition consumer atomically. @@ -2214,7 +2214,7 @@ func (pc *partitionConsumer) _getConn() internal.Connection { // Invariant: The conn must be non-nill for the lifetime of the partitionConsumer. // For this reason we leave this cast unchecked and panic() if the // invariant is broken - return pc.conn.Load().(internal.Connection) + return *pc.conn.Load() } func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {