Skip to content

Commit

Permalink
Merge pull request #96 from ananjaykumar2/fix/rmqconsumer-problem
Browse files Browse the repository at this point in the history
testing
  • Loading branch information
ankitmashu authored Jun 25, 2024
2 parents de46305 + 78b91b2 commit 6dd9d8f
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class AuditMessageConsumer implements RabitMqConsumer {
private final RabbitMQClient client;
private final MessageProcessService msgService;

private final QueueOptions options = new QueueOptions().setMaxInternalQueueSize(10000).setKeepMostRecent(true).setAutoAck(false);
private final QueueOptions options =
new QueueOptions().setMaxInternalQueueSize(10000).setKeepMostRecent(true).setAutoAck(false);

public AuditMessageConsumer(
Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) {
Expand All @@ -47,6 +48,8 @@ private void consume() {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(
message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
JsonObject request = new JsonObject();
try {
long deliveryTag = message.envelope().getDeliveryTag();
Expand All @@ -61,14 +64,20 @@ private void consume() {
LOGGER.info("Audit message published in databases.");
client.basicAck(
handler.result().getLong(DELIVERY_TAG), false);
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
} else {
LOGGER.error(
"Error while publishing messages for processing "
+ handler.cause().getMessage());
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} catch (Exception e) {
LOGGER.error("Error while decoding the message");
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} else {
Expand Down

0 comments on commit 6dd9d8f

Please sign in to comment.