From 61d9a3b53be104eb18350e827466b10c1d7887e5 Mon Sep 17 00:00:00 2001 From: ananjaykumar2 Date: Tue, 25 Jun 2024 10:06:02 +0530 Subject: [PATCH] removed internal queue limit --- .../rabbitmq/consumers/AuditMessageConsumer.java | 15 ++++++--------- .../consumers/SubscriptionMonitoringConsumer.java | 9 +++++++-- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java b/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java index e86853c..402b5ff 100644 --- a/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java +++ b/src/main/java/iudx/auditing/server/rabbitmq/consumers/AuditMessageConsumer.java @@ -21,8 +21,7 @@ public class AuditMessageConsumer implements RabitMqConsumer { private final RabbitMQClient client; private final MessageProcessService msgService; - private final QueueOptions options = - new QueueOptions().setKeepMostRecent(true).setMaxInternalQueueSize(1000).setAutoAck(false); + private final QueueOptions options = new QueueOptions().setKeepMostRecent(true).setAutoAck(false); public AuditMessageConsumer( Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) { @@ -48,8 +47,6 @@ private void consume() { RabbitMQConsumer mqConsumer = receiveResultHandler.result(); mqConsumer.handler( message -> { - mqConsumer.pause(); - LOGGER.debug("message consumption paused."); JsonObject request = new JsonObject(); try { long deliveryTag = message.envelope().getDeliveryTag(); @@ -70,22 +67,22 @@ private void consume() { LOGGER.error( "Error while publishing messages for processing " + handler.cause().getMessage()); - mqConsumer.resume(); - LOGGER.debug("message consumption resumed"); } }); } catch (Exception e) { LOGGER.error("Error while decoding the message"); - mqConsumer.resume(); - LOGGER.debug("message consumption resumed"); } }); + } else { + LOGGER.error( + "failed to consume message from auditing-messages Q : {}", + receiveResultHandler.cause().getMessage()); } }); }) .onFailure( failureHandler -> { - LOGGER.fatal("Rabbit client startup failed for Latest message Q consumer."); + LOGGER.fatal("Rabbit client startup failed for auditing-messages Q consumer."); }); } } diff --git a/src/main/java/iudx/auditing/server/rabbitmq/consumers/SubscriptionMonitoringConsumer.java b/src/main/java/iudx/auditing/server/rabbitmq/consumers/SubscriptionMonitoringConsumer.java index fe6a669..2245dc2 100644 --- a/src/main/java/iudx/auditing/server/rabbitmq/consumers/SubscriptionMonitoringConsumer.java +++ b/src/main/java/iudx/auditing/server/rabbitmq/consumers/SubscriptionMonitoringConsumer.java @@ -26,7 +26,7 @@ public class SubscriptionMonitoringConsumer implements RabitMqConsumer { private final MessageProcessService msgService; private final QueueOptions options = - new QueueOptions().setKeepMostRecent(true).setMaxInternalQueueSize(1000).setAutoAck(false); + new QueueOptions().setKeepMostRecent(true).setAutoAck(false); public SubscriptionMonitoringConsumer( Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) { @@ -87,12 +87,17 @@ private void consume() { } } }); + } else { + LOGGER.error( + "failed to consume message from subscription-monitoring Q : {}", + receiveResultHandler.cause().getMessage()); } }); }) .onFailure( failureHandler -> { - LOGGER.fatal("Rabbit client startup failed for subscription message Q consumer."); + LOGGER.fatal( + "Rabbit client startup failed for subscription-monitoring message Q consumer."); }); }