diff --git a/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java b/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java index 12e8930..9033ee8 100644 --- a/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java +++ b/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java @@ -21,7 +21,8 @@ public class AuditMessageConsumer implements RabitMqConsumer { private final RabbitMQClient client; private final MessageProcessService msgService; - private final QueueOptions options = new QueueOptions().setMaxInternalQueueSize(10000).setKeepMostRecent(true).setAutoAck(false); + private final QueueOptions options = + new QueueOptions().setMaxInternalQueueSize(10000).setKeepMostRecent(true).setAutoAck(false); public AuditMessageConsumer( Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) { @@ -47,6 +48,8 @@ private void consume() { RabbitMQConsumer mqConsumer = receiveResultHandler.result(); mqConsumer.handler( message -> { + mqConsumer.pause(); + LOGGER.debug("message consumption paused."); JsonObject request = new JsonObject(); try { long deliveryTag = message.envelope().getDeliveryTag(); @@ -61,14 +64,20 @@ private void consume() { LOGGER.info("Audit message published in databases."); client.basicAck( handler.result().getLong(DELIVERY_TAG), false); + mqConsumer.resume(); + LOGGER.debug("message consumption resumed"); } else { LOGGER.error( "Error while publishing messages for processing " + handler.cause().getMessage()); + mqConsumer.resume(); + LOGGER.debug("message consumption resumed"); } }); } catch (Exception e) { LOGGER.error("Error while decoding the message"); + mqConsumer.resume(); + LOGGER.debug("message consumption resumed"); } }); } else {