jms synchronous call to RabbitMQ (with spring boot context) - setting amqp = true for tmp queues #1208
Comments
I tried adding a reproducer test (see the referenced commit), but didn't quite manage to. Maybe you could help me? I have little to no knowledge about JMS and/or AMQP. |
Hi Timo, I will create a new sample or adapt yours. Since I have some meetings today, I will provide it a little later. You will hear from me, for sure. Thanks and best regards, |
Hi Timo, here we are... sorry, there were more meetings than planned... :). I have prepared a small sample project that shows exactly the issue. It is based on Spring Boot and Apache Camel (for processing), and it also gives you a docker-compose.yaml to start RabbitMQ with some predefined exchanges and queues. I guess everything we need is there. These are the interesting lines:
As we can see, the incoming message was processed nicely by Apache Camel. It generated a simple result (as JSON) and throws it back to a Based on this sample, I am going to check whether I can find a workaround by specifying a reply destination... If you have trouble starting the sample, let me know. I will try to respond as soon as possible. Best regards, |
awesome! that's something I can work with! you'll hear from me... |
I guess that I made it work with a work-around. I have just pushed a commit. What I did:
@Bean
public JmsEndpoint inOutQueueEndpoint(ConnectionFactory connectionFactory,
        @Qualifier("fooQueueDestination") RMQDestination fooQueueDestination,
        @Qualifier("barQueueDestination") RMQDestination barQueueDestination) {
    return CitrusEndpoints
            .jms()
            // .asynchronous()
            .synchronous()
            .connectionFactory(connectionFactory)
            .destination(fooQueueDestination)
            .replyDestination("bar.queue")
            // Always resolve the reply destination name to the preconfigured bar queue
            .destinationResolver((session, destinationName, pubSubDomain) -> barQueueDestination)
            // .replyDestination(barQueueDestination)
            // .replyDestination(out)
            .build();
}

I have to admit that it was not easy to find the right constructor for the queue and the correct parameters (replyDestination) to ensure that the correct reply attributes were also set by Citrus. But with this setup it seems to work. But... (there is always a but):
|
awesome you could make it work! I've extracted the following improvement from your comment.. from my understanding. please tell me if this could benefit you: we could improve the AMQP support by not using
    .jms()
    .synchronous()
    .ampq(true)
    .build();
something in this direction. would that help? |
Hi Timo,
Sounds great! The logic must also ensure that a temporary queue is created (with exclusive = true), as already happens for JMS. That would completely cover my case. Why? Since I could run the example with the bar.queue (output) queue, I started to investigate how I can ensure that this temporary queue is created during processing, as happens for JMS (but not for AMQP). This is what I have now:

package sample.camel;

import com.rabbitmq.client.Channel;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import org.citrusframework.dsl.endpoint.CitrusEndpoints;
import org.citrusframework.jms.endpoint.JmsEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Configuration
//@Import(TodoAppAutoConfiguration.class)
public class SampleCamelRouteEndpointConfig {

    Logger logger = LoggerFactory.getLogger(SampleCamelRouteEndpointConfig.class);

    private com.rabbitmq.client.Connection rabbitConnection; // Hold the RabbitMQ connection

    @Bean
    public RMQConnectionFactory jmsConnectionFactory() throws IOException, TimeoutException {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672); // Replace with appropriate port
        // Initialize RabbitMQ core connection to manage manually
        initializeRabbitConnection(connectionFactory);
        return connectionFactory;
    }

    private void initializeRabbitConnection(RMQConnectionFactory jmsConnectionFactory) throws IOException, TimeoutException {
        // Create a RabbitMQ ConnectionFactory using the same settings as the RMQConnectionFactory
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost(jmsConnectionFactory.getHost());
        factory.setPort(jmsConnectionFactory.getPort());
        factory.setUsername(jmsConnectionFactory.getUsername());
        factory.setPassword(jmsConnectionFactory.getPassword());
        factory.setVirtualHost(jmsConnectionFactory.getVirtualHost());
        // Open a connection to RabbitMQ and store it
        this.rabbitConnection = factory.newConnection();
    }

    @Bean(name = "fooQueueDestination")
    public RMQDestination fooQueueDestination() {
        return new RMQDestination("foo.queue", "", "foo.queue", "");
    }

    @Bean(name = "temporaryQueueName")
    public String temporaryQueueName() throws IOException, TimeoutException {
        // Use the existing RabbitMQ connection to declare a temporary queue
        try (Channel channel = rabbitConnection.createChannel()) {
            // Declare a server-named queue: durable = false, exclusive = false, autoDelete = true
            String tempQueueName = channel.queueDeclare("", false, false, true, null).getQueue();
            logger.info("Temporary queue created: {}", tempQueueName);
            return tempQueueName;
        }
    }

    @Bean
    public JmsEndpoint inOutQueueEndpoint(
            @Qualifier("jmsConnectionFactory") RMQConnectionFactory connectionFactory,
            @Qualifier("fooQueueDestination") RMQDestination fooQueueDestination,
            @Qualifier("temporaryQueueName") String temporaryQueueName
    ) {
        logger.info("Endpoint is using this {}", temporaryQueueName);
        RMQDestination tempQueueDestination = new RMQDestination(temporaryQueueName, "", temporaryQueueName, temporaryQueueName);
        tempQueueDestination.setAmqp(true); // Ensure AMQP protocol is used
        tempQueueDestination.setQueue(true); // Ensure this is recognized as a queue
        return CitrusEndpoints
                .jms()
                .synchronous()
                .connectionFactory(connectionFactory)
                .destination(fooQueueDestination)
                .replyDestination(tempQueueDestination)
                .build();
    }
}

As you see I started to create with Well, it would help a lot in the case of AMQP if the temporary queue could be built up (with exclusive = true) to gain more comfort. Some notes about this. I have reviewed the logic of the

private RMQMessageConsumer createConsumerInternal(RMQDestination dest, String uuidTag, boolean durableSubscriber, String jmsSelector) throws JMSException {
    String consumerTag = uuidTag != null ? uuidTag : generateJmsConsumerQueueName();
    logger.trace("create consumer for destination '{}' with consumerTag '{}' and selector '{}'", dest, consumerTag, jmsSelector);
    declareDestinationIfNecessary(dest);
    if (!dest.isQueue()) {
        String subscriptionName = consumerTag;
        Subscription subscription = this.subscriptions.get(durableSubscriber, subscriptionName);
        if (subscription == null) {
            // it is unshared, non-durable, creating a transient subscription instance
            subscription = new Subscription(subscriptionName, subscriptionName, false, false, jmsSelector, false);
        }
        subscription.createTopology(dest, this, this.channel);
        consumerTag = subscription.queue();
    }
    RMQMessageConsumer consumer = new RMQMessageConsumer(this, dest, consumerTag, getConnection().isStopped(),
            jmsSelector, this.requeueOnMessageListenerException, this.receivingContextConsumer,
            this.requeueOnTimeout);
    this.consumers.add(consumer);
    return consumer;
}

and here is the

void declareDestinationIfNecessary(RMQDestination destination) throws JMSException {
    if (destination != null && !destination.isAmqp() && !destination.isDeclared()) {
        if (destination.isQueue()) {
            declareRMQQueue(destination, null, false, true);
        } else {
            declareTopic(destination);
        }
    }
}

What I do not understand here, is why the In consequence, this "automatic queue creation process" for a reply queue will never work for my AMQP reply queues. I haven't found any explanation yet for why the RMQ client makes this assumption. One last sentence: it would help a lot if the entire process using an AMQP queue could be set up automatically, as is the case for JMS. Thanks and best regards, |
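The effect of the guard in declareDestinationIfNecessary quoted above can be illustrated with a small stand-alone sketch. It does not use the RabbitMQ JMS client: the class DeclareGuardSketch, the Action enum and the decide method are hypothetical names, and the three booleans merely stand in for isAmqp(), isDeclared() and isQueue(). Only the condition itself is taken from the quoted method.

```java
public class DeclareGuardSketch {

    /** What the guard decides to do for a given destination (hypothetical enum). */
    public enum Action { DECLARE_QUEUE, DECLARE_TOPIC, SKIP }

    /**
     * Same condition as the quoted method, expressed on plain booleans:
     * a destination is only declared when it is neither AMQP-mapped nor already declared.
     */
    public static Action decide(boolean amqp, boolean declared, boolean queue) {
        if (!amqp && !declared) {
            return queue ? Action.DECLARE_QUEUE : Action.DECLARE_TOPIC;
        }
        return Action.SKIP;
    }

    public static void main(String[] args) {
        // Plain JMS reply queue that has not been declared yet
        System.out.println("jms queue:  " + decide(false, false, true));
        // AMQP-mapped reply queue that has not been declared yet
        System.out.println("amqp queue: " + decide(true, false, true));
        // Plain JMS topic that has not been declared yet
        System.out.println("jms topic:  " + decide(false, false, false));
    }
}
```

Under this condition, a destination with amqp = true always ends up in the skip branch, no matter whether it was declared before. That matches the observation above that an AMQP reply queue has to exist on the broker before the consumer is created.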
thinking out loud; a different builder - exclusively for AMQP - would probably make even more sense.. e.g.:
CitrusEndpoints.ampq()
    .synchronous()
    .build();
there would be less potential for confusion this way, probably. I'll have a look at it. edit: and thanks for all the inputs. that helps a lot. |
In my sample project I pushed a commit which finalizes it. The pattern is:
One small pitfall:
|
Citrus Version
4.1.0
Question
How can I enable amqp for temporary created RMQDestinations (jms-temp-queues) during a synchronous citrus invocation?
What I've tried so far
Some notes before:
The use case seems to be very simple. I have this test case:
Currently I am getting some when the temporary jms consumer (waiting for the response message) is getting the message. The stack shows:
Some findings:
After some debugging I could narrow it down: the JMS queue implicitly created by Citrus to wait for the response message is not able to handle the response object correctly.
The application is working, and the correct JSON message was generated. In the debugger I was also able to convert the byte[] into a string, and I saw the JSON string I expected (I debugged the receive while the response message was being processed). This leads me to assume that the backend and the calling logic are working as expected. I could also verify this in the logs and traces of the application.
I have also seen in the debugger that for the temporarily created queue
amqp = false
was set:
Additional information
Thanks a lot and I am looking forward to hearing from you :).
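For readers unfamiliar with the synchronous pattern the question refers to, here is a minimal in-memory sketch of request/reply over a temporary reply queue. It is only a model under stated assumptions: it uses java.util.concurrent queues instead of JMS or RabbitMQ, and TempReplyQueueSketch, Message, requestReply and startResponder are hypothetical names that exist in neither Citrus nor the RabbitMQ JMS client.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TempReplyQueueSketch {

    /** Minimal message: a body plus the name of the queue the reply should go to. */
    public static final class Message {
        final String body;
        final String replyTo;
        Message(String body, String replyTo) { this.body = body; this.replyTo = replyTo; }
    }

    /** In-memory stand-in for a broker: named queues created on demand. */
    private static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    private static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Sends a request and blocks until a reply arrives on a private temporary queue, or returns null on timeout. */
    public static String requestReply(String requestQueue, String body, long timeoutMillis) {
        String tempName = "tmp-" + UUID.randomUUID();
        BlockingQueue<Message> replies = queue(tempName);
        try {
            queue(requestQueue).put(new Message(body, tempName));
            Message reply = replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return reply == null ? null : reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            QUEUES.remove(tempName); // the temporary queue lives only for this call
        }
    }

    /** Starts a daemon thread that answers every request on the queue named in its replyTo field. */
    public static Thread startResponder(String requestQueue) {
        Thread responder = new Thread(() -> {
            try {
                while (true) {
                    Message request = queue(requestQueue).take();
                    queue(request.replyTo).put(new Message("{\"echo\":\"" + request.body + "\"}", null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();
        return responder;
    }

    public static void main(String[] args) {
        startResponder("foo.queue");
        System.out.println(requestReply("foo.queue", "hello", 1000));
    }
}
```

In the real setup the temporary queue is a broker-side destination, and the consumer on it must be able to decode the reply. That is where the amqp flag on the temporary destination becomes relevant.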