Skip to content

Commit

Permalink
Merge pull request #93 from ananjaykumar2/fix/rmqconsumer-problem
Browse files Browse the repository at this point in the history
removed internal queue limit
  • Loading branch information
ankitmashu authored Jun 25, 2024
2 parents 66c9f6e + 61d9a3b commit 60e3349
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ public class AuditMessageConsumer implements RabitMqConsumer {
private final RabbitMQClient client;
private final MessageProcessService msgService;

private final QueueOptions options =
new QueueOptions().setKeepMostRecent(true).setMaxInternalQueueSize(1000).setAutoAck(false);
private final QueueOptions options = new QueueOptions().setKeepMostRecent(true).setAutoAck(false);

public AuditMessageConsumer(
Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) {
Expand All @@ -48,8 +47,6 @@ private void consume() {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(
message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
JsonObject request = new JsonObject();
try {
long deliveryTag = message.envelope().getDeliveryTag();
Expand All @@ -70,22 +67,22 @@ private void consume() {
LOGGER.error(
"Error while publishing messages for processing "
+ handler.cause().getMessage());
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} catch (Exception e) {
LOGGER.error("Error while decoding the message");
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} else {
LOGGER.error(
"failed to consume message from auditing-messages Q : {}",
receiveResultHandler.cause().getMessage());
}
});
})
.onFailure(
failureHandler -> {
LOGGER.fatal("Rabbit client startup failed for Latest message Q consumer.");
LOGGER.fatal("Rabbit client startup failed for auditing-messages Q consumer.");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class SubscriptionMonitoringConsumer implements RabitMqConsumer {
private final MessageProcessService msgService;

private final QueueOptions options =
new QueueOptions().setKeepMostRecent(true).setMaxInternalQueueSize(1000).setAutoAck(false);
new QueueOptions().setKeepMostRecent(true).setAutoAck(false);

public SubscriptionMonitoringConsumer(
Vertx vertx, RabbitMQOptions options, MessageProcessService msgService) {
Expand Down Expand Up @@ -87,12 +87,17 @@ private void consume() {
}
}
});
} else {
LOGGER.error(
"failed to consume message from subscription-monitoring Q : {}",
receiveResultHandler.cause().getMessage());
}
});
})
.onFailure(
failureHandler -> {
LOGGER.fatal("Rabbit client startup failed for subscription message Q consumer.");
LOGGER.fatal(
"Rabbit client startup failed for subscription-monitoring message Q consumer.");
});
}

Expand Down

0 comments on commit 60e3349

Please sign in to comment.