Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

thetumbled
Copy link
Member

@thetumbled thetumbled commented Nov 14, 2024

Motivation

There are many issues with the current implementation of Negative Acknowledgement in Pulsar:

  • the memory occupation is high.
  • the code execution efficiency is low.
  • the redelivery time is not accurate.
  • multiple negative ack for messages in the same entry(batch) will interfere with each other.
    All of these problem is severe and need to be solved.

Memory occupation is high

After the improvement of #23582, we have reduce half more memory occupation
of NegativeAcksTracker by replacing HashMap with ConcurrentLongLongPairHashMap. With 100w entry, the memory occupation decrease from 178Mb to 64Mb. With 1kw entry, the memory occupation decrease from 1132Mb to 512Mb.
The average memory occupation of each entry decrease from 1132MB/10000000=118byte to 512MB/10000000=53byte.

But it is not enough. Assuming that we negative ack message 1w/s, assigning 1h redelivery delay for each message,
the memory occupation of NegativeAcksTracker will be 3600*10000*53/1024/1024/1024=1.77GB, if the delay is 5h,
the required memory is 3600*10000*53/1024/1024/1024*5=8.88GB, which increase too fast.

Code execution efficiency is low

Currently, each time the timer task is triggered, it will iterate all the entries in NegativeAcksTracker.nackedMessages,
which is unnecessary. We can sort entries by timestamp and only iterate the entries that need to be redelivered.

Redelivery time is not accurate

Currently, the redelivery time is controlled by the timerIntervalNanos, which is 1/3 of the negativeAckRedeliveryDelay.
That means, if the negativeAckRedeliveryDelay is 1h, the redelivery time will be 20min, which is unacceptable.

Multiple negative ack for messages in the same entry(batch) will interfere with each other

Currently, NegativeAcksTracker#nackedMessages map (ledgerId, entryId) to timestamp, which means multiple nacks from messages in the same batch share single one timestamp.
If we let msg1 redelivered 10s later, then let msg2 redelivered 20s later, these two messages are delivered 20s later together. msg1 will not be redelivered 10s later as the timestamp recorded in NegativeAcksTracker#nackedMessages is overrode by the second nack call.

we can reproduce this problem with test code below:

Consumer consumer = client.newConsumer()
                .topic("persistent://public/default/testNack")
                .subscriptionName("sub2")
                .subscriptionType(SubscriptionType.Shared)
                .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // fixed delay with 20s.
                .subscribe();
        // receive first message and nack it.
        Message msg = consumer.receive();
        MessageIdAdv batchMessageId = (MessageIdAdv) msg.getMessageId();
        int batchIndex = batchMessageId.getBatchIndex();
        log.info("Message received, timestamp:{}, message id:{}, batch index:{}", getTime(), batchMessageId, batchIndex);
        consumer.negativeAcknowledge(msg);
        
        // receive the secode message and sleep for 10s, then nack it.
        msg = consumer.receive();
        batchMessageId = (MessageIdAdv) msg.getMessageId();
        batchIndex = batchMessageId.getBatchIndex();
        log.info("Message received, timestamp:{}, message id:{}, batch index:{}", getTime(), batchMessageId, batchIndex);
        Thread.sleep(10000);
        consumer.negativeAcknowledge(msg);

We expect the second message redelivered 10s later than the first message, as it call nack 10s later than the first one.
However, we will receive two messages together.
image

You can also reproduce this problem with the test code in this PR: org.apache.pulsar.client.impl.NegativeAcksTest#testNegativeAcksWithBatch

Modifications

Refactor the NegativeAcksTracker to solve the above problems.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#64

@thetumbled
Copy link
Member Author

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@thetumbled Fastutil could be a better source of space efficient map data structures. I believe that there's a templating solution where it's possible to generate code for efficient implementations. In this case, is there a need for the data structure to be concurrent? Following a single writer principle could result in simpler and more performant designs. One way to address message passing from other threads to a single writer thread is to use message passing queues from JCTools which we already use in Pulsar. Just some food for thought.

@thetumbled
Copy link
Member Author

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@thetumbled Fastutil could be a better source of space efficient map data structures. I believe that there's a templating solution where it's possible to generate code for efficient implementations. In this case, is there a need for the data structure to be concurrent? Following a single writer principle could result in simpler and more performant designs. One way to address message passing from other threads to a single writer thread is to use message passing queues from JCTools which we already use in Pulsar. Just some food for thought.

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency.
Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.

@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

@thetumbled
Copy link
Member Author

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency. Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.
@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

It is a good point, i will test it.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency. Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.
@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

It is a good point, i will test it.

@thetumbled In certain cases when tracking existence (true/false), it's worth considering to use space efficient bit maps. In Pulsar, we use the RoaringBitmap library.
I think that it should be used for storing nacks.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

I think that it should be used for storing nacks.

I guess it's not applicable in this case.

@thetumbled I checked the NegativeAcksTracker class and it seems that the actual key is (ledgerId, entryId).
The partitionIndex and timestamp are part of the value.
partitionIndex doesn't have to be a long value.

It's easy to implement (ledgerId, entryId) as map of maps.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

This is a very poor solution in the current implementation:

nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});

There should be a separate datastructure (a list or queue) which contains the entries in timestamp order. The benefit of that is that iterating could stop after the timestamp condition no longer holds.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

@thetumbled It looks like there's no need for a map data structure in the first place. That's completely unnecessary for implementing NegativeAcksTracker

@thetumbled
Copy link
Member Author

@thetumbled It looks like there's no need for a map data structure in the first place. That's completely unnecessary for implementing NegativeAcksTracker

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

Fastutil contains multiple PriorityQueue implementations: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/PriorityQueue.html.
For example, this would work: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayPriorityQueue.html
or this one: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectHeapPriorityQueue.html

@thetumbled thetumbled changed the title [fix][client] fix multiple nack from messages in the same batch interfere each other. [improve][client] PIP-393: Improve performance of Negative Acknowledgement Nov 15, 2024
@thetumbled
Copy link
Member Author

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

Fastutil contains multiple PriorityQueue implementations: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/PriorityQueue.html. For example, this would work: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayPriorityQueue.html or this one: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectHeapPriorityQueue.html

I propose a pip to fix several issues with nack tracker, with a new data structure :

Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();

This PR become the implementation PR for PIP-393: #23601.
I will implement PIP-393 soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants