Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
  • Loading branch information
ananjaykumar2 committed Jun 25, 2024
1 parent d258cfb commit 78b91b2
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class AuditMessageConsumer implements RabitMqConsumer {
private final RabbitMQClient client;
private final MessageProcessService msgService;

private final QueueOptions options = new QueueOptions().setMaxInternalQueueSize(10000).setKeepMostRecent(true).setAutoAck(false);
private final QueueOptions options =
new QueueOptions().setMaxInternalQueueSize(10000).setKeepMostRecent(true).setAutoAck(false);

public AuditMessageConsumer(
Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) {
Expand All @@ -47,6 +48,8 @@ private void consume() {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(
message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
JsonObject request = new JsonObject();
try {
long deliveryTag = message.envelope().getDeliveryTag();
Expand All @@ -61,14 +64,20 @@ private void consume() {
LOGGER.info("Audit message published in databases.");
client.basicAck(
handler.result().getLong(DELIVERY_TAG), false);
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
} else {
LOGGER.error(
"Error while publishing messages for processing "
+ handler.cause().getMessage());
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} catch (Exception e) {
LOGGER.error("Error while decoding the message");
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} else {
Expand Down

0 comments on commit 78b91b2

Please sign in to comment.