diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go index dd051868d7..97d9e05a64 100644 --- a/pulsar/ack_grouping_tracker.go +++ b/pulsar/ack_grouping_tracker.go @@ -120,7 +120,7 @@ type timedAckGroupingTracker struct { // Key is the pair of the ledger id and the entry id, // Value is the bit set that represents which messages are acknowledged if the entry stores a batch. - // The bit 1 represents the message has been acknowledged, i.e. the bits "111" represents all messages + // The bit 1 represents the message has not been acknowledged, i.e. the bits "111" represents all messages // in the batch whose batch size is 3 are not acknowledged. // After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011". // Value is nil if the entry represents a single message. @@ -241,6 +241,9 @@ func (t *timedAckGroupingTracker) clearPendingAcks() map[[2]uint64]*bitset.BitSe } func (t *timedAckGroupingTracker) close() { + if t.ticker != nil { + t.ticker.Stop() + } t.flushAndClean() if t.exitCh != nil { close(t.exitCh) diff --git a/pulsar/ack_grouping_tracker_test.go b/pulsar/ack_grouping_tracker_test.go index 0a794f6403..202d19d9d5 100644 --- a/pulsar/ack_grouping_tracker_test.go +++ b/pulsar/ack_grouping_tracker_test.go @@ -217,6 +217,52 @@ func TestDuplicateAfterClose(t *testing.T) { assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1})) } +func TestCloseFlushWithoutTimer(t *testing.T) { + var acker mockAcker + tracker := newAckGroupingTracker( + &AckGroupingOptions{MaxSize: 3, MaxTime: 0}, + nil, + func(id MessageID) { acker.ackCumulative(id) }, + func(ids []*pb.MessageIdData) { acker.ack(ids) }, + ) + + // case 1: message will not be acked because the cache is not full + tracker.add(&messageID{ledgerID: 1}) + tracker.add(&messageID{ledgerID: 2}) + assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)})) + assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)})) + assert.Equal(t, 0, len(acker.getLedgerIDs())) + + // case 2: tracker close so that all messages are flushed and acked + tracker.close() + assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1})) + assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2})) + assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs()) +} + +func TestCloseFlushWithTimer(t *testing.T) { + var acker mockAcker + tracker := newAckGroupingTracker( + &AckGroupingOptions{MaxSize: 1000, MaxTime: 10 * 1000}, + nil, + func(id MessageID) { acker.ackCumulative(id) }, + func(ids []*pb.MessageIdData) { acker.ack(ids) }, + ) + + // case 1: messages are not acked because the cache is not full + tracker.add(&messageID{ledgerID: 1}) + tracker.add(&messageID{ledgerID: 2}) + assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)})) + assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)})) + assert.Equal(t, 0, len(acker.getLedgerIDs())) + + // case 2: tracker close so that all messages are flushed and acked + tracker.close() + assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1})) + assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2})) + assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs()) +} + func TestTrackerPendingAcks(t *testing.T) { m := make(map[uint64][]int64) tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil, nil,