JMS configuration abstraction with multi-connection support for queue listeners and producers, built on top of Spring Boot JMS. This library offers a performant setup for JMS clients.
The library covers the following scenarios:
- Listen for messages on a fixed queue.
- Send messages to fixed and temporary queues.
- Request Reply pattern with an automatic temporary queue.
- Request Reply pattern that automatically gets the reply by selector from a fixed queue.
Version | Spring Boot | Specification |
---|---|---|
0.6.0 | 2.7.6 | JMS 2 javax |
1.0.1 | 3.0.6 | JMS 3 jakarta |
1.1.0 | 3.1.1 | JMS 3 jakarta |
1.4.1 | 3.2.1 | JMS 3 jakarta |
2.0.0 | 3.2.1 | JMS 3 jakarta |
- Initially available for IBM MQ Clients.
The library can be imported like this:
implementation 'com.github.bancolombia:commons-jms-mq:<latest-version-here>'
To listen to a queue, you only need to annotate a method with @MQListener, as in the following fragments.
// reactive projects
@MQListener("DEV.QUEUE.1")
public Mono<Void> process(Message message) throws JMSException {
    String text = ((TextMessage) message).getText();
    return doSomething(text);
}

// non reactive projects
@MQListener("DEV.QUEUE.1")
public void process(Message message) throws JMSException {
    String text = ((TextMessage) message).getText();
    doSomething(text);
}
This sample listens on a fixed queue named DEV.QUEUE.1. The resulting JMS object structure looks like this:
graph TD
A[ConnectionFactory] -->|create| D(JMSContext)
D -->|create| E(JMSConsumer: Queue)
E -->|notifies|F[MessageListener]
A -->|create| G(JMSContext)
G -->|create| H(JMSConsumer: Queue)
H -->|notifies|F
A -->|create| I(JMSContext)
I -->|create| J(JMSConsumer: Queue)
J -->|notifies|F
When the JMS 2.0 API is used, the number of JMSContexts and JMSConsumers is determined by the concurrency annotation attribute.
When the JMS 1.1 API is used, the equivalent objects are Sessions and MessageConsumers, and their number is determined by the same concurrency annotation attribute.
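The relationship between the concurrency attribute and the consumers can be pictured with a small in-memory simulation. This is only a sketch in plain Java, not the library's implementation: `ConcurrentListenerSketch`, its `consume` method, and the in-memory queue are hypothetical stand-ins for the broker queue, the JMS consumers, and the shared listener.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical in-memory model: `concurrency` workers play the role of the
// consumers, and all of them notify the same listener.
final class ConcurrentListenerSketch {

    // Delivers every message to the shared listener using `concurrency` workers
    // and returns how many messages were handled.
    static int consume(List<String> messages, int concurrency, Consumer<String> listener) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        CountDownLatch pending = new CountDownLatch(messages.size());
        AtomicInteger handled = new AtomicInteger();
        ExecutorService consumers = Executors.newFixedThreadPool(concurrency);
        try {
            for (int i = 0; i < concurrency; i++) {
                consumers.submit(() -> {
                    String message;
                    // Each worker drains the queue until it is empty.
                    while ((message = queue.poll()) != null) {
                        listener.accept(message);
                        handled.incrementAndGet();
                        pending.countDown();
                    }
                });
            }
            if (!pending.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("messages were not consumed in time");
            }
            return handled.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            consumers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int handled = consume(List.of("a", "b", "c", "d"), 3,
                message -> System.out.println("received " + message));
        System.out.println("handled " + handled + " messages");
    }
}
```

Because every worker calls the same listener, the listener code should be safe to run from several threads at once when concurrency is greater than 1.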
To send messages, use the @EnableMQGateway annotation, which enables the producers' auto-configuration.
This configuration creates a JMS object structure like this:
graph TD
A[ConnectionFactory] -->|create| B(JMSContext)
A -->|create| C(JMSContext)
A -->|create| D(JMSContext)
B -->|create| E(JMSProducer)
C -->|create| F(JMSProducer)
D -->|create| G(JMSProducer)
B -->|create| H(Destination: default)
C -->|create| I(Destination: default)
D -->|create| J(Destination: default)
E -->|uses| H
F -->|uses| I
G -->|uses| J
The number of JMSContexts and JMSProducers is determined by the concurrency property (see setup); this structure is based on JMS 2.0.
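As an illustration of a pool of producers sized by the concurrency property, the following plain Java sketch picks one producer per message. How the library actually chooses a producer is not stated here; the round-robin selection, the `ProducerPoolSketch` class, and the `pick` method are assumptions made only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a pool of producers sized by the concurrency property.
// Round-robin selection is an assumption made for this sketch.
final class ProducerPoolSketch {
    private final List<String> producers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    ProducerPoolSketch(int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        for (int i = 0; i < concurrency; i++) {
            // Each entry stands in for one JMSContext + JMSProducer pair.
            producers.add("producer-" + i);
        }
    }

    // Returns the name of the producer that would send the next message.
    String pick() {
        int index = Math.floorMod(next.getAndIncrement(), producers.size());
        return producers.get(index);
    }

    public static void main(String[] args) {
        ProducerPoolSketch pool = new ProducerPoolSketch(3);
        for (int i = 0; i < 4; i++) {
            System.out.println("message " + i + " -> " + pool.pick());
        }
    }
}
```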
// reactive projects
@Component
@AllArgsConstructor
@EnableMQGateway(scanBasePackages = "co.com.bancolombia")
public class SampleMQMessageSender {
    private final MQMessageSender sender;
    // private final MQQueuesContainer container; // Inject it to reference a temporary queue

    public Mono<String> send(String message) {
        return sender.send(context -> {
            Message textMessage = context.createTextMessage(message);
            // textMessage.setJMSReplyTo(container.get("any-custom-key")); // Inject the reply to queue from container
            return textMessage;
        });
    }
}

// non reactive projects
@Component
@AllArgsConstructor
@EnableMQGateway(scanBasePackages = "co.com.bancolombia")
public class SampleMQMessageSender {
    private final MQMessageSenderSync sender;
    // private final MQQueuesContainer container; // Inject it to reference a temporary queue

    public String send(String message) {
        return sender.send(context -> {
            Message textMessage = context.createTextMessage(message);
            // textMessage.setJMSReplyTo(container.get("any-custom-key")); // Inject the reply to queue from container
            return textMessage;
        });
    }
}
This sample shows how to send a message to the default destination queue and how to reference an autogenerated temporary queue.
If you need another message sender, you can define it with the @MQSender annotation.
@MQSender(connectionFactory = "domainB")
public interface XDomainSender extends MQMessageSender {
}
In this case we pass a connectionFactory bean called domainB; this configuration allows you to send messages to another broker. Remember that an MQMessageSender can send messages to all queues in a Queue Manager, so you only need one per Queue Manager.
// reactive projects
public Mono<String> sendWithDestination(String message) {
    return sender.send(destination, context -> context.createTextMessage(message));
}

// non reactive projects
public String sendWithDestination(String message) {
    return sender.send(destination, context -> context.createTextMessage(message));
}
This sample shows how to pass any Destination as the first parameter of send, which lets you send a message to any dynamic destination.
This is a basic implementation of the Request Reply pattern. It creates a temporary queue for responses, starts a listener on it, and autogenerates an implementation of an interface that you declare.
The application that attends the request should follow the replyTo header, which is automatically injected through the operation:
textMessage.setJMSReplyTo(temporaryQueue)
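The role of the replyTo header can be illustrated with an in-memory model. The sketch below uses only the Java standard library and is not the library's implementation: `TemporaryQueueReplySketch`, `Request`, `respond`, and `requestReply` are hypothetical names, and a `BlockingQueue` stands in for the temporary queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of request reply over a temporary queue.
final class TemporaryQueueReplySketch {

    // A request carries its payload and the queue where the reply must be sent.
    static final class Request {
        final String body;
        final BlockingQueue<String> replyTo;

        Request(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // The responder follows the replyTo "header" instead of a hard-coded queue.
    static void respond(Request request) {
        request.replyTo.add("reply to " + request.body);
    }

    // The requester creates its own reply queue, attaches it to the request,
    // and waits for the reply up to the given timeout.
    static String requestReply(String body, long timeoutMillis) {
        BlockingQueue<String> temporaryQueue = new LinkedBlockingQueue<>();
        Request request = new Request(body, temporaryQueue);
        new Thread(() -> respond(request)).start();
        try {
            String reply = temporaryQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within " + timeoutMillis + " ms");
            }
            return reply;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(requestReply("ping", 2000));
    }
}
```

If the responder ignored replyTo and wrote to a fixed queue, the requester in this model would time out, which is why the application that attends the request must follow the header.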
This approach is only implemented for reactive projects, so you can define your own interface with at least one of the following method signatures:
Mono<Message> requestReply(String message);
Mono<Message> requestReply(String message, Duration timeout);
Mono<Message> requestReply(MQMessageCreator messageCreator);
Mono<Message> requestReply(MQMessageCreator messageCreator, Duration timeout);
For example, you can define an interface like MyRequestReplyTmp, shown in the steps below, and the library will implement it automatically.
To achieve the auto implementation, you should:
- Annotate the application or a configuration bean with @EnableMQGateway; optionally, you can define the base package.
@SpringBootApplication(scanBasePackages = "co.com.bancolombia")
@EnableMQGateway(scanBasePackages = "co.com.bancolombia")
public class MainApplication {
    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class);
    }
}
- Annotate the interface with @ReqReply, for example
@ReqReply(requestQueue = "DEV.QUEUE.1") // in queue names you can use ${some.property.name} spring placeholder notation
public interface MyRequestReplyTmp extends MQRequestReply {
}
- Now you can inject your interface into any Spring component, for example MyRequestReplyAdapter:
@Component
@AllArgsConstructor
public class MyRequestReplyAdapter implements RequestGateway {
private final MyRequestReplyTmp requestReply;
...
}
You may need to add the following System.setProperty line before SpringApplication.run(MainApplication.class, args), like this:
public static void main(String[] args) {
    System.setProperty("spring.devtools.restart.enabled", "false");
    SpringApplication.run(MainApplication.class, args);
}
When a temporary queue cannot be used for persistence reasons, or when losing messages is not acceptable, you can use a Request Reply pattern based on a fixed queue. Consider the following scenarios:
- Single Queue Manager: this scenario requires no additional setup. The following snippet shows a basic implementation:
@ReqReply(requestQueue = "DEV.QUEUE.1", replyQueue = "DEV.QUEUE.2", queueType = FIXED)
public interface MyRequestReply extends MQRequestReply {
}
Then inject this interface into your adapter, as with the temporary queue.
- Multiple Queue Managers or clustering: in this scenario you should guarantee that:
  - the application that attends the request follows the replyTo header.
  - the property commons.jms.input-queue-set-queue-manager is set to true, so that the queue manager is identified and set on the response queue (this guarantees that the application that attends the request sends the response to the specific queue manager).
  Then proceed as with a single Queue Manager.
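Getting a reply by selector from a fixed queue can be pictured as follows. This is a minimal in-memory sketch, not the library's code: it assumes the selector matches on a correlation ID (in JMS terms, a selector such as `JMSCorrelationID = '<id>'`), and `FixedQueueSelectorSketch`, `put`, and `getBySelector` are hypothetical names.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;

// Hypothetical in-memory model of a fixed reply queue shared by many requesters.
final class FixedQueueSelectorSketch {

    // A reply tagged with the correlation ID of the request it answers.
    private static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final Queue<Reply> replyQueue = new LinkedList<>();

    // The responder puts the reply on the shared fixed queue.
    synchronized void put(String correlationId, String body) {
        replyQueue.add(new Reply(correlationId, body));
    }

    // The requester takes only the reply that matches its correlation ID and
    // leaves the replies that belong to other requesters on the queue.
    synchronized Optional<String> getBySelector(String correlationId) {
        Iterator<Reply> iterator = replyQueue.iterator();
        while (iterator.hasNext()) {
            Reply reply = iterator.next();
            if (reply.correlationId.equals(correlationId)) {
                iterator.remove();
                return Optional.of(reply.body);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        FixedQueueSelectorSketch queue = new FixedQueueSelectorSketch();
        queue.put("id-1", "first");
        queue.put("id-2", "second");
        System.out.println(queue.getBySelector("id-2").orElse("no reply"));
    }
}
```

Unlike a temporary queue, the fixed queue outlives the requester, so a reply that arrives late stays on the queue until something reads it.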
commons.jms.reactive: Should be set to true for reactive (Reactor) projects.
There are two complementary ways to configure listeners. First, when you create a method annotated with @MQListener, you can set the following attributes:
- value: Name of the queue to listen on; use it only when listening on a fixed queue.
- concurrency: Number of open connections listening on the queue; applies to fixed and temporary queues.
- connectionFactory: Name of a specific ConnectionFactory bean, used to create the connections for this consumer.
- queueCustomizer: Name of a specific MQQueueCustomizer bean, used to customize the listening queue properties before the consumers start.
Second, the following properties can be used when you have a single @MQListener annotated method; set them in the application.yaml of your application:
- commons.jms.input-concurrency: Equivalent to the concurrency annotation attribute.
- commons.jms.input-queue: Equivalent to the value annotation attribute.
- commons.jms.input-queue-set-queue-manager: Enable it to set the resolved queue manager when needed.
There are three configuration properties for sending messages:
- commons.jms.output-concurrency: Number of open connections to send messages to a queue.
- commons.jms.output-queue: Name of the default queue to send messages to.
- commons.jms.producer-ttl: Long value in milliseconds that sets the time to live of a message put onto a queue. A value of 0 means the message lives indefinitely.
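The meaning of the time to live value can be sketched in plain Java. The rule below follows the JMS convention for the expiration header (send time plus time to live, or 0 when the time to live is 0); the class `ProducerTtlSketch` and its methods are hypothetical helpers written for this example, not part of the library.

```java
// Hypothetical helper that models how a time to live turns into an expiration time.
final class ProducerTtlSketch {

    // JMS convention: expiration is the send time plus the TTL,
    // and 0 when the TTL is 0, meaning the message never expires.
    static long expiration(long sendTimeMillis, long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttl must be >= 0");
        }
        return ttlMillis == 0 ? 0L : sendTimeMillis + ttlMillis;
    }

    // A message with expiration 0 is never considered expired.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expiration(sentAt, 30_000);
        System.out.println("expired after 10 s: " + isExpired(expiresAt, sentAt + 10_000));
        System.out.println("expired after 40 s: " + isExpired(expiresAt, sentAt + 40_000));
    }
}
```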
The connection retry behavior is configured with the following properties:
- commons.jms.max-retries: Number of retries when the connection is lost.
- commons.jms.initial-retry-interval-millis: Initial interval between retries, in milliseconds.
- commons.jms.retry-multiplier: Multiplier for the interval between retries.
For more information about the connection retry properties, please refer to Resilience4j Retry.
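To see how the three retry properties interact, the sketch below computes a wait schedule in plain Java. It assumes an exponential backoff in which each interval is the previous one times the multiplier; the exact behavior depends on the library and on Resilience4j, and `RetryIntervalsSketch` and `intervals` are hypothetical names used only here.

```java
import java.util.Arrays;

// Hypothetical calculation of the waits between connection retries,
// assuming exponential backoff: interval(n) = initial * multiplier^(n - 1).
final class RetryIntervalsSketch {

    // Returns one wait time in milliseconds per retry.
    static long[] intervals(int maxRetries, long initialIntervalMillis, double multiplier) {
        if (maxRetries < 0 || initialIntervalMillis < 0 || multiplier < 1.0) {
            throw new IllegalArgumentException("invalid retry settings");
        }
        long[] waits = new long[maxRetries];
        double current = initialIntervalMillis;
        for (int retry = 0; retry < maxRetries; retry++) {
            waits[retry] = (long) current;
            current *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Example values, not library defaults.
        System.out.println(Arrays.toString(intervals(4, 1000, 1.5)));
    }
}
```

With these example values the waits grow from 1000 ms to 3375 ms, so a larger multiplier spreads the retries over a longer period without increasing their number.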
This library uses the default bean of type ConnectionFactory. You can customize listeners by setting the connectionFactory attribute of the @MQListener annotated method.
To customize the sender, override the default MQMessageSenderSync bean; refer to Custom configurations.
If you need multi-broker support, you only need to define the ConnectionFactory bean with a name and then use this name on each annotation that needs it.
This setting is available for:
- @MQSender
- @MQListener
- @ReqReply
You can define custom beans to change default behaviors.
If you need to create and register several fixed response queues for request reply, you can override the MQQueueManagerSetter as follows:
@Bean
@ConditionalOnMissingBean(MQQueueManagerSetter.class)
public MQQueueManagerSetter qmSetter(MQProperties properties, MQQueuesContainer container,
                                     @Value("${response.queue.a}") String queueAName,
                                     @Value("${response.queue.b}") String queueBName) {
    return (jmsContext, queue) -> {
        log.info("Self assigning Queue Manager to listening queue: {}", queue.toString());
        MQUtils.setQMNameIfNotSet(jmsContext, queue);
        container.registerQueue(properties.getInputQueue(), queue);
        // Register response queue a with queue manager assigned
        Queue queueA = jmsContext.createQueue(queueAName);
        MQUtils.setQMNameIfNotSet(jmsContext, queueA);
        container.registerQueue(queueAName, queueA);
        // Register response queue b with queue manager assigned
        Queue queueB = jmsContext.createQueue(queueBName);
        MQUtils.setQMNameIfNotSet(jmsContext, queueB);
        container.registerQueue(queueBName, queueB);
    };
}
Commons JMS has two health indicators. The first is the default Spring Boot JMS health indicator, which checks the connection. The second is the MQHealthIndicator, which checks the listeners' connection to the queue manager.
Both health indicators are enabled by default, but you can disable them by setting the following properties:
- application.properties
management.health.jms.enabled=false
- application.yaml
management:
  health:
    jms:
      enabled: false
Change notes:
- @MQListener has removed support for listening on a temporary queue, because @ReqReply uses this behaviour by default.
- @ReqReply has added support for the request reply pattern using fixed queues with get message by selector.
Actions:
- @EnableMQSelectorMessageListener has been removed; now you can use @ReqReply directly, setting the queueType attribute to FIXED.
- @EnableMQMessageSender has been removed; now you should use @EnableMQGateway.
- @EnableReqReply has been removed; now you should use @EnableMQGateway, passing the same scanBasePackages property.
- The property replyQueueTemp has been renamed to replyQueue in @ReqReply.
- commons.jms.input-queue-alias has been removed; now you can only set the alias with replyQueue.
Review the issues; we welcome new ideas. Read more in Contributing.
This repository is licensed under MIT License Copyright (c) 2021 Bancolombia S.A