Skip to content

Commit

Permalink
PulsarJMSProducer: do not ovverride some system headers if already set (
Browse files Browse the repository at this point in the history
  • Loading branch information
mukesh-ctds authored Aug 22, 2024
1 parent ccf0188 commit c2691ae
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,15 @@ private void getProducerAndSend(Destination destination, Message message) throws
message.setObjectProperty(prop.getKey(), prop.getValue());
}
message.setJMSPriority(priority);
message.setJMSCorrelationIDAsBytes(correlationID);
message.setJMSType(jmsType);
message.setJMSReplyTo(jmsReplyTo);
if (message.getJMSCorrelationIDAsBytes() == null) {
message.setJMSCorrelationIDAsBytes(correlationID);
}
if (message.getJMSType() == null) {
message.setJMSType(jmsType);
}
if (message.getJMSReplyTo() == null) {
message.setJMSReplyTo(jmsReplyTo);
}

if (completionListener != null) {
producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;
import javax.jms.TextMessage;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

@Timeout(value = 1, unit = TimeUnit.MINUTES)
public class JMSMessageHeaderTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension();

@Test
public void sendMessageWithHeaderReceiveJMSContext() throws Exception {

Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (JMSContext context = factory.createContext()) {
Destination destination =
context.createQueue("persistent://public/default/test-" + UUID.randomUUID());
try (JMSConsumer consumer = context.createConsumer(destination)) {
JMSProducer producer = context.createProducer();
String message = "Hey JMS!";
TextMessage expTextMessage = context.createTextMessage(message);
expTextMessage.setJMSReplyTo(destination);
expTextMessage.setJMSType("mytype");
expTextMessage.setJMSCorrelationIDAsBytes(new byte[] {1, 2, 3});
producer.send(destination, expTextMessage);
TextMessage actTextMessage = (TextMessage) consumer.receive();

assertNotNull(actTextMessage);
assertEquals(expTextMessage.getText(), actTextMessage.getText());
assertEquals(expTextMessage.getJMSReplyTo(), actTextMessage.getJMSReplyTo());
assertEquals(expTextMessage.getJMSType(), actTextMessage.getJMSType());
assertArrayEquals(new byte[] {1, 2, 3}, actTextMessage.getJMSCorrelationIDAsBytes());
}
}
}
}
}

0 comments on commit c2691ae

Please sign in to comment.