[Bug] Consumer receives acknowledged messages from compacted topic after reconnection #23494

Open
3 tasks done
summeriiii opened this issue Oct 21, 2024 · 0 comments · May be fixed by #23495
Labels
type/bug The PR fixed a bug or issue reported a bug

summeriiii commented Oct 21, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

master

Minimal reproduce step

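  1. Produce keyed messages to the topic.
  2. Trigger compaction on the topic.
  3. Consume the compacted topic with readCompacted(true) and acknowledge every received message.
  4. Unload the topic and wait for the consumer to reconnect.
  5. Consume from the compacted topic again with the same consumer.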
    @Test
    public void testIndividualAcknowledgeWithReconnection() throws Exception {
        final String topicName = "persistent://my-property/use/my-ns/testIndividualAcknowledge" + UUID.randomUUID();
        final String subName = "sub1";
        final int numMessages = 10;

        pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .receiverQueueSize(1).readCompacted(true).subscribe().close();

        Map<String, String> expected = new HashMap<>();

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        for (int i = 0; i < numMessages; i++) {
            String key = "key" + new Random().nextInt(4);
            String value = ("my-message-" + i);
            producer.newMessage().key(key).value(value).send();
            expected.put(key, value);
        }
        producer.flush();

        // compact the topic
        compact(topicName);

        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionName(subName).receiverQueueSize(1).readCompacted(true).subscribe();

        Map<String, String> results = new HashMap<>();
        while (true) {
            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            results.put(message.getKey(), message.getValue());
            consumer.acknowledge(message);
        }
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
                        numMessages - expected.size()));

        // unload the topic
        admin.topics().unload(topicName);

        // wait for the consumer to reconnect
        Awaitility.await().until(() -> admin.topics().getStats(topicName).getSubscriptions() != null);

        // should not receive any messages: all of them were acknowledged before the unload
        int count = 0;
        while (true) {
            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            count++;
            results.put(message.getKey(), message.getValue());
            consumer.acknowledge(message);
        }
        assertEquals(count, 0);

        Awaitility.await().untilAsserted(() ->
                assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
                        numMessages - expected.size()));

        assertEquals(results, expected);
    }
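
The backlog assertion `numMessages - expected.size()` in the test above may be easier to follow with a small standalone model. The sketch below does not use Pulsar at all: `CompactionBacklogSketch`, `compactedView` and `expectedBacklog` are hypothetical names introduced only for illustration, and the model assumes that compaction keeps just the latest value per key and that the consumer acknowledges exactly those surviving messages.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionBacklogSketch {

    /** Models compaction: only the latest value for each key survives. */
    static Map<String, String> compactedView(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> message : log) {
            // A later message with the same key overwrites the earlier value.
            latest.put(message.getKey(), message.getValue());
        }
        return latest;
    }

    /**
     * Backlog the test asserts after every compacted message is acknowledged:
     * total published messages minus the number of distinct keys.
     * This mirrors the test's assertion, not Pulsar's internal accounting.
     */
    static int expectedBacklog(List<Map.Entry<String, String>> log) {
        return log.size() - compactedView(log).size();
    }

    public static void main(String[] args) {
        // Five published messages spread over three keys (illustrative data).
        List<Map.Entry<String, String>> log = List.of(
                Map.entry("key0", "my-message-0"),
                Map.entry("key1", "my-message-1"),
                Map.entry("key0", "my-message-2"),
                Map.entry("key2", "my-message-3"),
                Map.entry("key1", "my-message-4"));

        System.out.println(compactedView(log));   // {key0=my-message-2, key1=my-message-4, key2=my-message-3}
        System.out.println(expectedBacklog(log)); // 2
    }
}
```

In the test, the same quantity is asserted both before and after the unload, and the second receive loop is expected to return nothing.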

What did you expect to see?

In step 5, the consumer should receive no messages, because every message was acknowledged before the topic was unloaded.

What did you see instead?

After reconnecting, the consumer still receives the messages it had already acknowledged.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
summeriiii added the type/bug label on Oct 21, 2024