From e0ae7a3ddc816487c2bc396f5c49a90550675e1b Mon Sep 17 00:00:00 2001 From: Ankit Singh <101859999+ankitmashu@users.noreply.github.com> Date: Tue, 1 Oct 2024 01:14:14 +0530 Subject: [PATCH] publish adaptor , rmq push message change --- .../server/apiserver/ApiServerVerticle.java | 4 +- .../elastic/EsResponseFormatterToJson.java | 2 +- .../databroker/DataBrokerServiceImpl.java | 50 +++++++++++-------- .../server/databroker/DataBrokerVerticle.java | 32 +++++++++--- .../databroker/AdapterEntitiesTest.java | 8 ++- .../server/databroker/DBServiceImplTest.java | 18 ++++--- .../databroker/DataBrokerServiceImplTest.java | 15 +++--- .../databroker/DataBrokerServiceTest.java | 9 ++-- 8 files changed, 89 insertions(+), 49 deletions(-) diff --git a/src/main/java/iudx/resource/server/apiserver/ApiServerVerticle.java b/src/main/java/iudx/resource/server/apiserver/ApiServerVerticle.java index e67fbd0c..1f43d23c 100644 --- a/src/main/java/iudx/resource/server/apiserver/ApiServerVerticle.java +++ b/src/main/java/iudx/resource/server/apiserver/ApiServerVerticle.java @@ -1471,10 +1471,10 @@ public void publishDataFromAdapter(RoutingContext routingContext) { JsonObject requestJson = routingContext.body().asJsonObject(); HttpServerRequest request = routingContext.request(); HttpServerResponse response = routingContext.response(); - String instanceId = request.getHeader(HEADER_HOST); + /*String instanceId = request.getHeader(HEADER_HOST);*/ JsonObject authenticationInfo = new JsonObject(); authenticationInfo.put(API_ENDPOINT, "/iudx/v1/adapter"); - requestJson.put(JSON_INSTANCEID, instanceId); + /*requestJson.put(JSON_INSTANCEID, instanceId);*/ if (request.headers().contains(HEADER_TOKEN)) { authenticationInfo.put(HEADER_TOKEN, request.getHeader(HEADER_TOKEN)); diff --git a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java index 670e4ae9..ff42539e 100644 --- a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java +++ b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java @@ -11,7 +11,7 @@ import org.apache.logging.log4j.Logger; public class EsResponseFormatterToJson extends AbstractEsSearchResponseFormatter { - private static final Logger LOGGER = LogManager.getLogger(EsResponseFormatterToJson.class); + /*private static final Logger LOGGER = LogManager.getLogger(EsResponseFormatterToJson.class);*/ FileWriter fileWriter; /** diff --git a/src/main/java/iudx/resource/server/databroker/DataBrokerServiceImpl.java b/src/main/java/iudx/resource/server/databroker/DataBrokerServiceImpl.java index 6b11ccf8..43da5219 100644 --- a/src/main/java/iudx/resource/server/databroker/DataBrokerServiceImpl.java +++ b/src/main/java/iudx/resource/server/databroker/DataBrokerServiceImpl.java @@ -40,16 +40,25 @@ public class DataBrokerServiceImpl implements DataBrokerService { JsonObject finalResponse = Util.getResponseJson(BAD_REQUEST_CODE, BAD_REQUEST_DATA, BAD_REQUEST_DATA); CacheService cacheService; + /*RabbitMQOptions iudxRabbitMQOptions;*/ private JsonObject config; private RabbitClient webClient; + private RabbitMQClient iudxRabbitMqClient; public DataBrokerServiceImpl( RabbitClient webClient, PostgresClient pgClient, JsonObject config, - CacheService cacheService) { + CacheService cacheService, + /*RabbitMQOptions iudxConfig, + Vertx vertx,*/ + RabbitMQClient iudxRabbitMqClient) { this.webClient = webClient; this.config = config; + /*this.iudxRabbitMQOptions = iudxConfig; + this.iudxRabbitMqClient = RabbitMQClient.create(vertx,iudxConfig);*/ + + this.iudxRabbitMqClient = iudxRabbitMqClient; this.subscriptionService = new SubscriptionService(this.webClient, pgClient, config, cacheService); this.cacheService = cacheService; @@ -659,11 +668,9 @@ public DataBrokerService listQueueSubscribers( public DataBrokerService publishFromAdaptor( JsonObject request, String vhost, Handler> handler) { JsonObject finalResponse = new JsonObject(); - JsonObject json = new JsonObject(); LOGGER.debug("request Json " + request); if (request != null && !request.isEmpty()) { - json.put("body", request.toString()); JsonObject cacheRequestJson = new JsonObject(); cacheRequestJson.put("type", CATALOGUE_CACHE); @@ -680,27 +687,26 @@ public DataBrokerService publishFromAdaptor( : cacheResult.getString(ID); LOGGER.debug("Info : resourceGroupId " + resourceGroupId); String routingKey = request.getJsonArray("entities").getString(0); + request.remove("entities"); + request.put("id", routingKey); if (resourceGroupId != null && !resourceGroupId.isBlank()) { LOGGER.debug("Info : routingKey " + routingKey); - Buffer buffer = Buffer.buffer(json.toString()); - webClient - .getRabbitmqClient() - .basicPublish( - resourceGroupId, - routingKey, - buffer, - resultHandler -> { - if (resultHandler.succeeded()) { - finalResponse.put(STATUS, HttpStatus.SC_OK); - LOGGER.info("Success : Message published to queue"); - handler.handle(Future.succeededFuture(finalResponse)); - } else { - finalResponse.put(TYPE, HttpStatus.SC_BAD_REQUEST); - LOGGER.error("Fail : " + resultHandler.cause().toString()); - handler.handle( - Future.failedFuture(resultHandler.cause().getMessage())); - } - }); + Buffer buffer = Buffer.buffer(request.encode()); + iudxRabbitMqClient.basicPublish( + resourceGroupId, + routingKey, + buffer, + resultHandler -> { + if (resultHandler.succeeded()) { + finalResponse.put(STATUS, HttpStatus.SC_OK); + LOGGER.info("Success : Message published to queue"); + handler.handle(Future.succeededFuture(finalResponse)); + } else { + finalResponse.put(TYPE, HttpStatus.SC_BAD_REQUEST); + LOGGER.error("Fail : " + resultHandler.cause().toString()); + handler.handle(Future.failedFuture(resultHandler.cause().getMessage())); + } + }); } } else { LOGGER.error("Item not found"); diff --git a/src/main/java/iudx/resource/server/databroker/DataBrokerVerticle.java b/src/main/java/iudx/resource/server/databroker/DataBrokerVerticle.java index dea92bb9..aeaf4305 100644 --- a/src/main/java/iudx/resource/server/databroker/DataBrokerVerticle.java +++ b/src/main/java/iudx/resource/server/databroker/DataBrokerVerticle.java @@ -22,26 +22,28 @@ import iudx.resource.server.databroker.listeners.RevokeClientQlistener; import iudx.resource.server.databroker.listeners.RmqListeners; import iudx.resource.server.databroker.listeners.UniqueAttribQlistener; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * The Data Broker Verticle. * *

Data Broker Verticle

* - *

The Data Broker Verticle implementation in the the IUDX Resource Server exposes the {@link + *

The Data Broker Verticle implementation in the IUDX Resource Server exposes the {@link * iudx.resource.server.databroker.DataBrokerService} over the Vert.x Event Bus. * * @version 1.0 * @since 2020-05-31 */ public class DataBrokerVerticle extends AbstractVerticle { - + private static final Logger LOGGER = LogManager.getLogger(DataBrokerVerticle.class); + /*RabbitMQOptions iudxConfig;*/ private DataBrokerService databroker; private RabbitMQOptions config; private String dataBrokerIp; private int dataBrokerPort; private int dataBrokerManagementPort; - private String dataBrokerVhost; private String dataBrokerUserName; private String dataBrokerPassword; private int connectionTimeout; @@ -66,6 +68,7 @@ public class DataBrokerVerticle extends AbstractVerticle { private PostgresClient pgClient; private CacheService cache; private AsyncService asyncService; + private RabbitMQClient iudxRabbitMqClient; /** * This method is used to start the Verticle. It deploys a verticle in a cluster, registers the @@ -79,7 +82,6 @@ public void start() throws Exception { dataBrokerIp = config().getString("dataBrokerIP"); dataBrokerPort = config().getInteger("dataBrokerPort"); dataBrokerManagementPort = config().getInteger("dataBrokerManagementPort"); - dataBrokerVhost = config().getString("dataBrokerVhost"); dataBrokerUserName = config().getString("dataBrokerUserName"); dataBrokerPassword = config().getString("dataBrokerPassword"); connectionTimeout = config().getInteger("connectionTimeout"); @@ -101,7 +103,6 @@ public void start() throws Exception { config.setPassword(dataBrokerPassword); config.setHost(dataBrokerIp); config.setPort(dataBrokerPort); - config.setVirtualHost(dataBrokerVhost); config.setConnectionTimeout(connectionTimeout); config.setRequestedHeartbeat(requestedHeartbeat); config.setHandshakeTimeout(handshakeTimeout); @@ -109,6 +110,11 @@ public void start() throws Exception { config.setNetworkRecoveryInterval(networkRecoveryInterval); config.setAutomaticRecoveryEnabled(true); + RabbitMQOptions iudxConfig = new RabbitMQOptions(config); + String prodVhost = config().getString(Vhosts.IUDX_PROD.value); + + iudxConfig.setVirtualHost(prodVhost); + webConfig = new WebClientOptions(); webConfig.setKeepAlive(true); webConfig.setConnectTimeout(86400000); @@ -149,7 +155,6 @@ public void start() throws Exception { propObj.put("userName", dataBrokerUserName); propObj.put("password", dataBrokerPassword); - propObj.put("vHost", dataBrokerVhost); propObj.put("databaseIP", databaseIp); propObj.put("databasePort", databasePort); propObj.put("databaseName", databaseName); @@ -164,7 +169,20 @@ public void start() throws Exception { rabbitClient = new RabbitClient(vertx, config, rabbitWebClient, pgClient, config()); cache = CacheService.createProxy(vertx, CACHE_SERVICE_ADDRESS); binder = new ServiceBinder(vertx); - databroker = new DataBrokerServiceImpl(rabbitClient, pgClient, config(), cache); + iudxRabbitMqClient = RabbitMQClient.create(vertx, iudxConfig); + iudxRabbitMqClient + .start() + .onSuccess( + iudxRabbitClientStart -> { + LOGGER.info("RMQ client started for Prod Vhost"); + }) + .onFailure( + iudxRabbitClientStart -> { + LOGGER.fatal("RMQ client startup failed"); + }); + databroker = + new DataBrokerServiceImpl( + rabbitClient, pgClient, config(), cache, /*iudxConfig, vertx,*/ iudxRabbitMqClient); asyncService = AsyncService.createProxy(vertx, ASYNC_SERVICE_ADDRESS); String internalVhost = config().getString(Vhosts.IUDX_INTERNAL.value); diff --git a/src/test/java/iudx/resource/server/databroker/AdapterEntitiesTest.java b/src/test/java/iudx/resource/server/databroker/AdapterEntitiesTest.java index 9a1cebf1..697c5bea 100644 --- a/src/test/java/iudx/resource/server/databroker/AdapterEntitiesTest.java +++ b/src/test/java/iudx/resource/server/databroker/AdapterEntitiesTest.java @@ -82,6 +82,8 @@ public class AdapterEntitiesTest { private static PostgresClient pgClient; private static Configuration appConfig; @Mock static CacheService cacheService; + @Mock + static RabbitMQClient iudxRabbitMQClient; private static final Logger LOGGER = LogManager.getLogger(AdapterEntitiesTest.class); @@ -142,7 +144,9 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) { webConfig.setDefaultHost(dataBrokerIP); webConfig.setDefaultPort(dataBrokerManagementPort); webConfig.setKeepAliveTimeout(86400000); - + RabbitMQOptions iudxConfig = new RabbitMQOptions(config); + String prodVhost = "IUDX"; + iudxConfig.setVirtualHost(prodVhost); /* Create a RabbitMQ Clinet with the configuration and vertx cluster instance. */ client = RabbitMQClient.create(vertx, config); @@ -181,7 +185,7 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) { rabbitMQWebClient = new RabbitWebClient(vertx, webConfig, propObj); pgClient = new PostgresClient(vertx, connectOptions, poolOptions); rabbitMQStreamingClient = new RabbitClient(vertx, config, rabbitMQWebClient, pgClient, brokerConfig); - databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService); + databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService, /*iudxConfig, vertx,*/ iudxRabbitMQClient); resourceGroup = brokerConfig.getString("testResourceGroup"); resourceServer = brokerConfig.getString("testResourceServer"); diff --git a/src/test/java/iudx/resource/server/databroker/DBServiceImplTest.java b/src/test/java/iudx/resource/server/databroker/DBServiceImplTest.java index 7ae41e19..14e96291 100644 --- a/src/test/java/iudx/resource/server/databroker/DBServiceImplTest.java +++ b/src/test/java/iudx/resource/server/databroker/DBServiceImplTest.java @@ -3,12 +3,14 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import io.vertx.rabbitmq.RabbitMQClient; +import io.vertx.rabbitmq.RabbitMQOptions; import iudx.resource.server.cache.CacheService; import org.apache.http.HttpStatus; import org.junit.jupiter.api.BeforeEach; @@ -45,13 +47,17 @@ public class DBServiceImplTest { @Mock PostgresClient pgClient; @Mock - RabbitMQClient rabbitMQClient; + RabbitMQClient rabbitMQClient,iudxRabbitMQClient; @Mock AsyncResult asyncResult1; DataBrokerServiceImpl databrokerSpy; JsonObject expected; @Mock CacheService cacheService; + @Mock + RabbitMQOptions iudxConfig; + @Mock + Vertx vertx; @BeforeEach public void setUp(VertxTestContext vertxTestContext) { @@ -66,7 +72,7 @@ public void setUp(VertxTestContext vertxTestContext) { request.put("type", HttpStatus.SC_OK); throwableMessage = "Dummy failure message"; when(config.getString(anyString())).thenReturn("internalVhost"); - databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService); + databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService,/* iudxConfig, vertx,*/ iudxRabbitMQClient); databrokerSpy = spy(databroker); vertxTestContext.completeNow(); } @@ -118,7 +124,7 @@ public void test_publishFromAdaptor_Success(VertxTestContext vertxTestContext) { request.put("routingKey", "routingKeyValue"); request.put("type", HttpStatus.SC_OK); request.put("entities", new JsonArray().add("5b7556b5-0779-4c47-9cf2-3f209779aa22")); - when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient); + /*when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient);*/ when(asyncResult1.succeeded()).thenReturn(true); JsonObject providerJson = @@ -135,7 +141,7 @@ public AsyncResult answer(InvocationOnMock arg0) throws Throwable { ((Handler>) arg0.getArgument(3)).handle(asyncResult1); return null; } - }).when(rabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class)); + }).when(iudxRabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class)); expected.put("status", 200); databroker.publishFromAdaptor(request, vHost, handler -> { @@ -847,7 +853,7 @@ public void test_publishFromAdaptor_Failure(VertxTestContext vertxTestContext) { request.put("routingKey", "routingKeyValue"); request.put("type", HttpStatus.SC_OK); request.put("entities", new JsonArray().add("5b7556b5-0779-4c47-9cf2-3f209779aa22")); - when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient); + lenient().when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient); when(asyncResult1.succeeded()).thenReturn(false); when(asyncResult1.cause()).thenReturn(throwable); when(throwable.getMessage()).thenReturn(throwableMessage); @@ -858,7 +864,7 @@ public AsyncResult answer(InvocationOnMock arg0) throws Throwable { ((Handler>) arg0.getArgument(3)).handle(asyncResult1); return null; } - }).when(rabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class)); + }).when(iudxRabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class)); expected.put("status", 200); JsonObject providerJson = new JsonObject() diff --git a/src/test/java/iudx/resource/server/databroker/DataBrokerServiceImplTest.java b/src/test/java/iudx/resource/server/databroker/DataBrokerServiceImplTest.java index addb8664..28c260a0 100644 --- a/src/test/java/iudx/resource/server/databroker/DataBrokerServiceImplTest.java +++ b/src/test/java/iudx/resource/server/databroker/DataBrokerServiceImplTest.java @@ -3,25 +3,22 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.json.JsonArray; +import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.HttpResponse; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import io.vertx.rabbitmq.RabbitMQClient; +import io.vertx.rabbitmq.RabbitMQOptions; import iudx.resource.server.cache.CacheService; import org.apache.http.HttpStatus; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; import static iudx.resource.server.databroker.util.Constants.ID; -import static iudx.resource.server.databroker.util.Constants.USER_ID; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -51,6 +48,12 @@ public class DataBrokerServiceImplTest { JsonObject expected; @Mock CacheService cacheService; + @Mock + RabbitMQOptions iudxConfig; + @Mock + Vertx vertx; + @Mock + RabbitMQClient iudxRabbitMQClient; @BeforeEach public void setUp(VertxTestContext vertxTestContext) { @@ -65,7 +68,7 @@ public void setUp(VertxTestContext vertxTestContext) { request.put("type", HttpStatus.SC_OK); throwableMessage = "Dummy failure message"; when(config.getString(anyString())).thenReturn("internalVhost"); - databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService); + databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService, /*iudxConfig, vertx,*/ iudxRabbitMQClient); databrokerSpy = spy(databroker); vertxTestContext.completeNow(); } diff --git a/src/test/java/iudx/resource/server/databroker/DataBrokerServiceTest.java b/src/test/java/iudx/resource/server/databroker/DataBrokerServiceTest.java index b1fe2078..8f54443b 100644 --- a/src/test/java/iudx/resource/server/databroker/DataBrokerServiceTest.java +++ b/src/test/java/iudx/resource/server/databroker/DataBrokerServiceTest.java @@ -26,7 +26,6 @@ import static iudx.resource.server.databroker.util.Constants.USER_NAME; import static iudx.resource.server.databroker.util.Constants.VHOST; import static iudx.resource.server.databroker.util.Constants.VHOST_IUDX; -import static iudx.resource.server.metering.util.Constants.PROVIDER_ID; import static org.junit.jupiter.api.Assertions.*; import java.io.InputStream; @@ -108,6 +107,8 @@ public class DataBrokerServiceTest { @Mock static CacheService cacheService; + @Mock + static RabbitMQClient iudxRabbitMQClient; private static final Logger LOGGER = LogManager.getLogger(DataBrokerServiceTest.class); @@ -191,7 +192,9 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) { webConfig.setDefaultHost(dataBrokerIP); webConfig.setDefaultPort(dataBrokerManagementPort); webConfig.setKeepAliveTimeout(86400000); - + RabbitMQOptions iudxConfig = new RabbitMQOptions(config); + String prodVhost = "IUDX"; + iudxConfig.setVirtualHost(prodVhost); /* Create a RabbitMQ Clinet with the configuration and vertx cluster instance. */ @@ -233,7 +236,7 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) { rabbitMQWebClient = new RabbitWebClient(vertx, webConfig, propObj); pgClient = new PostgresClient(vertx, connectOptions, poolOptions); rabbitMQStreamingClient = new RabbitClient(vertx, config, rabbitMQWebClient, pgClient, brokerConfig); - databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService); + databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService, /*iudxConfig, vertx,*/ iudxRabbitMQClient); userid = UUID.randomUUID().toString();