Skip to content

Commit

Permalink
Merge pull request #97 from ananjaykumar2/fix/rmqconsumer-problem
Browse files Browse the repository at this point in the history
testing
  • Loading branch information
ankitmashu authored Jun 25, 2024
2 parents 6dd9d8f + 1b4e4d3 commit 9712f7e
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,35 +48,37 @@ private void consume() {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(
message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
/* mqConsumer.pause();
LOGGER.debug("message consumption paused.");*/
JsonObject request = new JsonObject();
try {
long deliveryTag = message.envelope().getDeliveryTag();
request =
message.body().toJsonObject().put(DELIVERY_TAG, deliveryTag);
LOGGER.info("message received from {}", request.getString(ORIGIN));
Future<JsonObject> processResult =
client.basicAck(deliveryTag, false);

/* Future<JsonObject> processResult =
msgService.processAuditEventMessages(request);
processResult.onComplete(
handler -> {
if (handler.succeeded()) {
LOGGER.info("Audit message published in databases.");
client.basicAck(
handler.result().getLong(DELIVERY_TAG), false);
mqConsumer.resume();
// mqConsumer.resume();
LOGGER.debug("message consumption resumed");
} else {
LOGGER.error(
"Error while publishing messages for processing "
+ handler.cause().getMessage());
mqConsumer.resume();
// mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
});*/
} catch (Exception e) {
LOGGER.error("Error while decoding the message");
mqConsumer.resume();
// mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
Expand Down

0 comments on commit 9712f7e

Please sign in to comment.