From 9c620f570c64ff79dff9ecae7013a1cc7f1b1a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 8 Apr 2024 10:32:14 +0200 Subject: [PATCH] Upgrade Pulsar client (3.2.2) and improve JMS priority on consumer side --- .github/workflows/tests.yaml | 3 + pom.xml | 6 +- pulsar-jms-admin-api/pom.xml | 5 + .../jms/api/JMSDestinationMetadata.java | 9 + pulsar-jms-filters/pom.xml | 1 + pulsar-jms-integration-tests/pom.xml | 5 + .../oss/pulsar/jms/tests/DockerTest.java | 21 ++- .../oss/pulsar/jms/CompositeEnumeration.java | 2 + ...agePriorityGrowableArrayBlockingQueue.java | 165 ++++++++++++++++++ .../pulsar/jms/PulsarConnectionFactory.java | 50 +++--- .../oss/pulsar/jms/PulsarMessage.java | 20 ++- .../oss/pulsar/jms/PulsarSession.java | 2 - .../pulsar/jms/messages/PulsarMapMessage.java | 2 + .../datastax/oss/pulsar/jms/PriorityTest.java | 104 ++++++++++- 14 files changed, 351 insertions(+), 44 deletions(-) create mode 100644 pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 80cdbbc8..3aafaf89 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -30,6 +30,9 @@ jobs: restore-keys: | ${{ runner.os }}-maven- + - name: Setup upterm session + uses: lhotari/action-upterm@v1 + - name: Build and test run: mvn -B clean javadoc:javadoc verify diff --git a/pom.xml b/pom.xml index d690ec5f..2a56391f 100644 --- a/pom.xml +++ b/pom.xml @@ -53,10 +53,8 @@ 8 8 2.0.3 - org.apache.pulsar - 3.0.0 + 3.2.2 5.16.1 1.11 5.1.0 @@ -197,7 +195,7 @@ com.github.spotbugs spotbugs-annotations - 4.2.3 + 4.8.3 org.mockito diff --git a/pulsar-jms-admin-api/pom.xml b/pulsar-jms-admin-api/pom.xml index 1f258d4c..ad41c829 100644 --- a/pulsar-jms-admin-api/pom.xml +++ b/pulsar-jms-admin-api/pom.xml @@ -38,5 +38,10 @@ jakarta.jms-api provided + + com.github.spotbugs + spotbugs-annotations + provided + diff --git a/pulsar-jms-admin-api/src/main/java/com/datastax/oss/pulsar/jms/api/JMSDestinationMetadata.java b/pulsar-jms-admin-api/src/main/java/com/datastax/oss/pulsar/jms/api/JMSDestinationMetadata.java index dc4bfaa1..b29a084f 100644 --- a/pulsar-jms-admin-api/src/main/java/com/datastax/oss/pulsar/jms/api/JMSDestinationMetadata.java +++ b/pulsar-jms-admin-api/src/main/java/com/datastax/oss/pulsar/jms/api/JMSDestinationMetadata.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.pulsar.jms.api; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Map; import lombok.Data; @@ -40,6 +41,7 @@ protected JMSDestinationMetadata(String destination) { /** The destination maps to a physical topic, partitioned or non-partitioned. */ @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public abstract static class PhysicalPulsarTopicMetadata extends JMSDestinationMetadata { public PhysicalPulsarTopicMetadata( String destination, @@ -87,6 +89,7 @@ public boolean isVirtualDestination() { /** The destination is a JMS Topic, that maps to a Pulsar Topic with a set of Subscriptions. */ @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public static final class TopicMetadata extends PhysicalPulsarTopicMetadata { public TopicMetadata( @@ -119,6 +122,7 @@ public boolean isTopic() { /** The destination is a JMS Queue. A Queue is mapped to a single Pulsar Subscription. */ @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public static final class QueueMetadata extends PhysicalPulsarTopicMetadata { public QueueMetadata( String destination, @@ -166,6 +170,7 @@ public boolean isTopic() { /** The Destination is a Virtual Destination, with the set of actual physical destinations. */ @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public static final class VirtualDestinationMetadata extends JMSDestinationMetadata { private final boolean multiTopic; private final boolean regex; @@ -193,6 +198,7 @@ public boolean isMultiTopic() { return multiTopic; } + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public List getDestinations() { return destinations; } @@ -220,6 +226,7 @@ public final String getDestination() { /** Metadata about a Pulsar Subscription. */ @Data @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public static final class SubscriptionMetadata { private final String subscriptionName; @@ -236,6 +243,7 @@ public SubscriptionMetadata(String subscriptionName) { /** Metadata about a Pulsar Consumer. */ @Data @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public static final class ConsumerMetadata { @Getter private final String consumerName; @@ -257,6 +265,7 @@ public ConsumerMetadata(String consumerName) { /** Metadata about a Pulsar Producer. */ @Data @ToString + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public static final class ProducerMetadata { @Getter private final String producerName; diff --git a/pulsar-jms-filters/pom.xml b/pulsar-jms-filters/pom.xml index 4f48f44b..d2961cac 100644 --- a/pulsar-jms-filters/pom.xml +++ b/pulsar-jms-filters/pom.xml @@ -57,6 +57,7 @@ com.github.spotbugs spotbugs-annotations + provided org.junit.jupiter diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 6ad38952..b0b313b3 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -48,6 +48,11 @@ junit-jupiter test + + com.github.spotbugs + spotbugs-annotations + provided + org.junit.jupiter junit-jupiter-params diff --git a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java index 04b936e2..bf9bd59b 100644 --- a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java +++ b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java @@ -51,6 +51,7 @@ public class DockerTest { private static final String TEST_PULSAR_DOCKER_IMAGE_NAME = System.getProperty("testPulsarDockerImageName"); public static final String LUNASTREAMING = "datastax/lunastreaming:2.10_4.4"; + public static final String LUNASTREAMING_31 = "datastax/lunastreaming:3.1_3.0"; @TempDir Path tempDir; @@ -74,6 +75,11 @@ public void testLunaStreaming210() throws Exception { // waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x test(LUNASTREAMING, false); } + @Test + public void testLunaStreaming31() throws Exception { + test(LUNASTREAMING_31, false); + } + @Test public void testPulsar292Transactions() throws Exception { @@ -92,12 +98,12 @@ public void testPulsar211Transactions() throws Exception { @Test public void testPulsar3Transactions() throws Exception { - test("apachepulsar/pulsar:3.0.0", true); + test("apachepulsar/pulsar:3.2.2", true); } @Test public void testNoAuthentication() throws Exception { - test("apachepulsar/pulsar:3.0.0", false, false, false); + test("apachepulsar/pulsar:3.2.2", false, false, false); } @Test @@ -106,11 +112,22 @@ public void testLunaStreaming210Transactions() throws Exception { test(LUNASTREAMING, true); } + @Test + public void testLunaStreaming31Transactions() throws Exception { + // waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x + test(LUNASTREAMING_31, true); + } + @Test public void testLunaStreaming210ServerSideSelectors() throws Exception { test(LUNASTREAMING, false, true); } + @Test + public void testLunaStreaming31ServerSideSelectors() throws Exception { + test(LUNASTREAMING_31, false, true); + } + @Test public void testGenericPulsar() throws Exception { assumeTrue(TEST_PULSAR_DOCKER_IMAGE_NAME != null && !TEST_PULSAR_DOCKER_IMAGE_NAME.isEmpty()); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompositeEnumeration.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompositeEnumeration.java index 8ec4bf1b..eaec95e3 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompositeEnumeration.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompositeEnumeration.java @@ -15,10 +15,12 @@ */ package com.datastax.oss.pulsar.jms; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Enumeration; import java.util.List; import java.util.NoSuchElementException; +@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public final class CompositeEnumeration implements Enumeration { private final List enumerations; private int currentEnumeration = 0; diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java new file mode 100644 index 00000000..6e8b4f6d --- /dev/null +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java @@ -0,0 +1,165 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import java.util.*; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; + +public class MessagePriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue { + + static int getPriority(Message m) { + Integer priority = PulsarMessage.readJMSPriority(m); + return priority == null ? PulsarMessage.DEFAULT_PRIORITY : priority; + } + + private final PriorityBlockingQueue queue; + private final AtomicBoolean terminated = new AtomicBoolean(false); + + public MessagePriorityGrowableArrayBlockingQueue() { + this(10); + } + + public MessagePriorityGrowableArrayBlockingQueue(int initialCapacity) { + queue = + new PriorityBlockingQueue<>( + initialCapacity, + new Comparator() { + @Override + public int compare(Message o1, Message o2) { + int priority1 = getPriority(o1); + int priority2 = getPriority(o2); + return Integer.compare(priority2, priority1); + } + }); + } + + @Override + public Message remove() { + return queue.remove(); + } + + @Override + public Message poll() { + return queue.poll(); + } + + @Override + public Message element() { + return queue.element(); + } + + @Override + public Message peek() { + return queue.peek(); + } + + @Override + public boolean offer(Message e) { + return queue.offer(e); + } + + @Override + public void put(Message e) { + queue.put(e); + } + + @Override + public boolean add(Message e) { + return queue.add(e); + } + + @Override + public boolean offer(Message e, long timeout, TimeUnit unit) { + return queue.offer(e, timeout, unit); + } + + @Override + public Message take() throws InterruptedException { + return queue.take(); + } + + @Override + public Message poll(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + @Override + public int drainTo(Collection c) { + return queue.drainTo(c); + } + + @Override + public int drainTo(Collection c, int maxElements) { + return queue.drainTo(c, maxElements); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public boolean remove(Object o) { + return queue.remove(o); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public Iterator iterator() { + return queue.iterator(); + } + + @Override + public List toList() { + List list = new ArrayList<>(size()); + forEach(list::add); + return list; + } + + @Override + public void forEach(Consumer action) { + queue.forEach(action); + } + + @Override + public String toString() { + return queue.toString(); + } + + @Override + public void terminate(Consumer itemAfterTerminatedHandler) { + terminated.set(true); + } + + @Override + public boolean isTerminated() { + return terminated.get(); + } +} diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 2ac93832..f620931e 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -1087,11 +1087,10 @@ Producer getProducerForDestination(Destination defaultDestination, boole new MessageRouter() { @Override public int choosePartition(Message msg, TopicMetadata metadata) { - String priority = msg.getProperty("JMSPriority"); + + Integer priority = PulsarMessage.readJMSPriority(msg); int key = - priority == null - ? PulsarMessage.DEFAULT_PRIORITY - : Integer.parseInt(msg.getProperty("JMSPriority")); + priority == null ? PulsarMessage.DEFAULT_PRIORITY : priority; return Utils.mapPriorityToPartition( key, metadata.numPartitions(), @@ -1339,48 +1338,43 @@ private static void replaceIncomingMessageList(Consumer c) { incomingMessages.setAccessible(true); Object oldQueue = incomingMessages.get(consumerBase); + BlockingQueue newQueue; if (oldQueue.getClass().isAssignableFrom(PriorityBlockingQueue.class)) { - BlockingQueue newQueue = + newQueue = new PriorityBlockingQueue( 10, new Comparator() { @Override public int compare(Message o1, Message o2) { - int priority1 = getPriority(o1); - int priority2 = getPriority(o2); + int priority1 = MessagePriorityGrowableArrayBlockingQueue.getPriority(o1); + int priority2 = MessagePriorityGrowableArrayBlockingQueue.getPriority(o2); return Integer.compare(priority2, priority1); } }); - // drain messages that could have been pre-fetched (the Consumer is paused, so this should - // not - // happen) - ((BlockingQueue) oldQueue).drainTo(newQueue); - - incomingMessages.set(c, newQueue); + } else if (oldQueue + .getClass() + .isAssignableFrom(MessagePriorityGrowableArrayBlockingQueue.class)) { + newQueue = new MessagePriorityGrowableArrayBlockingQueue(); } else { - log.debug( - "Field incomingMessages is not a PriorityBlockingQueue, it is a {}." - + "We cannot apply priority to the messages in the local buffer.", + log.warn( + "Field incomingMessages is not a PriorityBlockingQueue/GrowableArrayBlockingQueue, it is a {}." + + " We cannot apply priority to the messages in the local buffer.", oldQueue.getClass().getName()); + return; } + + // drain messages that could have been pre-fetched (the Consumer is paused, so this should + // not + // happen) + ((BlockingQueue) oldQueue).drainTo(newQueue); + + incomingMessages.set(c, newQueue); } catch (Exception err) { throw new RuntimeException(err); } } - private static int getPriority(Message m) { - String jmsPriority = m.getProperty("JMSPriority"); - if (jmsPriority == null || jmsPriority.isEmpty()) { - return PulsarMessage.DEFAULT_PRIORITY; - } - try { - return Integer.parseInt(jmsPriority); - } catch (NumberFormatException err) { - return PulsarMessage.DEFAULT_PRIORITY; - } - } - public String downloadServerSideFilter( String fullQualifiedTopicName, String subscriptionName, SubscriptionMode subscriptionMode) throws JMSException { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java index 64d8edf2..99141dad 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java @@ -1377,12 +1377,9 @@ protected PulsarMessage applyMessage( if (msg.hasProperty("JMSCorrelationID")) { this.correlationId = Base64.getDecoder().decode(msg.getProperty("JMSCorrelationID")); } - if (msg.hasProperty("JMSPriority")) { - try { - this.jmsPriority = Integer.parseInt(msg.getProperty("JMSPriority")); - } catch (NumberFormatException err) { - // cannot decode priority, not a big deal as it is not supported in Pulsar - } + Integer jmsPriorityValue = readJMSPriority(msg); + if (jmsPriorityValue != null) { + this.jmsPriority = jmsPriorityValue; } if (msg.hasProperty("JMSDeliveryMode")) { try { @@ -1481,4 +1478,15 @@ public CompletableFuture acknowledgeInternalInTransaction(Transaction transac public org.apache.pulsar.client.api.Message getReceivedPulsarMessage() { return receivedPulsarMessage; } + + public static Integer readJMSPriority(org.apache.pulsar.client.api.Message msg) { + if (msg.hasProperty("JMSPriority")) { + try { + return Integer.parseInt(msg.getProperty("JMSPriority")); + } catch (NumberFormatException err) { + // cannot decode priority, not a big deal as it is not supported in Pulsar + } + } + return null; + } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java index 98299327..d4794f05 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java @@ -108,7 +108,6 @@ static String ACKNOWLEDGE_MODE_TO_STRING(int mode) { private final int sessionMode; private final boolean transacted; private final boolean emulateTransactions; - private final boolean enableJMSPriority; // this is to emulate QueueSession/TopicSession private boolean allowQueueOperations = true; private boolean allowTopicOperations = true; @@ -163,7 +162,6 @@ static String ACKNOWLEDGE_MODE_TO_STRING(int mode) { this.transacted = sessionMode == Session.SESSION_TRANSACTED; this.overrideConsumerConfiguration = overrideConsumerConfiguration; PulsarConnectionFactory factory = getFactory(); - this.enableJMSPriority = factory.isEnableJMSPriority(); this.useDedicatedListenerThread = factory.getSessionListenersThreads() <= 0; if (transacted && factory.isTransactionsStickyPartitions()) { generateNewTransactionStickyKey(); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java index 0fddf659..e8da31dc 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java @@ -17,6 +17,7 @@ import com.datastax.oss.pulsar.jms.PulsarMessage; import com.datastax.oss.pulsar.jms.Utils; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; @@ -31,6 +32,7 @@ import javax.jms.MessageNotWriteableException; import org.apache.pulsar.client.api.TypedMessageBuilder; +@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) public final class PulsarMapMessage extends PulsarMessage implements MapMessage { private final Map map = new HashMap<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java index 13d5d5d2..2d339d60 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java @@ -22,6 +22,7 @@ import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashSet; @@ -29,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -36,11 +38,13 @@ import javax.jms.*; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -323,12 +327,13 @@ private static void verifyPriorities(List received) throws JMSException for (int priority : received) { if (priority == LOW_PRIORITY && foundHighPriority) { - log.info("received priority {} after {}", priority, count); + log.info( + "received priority {} (low) after {} messages and one high priority", priority, count); foundLowPriorityAfterHighPriority = true; break; } if (priority == HIGH_PRIORITY) { - log.info("received priority {} after {}", priority, count); + log.info("received priority {} (high) after {} messages", priority, count); foundHighPriority = true; } count++; @@ -479,4 +484,99 @@ public void basicPriorityJMSContextTest() throws Exception { } } } + + @ParameterizedTest(name = "mapping {0}") + @ValueSource(strings = {"linear", "non-linear"}) + public void testConsumerPriorityQueue(String mapping) throws Exception { + + final int numMessages = 500; + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("jms.enableJMSPriority", true); + properties.put("jms.priorityMapping", mapping); + properties.put( + "producerConfig", ImmutableMap.of("blockIfQueueFull", true, "batchingEnabled", false)); + properties.put("consumerConfig", ImmutableMap.of("receiverQueueSize", numMessages)); + log.info("running testConsumerPriorityQueue with {}", properties); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (Connection connection = factory.createConnection()) { + assertTrue(factory.isEnableJMSPriority()); + assertEquals(mapping.equals("linear"), factory.isPriorityUseLinearMapping()); + connection.start(); + try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { + Queue destination = session.createQueue("test-" + UUID.randomUUID()); + + pulsarContainer.getAdmin() + .topics() + .createPartitionedTopic(factory.getPulsarTopicName(destination), 10); + + int numHighPriority = 100; + + try (MessageProducer producer = session.createProducer(destination); ) { + List> handles = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + TextMessage textMessage = session.createTextMessage("foo-" + i); + if (i < numMessages - numHighPriority) { + // the first messages are lower priority + producer.setPriority(LOW_PRIORITY); + } else { + producer.setPriority(HIGH_PRIORITY); + } + + CompletableFuture handle = new CompletableFuture<>(); + producer.send( + textMessage, + new CompletionListener() { + @Override + public void onCompletion(Message message) { + handle.complete(null); + } + + @Override + public void onException(Message message, Exception e) { + handle.completeExceptionally(e); + } + }); + handles.add(handle); + } + FutureUtil.waitForAll(handles).get(); + } + + try (MessageConsumer consumer1 = session.createConsumer(destination); ) { + List received = new ArrayList<>(); + + for (int i = 0; i < numMessages; i++) { + TextMessage msg = (TextMessage) consumer1.receive(); + if (i == 0) { + // await all messages in the consumer receive queue + ConsumerBase consumerBase = ((PulsarMessageConsumer) consumer1).getConsumer(); + Field incomingMessages = ConsumerBase.class.getDeclaredField("incomingMessages"); + incomingMessages.setAccessible(true); + Object queue = incomingMessages.get(consumerBase); + Awaitility.await().until(() -> ((BlockingQueue) queue).size() == numMessages - 1); + } + received.add(msg.getJMSPriority()); + } + + assertNull(consumer1.receiveNoWait()); + assertEquals(numMessages, received.size()); + int firstMessagePriority = 0; + for (int i = 0; i < received.size(); i++) { + if (i == 0) { + firstMessagePriority = received.get(i); + continue; + } + final int lastNHigh = + numHighPriority + (firstMessagePriority == HIGH_PRIORITY ? 0 : 1); + if (i < lastNHigh) { + assertEquals(HIGH_PRIORITY, received.get(i)); + } else { + assertEquals(LOW_PRIORITY, received.get(i)); + } + } + } + } + } + } + } }