Skip to content

Commit

Permalink
[fix][test] flaky test `testCanRecoverConsumptionWhenLiftMaxUnAckedMe…
Browse files Browse the repository at this point in the history
…ssagesRestriction` (#18726)

(cherry picked from commit 2d205c9)
  • Loading branch information
labuladong authored and nodece committed Mar 8, 2024
1 parent f38ea11 commit 05701d7
Showing 1 changed file with 56 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,35 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

@Test(groups = "broker-impl")
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Expand All @@ -70,91 +73,58 @@ public Object[][] subType() {
@Test(dataProvider = "subType")
public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType)
throws PulsarClientException {
PulsarClient pulsarClient = PulsarClient.builder().
serviceUrl(lookupUrl.toString())
.build();
final int totalMsg = 1000;
String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
Map<Consumer<?>, List<MessageId>> nameToId = Maps.newConcurrentMap();
Map<Consumer<?>, List<MessageId>> nameToId = new ConcurrentHashMap<>();
Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
Set<MessageId> recMessages = Sets.newConcurrentHashSet();
AtomicLong lastActiveTime = new AtomicLong();
AtomicBoolean canAcknowledgement = new AtomicBoolean(false);

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.consumerName("con-1")
.messageListener((cons1, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons1.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.subscribe();
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons2, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons2.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
List<Consumer<?>> consumerList = new ArrayList<>();
// create 3 consumers
for (int i = 0; i < 3; i++) {
ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((consumer, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}
})
.consumerName("con-2")
.subscribe();
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons3, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons3.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.consumerName("con-3")
.subscribe();
});

if (subscriptionType == SubscriptionType.Key_Shared) {
// ensure every consumer can be distributed messages
int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)));
}

consumerList.add(builder.subscribe());
}

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
// We chose 9 because the maximum unacked message is 10
.batchingMaxMessages(9)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();

for (int i = 0; i < totalMsg; i++) {
producer.sendAsync(UUID.randomUUID().toString()
.getBytes(StandardCharsets.UTF_8))
.thenAccept(pubMessages::add);
byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
producer.newMessage().key("key-" + (i % 3)).value(msg)
.sendAsync().thenAccept(pubMessages::add);
}

// Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages.
Expand All @@ -176,7 +146,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc

// Wait for all consumers to continue receiving messages.
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.atMost(15, TimeUnit.SECONDS)
.pollDelay(5, TimeUnit.SECONDS)
.until(() ->
(System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));
Expand All @@ -186,5 +156,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
Assert.assertEquals(pubMessages.size(), totalMsg);
Assert.assertEquals(pubMessages.size(), recMessages.size());
Assert.assertTrue(recMessages.containsAll(pubMessages));

// cleanup
producer.close();
for (Consumer<?> consumer : consumerList) {
consumer.close();
}
}
}

0 comments on commit 05701d7

Please sign in to comment.