Pulsar Reader + Pulsar Consumer: DLQ message processing #265
Replies: 1 comment
-
Hi! Since this library is aiming to be very close to official Java library, you should investigate how it's done there (also you'll get more response from general Pulsar community). Once you get your answer for Java, we can check whether this works for Pulsar.Client and if it doesn't - to work on the fix. Some comments about your issues:
Reader api immediately acknowledges all the messages as you can see here
This shouldn't happen, acknowledge should never hang, since it doesn't wait for any response from server, you are welcome to create a failing test to fix. |
Beta Was this translation helpful? Give feedback.
-
Version used:
Lib: 2.16.
Pulsar Ver: 3.1.0
Runtime: Net. 6.0
Hi fsprojects/pulsar-client-dotnet community,
I'm currently trying to design + implement an approach for managing DLQ messages and their reprocessing.
I am following the usual standard of all topics having a respective DLQ topic of the form "topicname-subscriptionname-DLQ", with an initial subscription ("DLQ-sub") to retain the messages.
I have built a simple UI where a DLQ topic can be selected, my backend then uses the Reader API to pull the messages contents down (along with their accompanying message Ids; i.e. ledgerId, entryId etc.), so that then the UI user can manually change the message body, and hit save to re-process it.
The first step of this reprocessMessage endpoint is to produce the updated message into the original topic (this works).
However, I am struggling to understand how to remove that message from the DLQ after it has been re-processed. With the Reader API being unable to acknowledge messages, I was left to use the Consumer API. I then investigated the Consumer API on whether I can subscribe to the DLQ topic using the "DLQ-sub", pass the messageId returned from the Reader API, directly acknowledge that message without having to consume any messages. When I attempt to do this, the AcknowledgeAsync() hangs indefinitely. I have also tried to use the consumer SeekAsync method, passing in the MessageId of the respective message I wish to acknowledge, however I receive the exception "Illegal MessageId; must use Earliest or Latest"
I could perhaps achieve the result I need by using the Consumer API start at Earliest, and to consume/iterate through the DLQ topic/DLQ sub until the matching message for the messageId in question is found, and then acknowledge it. However, this seems incredibly inefficient. especially if there could be 1000s+ of messages.
I have searched the Pulsar Admin API for a function that could perform this but I was unable to locate an appropriate endpoint. If one exists, please let me know.
There appears to be a lack of documentation on all things DLQ re-processing.
So, the key question is:
Q. I need to delete a specific message from a topic+subscription, having already retrieved the MessageId from the Reader API. How can this be accomplished?
Beta Was this translation helpful? Give feedback.
All reactions