Skip to content

Commit

Permalink
Upgrade Pulsar client (3.2.2) and improve JMS priority on consumer side
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Apr 9, 2024
1 parent 04f136d commit 9c620f5
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 44 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
- name: Setup upterm session
uses: lhotari/action-upterm@v1

- name: Build and test
run: mvn -B clean javadoc:javadoc verify

6 changes: 2 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jms.version>2.0.3</jms.version>
<!-- waiting for a Apache Pulsar 2.11.0 release, in the meantime we use DataStax Luna Streaming
that is a fork of Apache Pulsar -->
<pulsar.groupId>org.apache.pulsar</pulsar.groupId>
<pulsar.version>3.0.0</pulsar.version>
<pulsar.version>3.2.2</pulsar.version>
<activemq.version>5.16.1</activemq.version>
<hawtbuf.version>1.11</hawtbuf.version>
<curator.version>5.1.0</curator.version>
Expand Down Expand Up @@ -197,7 +195,7 @@
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.2.3</version>
<version>4.8.3</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-jms-admin-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,10 @@
<artifactId>jakarta.jms-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datastax.oss.pulsar.jms.api;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Map;
import lombok.Data;
Expand All @@ -40,6 +41,7 @@ protected JMSDestinationMetadata(String destination) {

/** The destination maps to a physical topic, partitioned or non-partitioned. */
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public abstract static class PhysicalPulsarTopicMetadata extends JMSDestinationMetadata {
public PhysicalPulsarTopicMetadata(
String destination,
Expand Down Expand Up @@ -87,6 +89,7 @@ public boolean isVirtualDestination() {

/** The destination is a JMS Topic, that maps to a Pulsar Topic with a set of Subscriptions. */
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public static final class TopicMetadata extends PhysicalPulsarTopicMetadata {

public TopicMetadata(
Expand Down Expand Up @@ -119,6 +122,7 @@ public boolean isTopic() {

/** The destination is a JMS Queue. A Queue is mapped to a single Pulsar Subscription. */
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public static final class QueueMetadata extends PhysicalPulsarTopicMetadata {
public QueueMetadata(
String destination,
Expand Down Expand Up @@ -166,6 +170,7 @@ public boolean isTopic() {

/** The Destination is a Virtual Destination, with the set of actual physical destinations. */
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public static final class VirtualDestinationMetadata extends JMSDestinationMetadata {
private final boolean multiTopic;
private final boolean regex;
Expand Down Expand Up @@ -193,6 +198,7 @@ public boolean isMultiTopic() {
return multiTopic;
}

@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public List<JMSDestinationMetadata> getDestinations() {
return destinations;
}
Expand Down Expand Up @@ -220,6 +226,7 @@ public final String getDestination() {
/** Metadata about a Pulsar Subscription. */
@Data
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public static final class SubscriptionMetadata {
private final String subscriptionName;

Expand All @@ -236,6 +243,7 @@ public SubscriptionMetadata(String subscriptionName) {
/** Metadata about a Pulsar Consumer. */
@Data
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public static final class ConsumerMetadata {
@Getter private final String consumerName;

Expand All @@ -257,6 +265,7 @@ public ConsumerMetadata(String consumerName) {
/** Metadata about a Pulsar Producer. */
@Data
@ToString
@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public static final class ProducerMetadata {
@Getter private final String producerName;

Expand Down
1 change: 1 addition & 0 deletions pulsar-jms-filters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class DockerTest {
private static final String TEST_PULSAR_DOCKER_IMAGE_NAME =
System.getProperty("testPulsarDockerImageName");
public static final String LUNASTREAMING = "datastax/lunastreaming:2.10_4.4";
public static final String LUNASTREAMING_31 = "datastax/lunastreaming:3.1_3.0";

@TempDir Path tempDir;

Expand All @@ -74,6 +75,11 @@ public void testLunaStreaming210() throws Exception {
// waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x
test(LUNASTREAMING, false);
}
@Test
public void testLunaStreaming31() throws Exception {
test(LUNASTREAMING_31, false);
}


@Test
public void testPulsar292Transactions() throws Exception {
Expand All @@ -92,12 +98,12 @@ public void testPulsar211Transactions() throws Exception {

@Test
public void testPulsar3Transactions() throws Exception {
test("apachepulsar/pulsar:3.0.0", true);
test("apachepulsar/pulsar:3.2.2", true);
}

@Test
public void testNoAuthentication() throws Exception {
test("apachepulsar/pulsar:3.0.0", false, false, false);
test("apachepulsar/pulsar:3.2.2", false, false, false);
}

@Test
Expand All @@ -106,11 +112,22 @@ public void testLunaStreaming210Transactions() throws Exception {
test(LUNASTREAMING, true);
}

@Test
public void testLunaStreaming31Transactions() throws Exception {
// waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x
test(LUNASTREAMING_31, true);
}

@Test
public void testLunaStreaming210ServerSideSelectors() throws Exception {
test(LUNASTREAMING, false, true);
}

@Test
public void testLunaStreaming31ServerSideSelectors() throws Exception {
test(LUNASTREAMING_31, false, true);
}

@Test
public void testGenericPulsar() throws Exception {
assumeTrue(TEST_PULSAR_DOCKER_IMAGE_NAME != null && !TEST_PULSAR_DOCKER_IMAGE_NAME.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package com.datastax.oss.pulsar.jms;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;

@SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"})
public final class CompositeEnumeration implements Enumeration {
private final List<? extends Enumeration> enumerations;
private int currentEnumeration = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

public class MessagePriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue<Message> {

static int getPriority(Message m) {
Integer priority = PulsarMessage.readJMSPriority(m);
return priority == null ? PulsarMessage.DEFAULT_PRIORITY : priority;
}

private final PriorityBlockingQueue<Message> queue;
private final AtomicBoolean terminated = new AtomicBoolean(false);

public MessagePriorityGrowableArrayBlockingQueue() {
this(10);
}

public MessagePriorityGrowableArrayBlockingQueue(int initialCapacity) {
queue =
new PriorityBlockingQueue<>(
initialCapacity,
new Comparator<Message>() {
@Override
public int compare(Message o1, Message o2) {
int priority1 = getPriority(o1);
int priority2 = getPriority(o2);
return Integer.compare(priority2, priority1);
}
});
}

@Override
public Message remove() {
return queue.remove();
}

@Override
public Message poll() {
return queue.poll();
}

@Override
public Message element() {
return queue.element();
}

@Override
public Message peek() {
return queue.peek();
}

@Override
public boolean offer(Message e) {
return queue.offer(e);
}

@Override
public void put(Message e) {
queue.put(e);
}

@Override
public boolean add(Message e) {
return queue.add(e);
}

@Override
public boolean offer(Message e, long timeout, TimeUnit unit) {
return queue.offer(e, timeout, unit);
}

@Override
public Message take() throws InterruptedException {
return queue.take();
}

@Override
public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}

@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}

@Override
public int drainTo(Collection<? super Message> c) {
return queue.drainTo(c);
}

@Override
public int drainTo(Collection<? super Message> c, int maxElements) {
return queue.drainTo(c, maxElements);
}

@Override
public void clear() {
queue.clear();
}

@Override
public boolean remove(Object o) {
return queue.remove(o);
}

@Override
public int size() {
return queue.size();
}

@Override
public Iterator<Message> iterator() {
return queue.iterator();
}

@Override
public List<Message> toList() {
List<Message> list = new ArrayList<>(size());
forEach(list::add);
return list;
}

@Override
public void forEach(Consumer<? super Message> action) {
queue.forEach(action);
}

@Override
public String toString() {
return queue.toString();
}

@Override
public void terminate(Consumer<Message> itemAfterTerminatedHandler) {
terminated.set(true);
}

@Override
public boolean isTerminated() {
return terminated.get();
}
}
Loading

0 comments on commit 9c620f5

Please sign in to comment.