Skip to content

Commit

Permalink
Merge pull request #548 from ankitmashu/fix/publish_adaptor_ingestion_1
Browse files Browse the repository at this point in the history
publish adaptor , rmq push message change
  • Loading branch information
gopal-mahajan authored Oct 1, 2024
2 parents 3b93196 + e0ae7a3 commit 79f988b
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1471,10 +1471,10 @@ public void publishDataFromAdapter(RoutingContext routingContext) {
JsonObject requestJson = routingContext.body().asJsonObject();
HttpServerRequest request = routingContext.request();
HttpServerResponse response = routingContext.response();
String instanceId = request.getHeader(HEADER_HOST);
/*String instanceId = request.getHeader(HEADER_HOST);*/
JsonObject authenticationInfo = new JsonObject();
authenticationInfo.put(API_ENDPOINT, "/iudx/v1/adapter");
requestJson.put(JSON_INSTANCEID, instanceId);
/*requestJson.put(JSON_INSTANCEID, instanceId);*/
if (request.headers().contains(HEADER_TOKEN)) {
authenticationInfo.put(HEADER_TOKEN, request.getHeader(HEADER_TOKEN));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.logging.log4j.Logger;

public class EsResponseFormatterToJson extends AbstractEsSearchResponseFormatter {
private static final Logger LOGGER = LogManager.getLogger(EsResponseFormatterToJson.class);
/*private static final Logger LOGGER = LogManager.getLogger(EsResponseFormatterToJson.class);*/
FileWriter fileWriter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,25 @@ public class DataBrokerServiceImpl implements DataBrokerService {
JsonObject finalResponse =
Util.getResponseJson(BAD_REQUEST_CODE, BAD_REQUEST_DATA, BAD_REQUEST_DATA);
CacheService cacheService;
/*RabbitMQOptions iudxRabbitMQOptions;*/
private JsonObject config;
private RabbitClient webClient;
private RabbitMQClient iudxRabbitMqClient;

public DataBrokerServiceImpl(
RabbitClient webClient,
PostgresClient pgClient,
JsonObject config,
CacheService cacheService) {
CacheService cacheService,
/*RabbitMQOptions iudxConfig,
Vertx vertx,*/
RabbitMQClient iudxRabbitMqClient) {
this.webClient = webClient;
this.config = config;
/*this.iudxRabbitMQOptions = iudxConfig;
this.iudxRabbitMqClient = RabbitMQClient.create(vertx,iudxConfig);*/

this.iudxRabbitMqClient = iudxRabbitMqClient;
this.subscriptionService =
new SubscriptionService(this.webClient, pgClient, config, cacheService);
this.cacheService = cacheService;
Expand Down Expand Up @@ -659,11 +668,9 @@ public DataBrokerService listQueueSubscribers(
public DataBrokerService publishFromAdaptor(
JsonObject request, String vhost, Handler<AsyncResult<JsonObject>> handler) {
JsonObject finalResponse = new JsonObject();
JsonObject json = new JsonObject();
LOGGER.debug("request Json " + request);

if (request != null && !request.isEmpty()) {
json.put("body", request.toString());

JsonObject cacheRequestJson = new JsonObject();
cacheRequestJson.put("type", CATALOGUE_CACHE);
Expand All @@ -680,27 +687,26 @@ public DataBrokerService publishFromAdaptor(
: cacheResult.getString(ID);
LOGGER.debug("Info : resourceGroupId " + resourceGroupId);
String routingKey = request.getJsonArray("entities").getString(0);
request.remove("entities");
request.put("id", routingKey);
if (resourceGroupId != null && !resourceGroupId.isBlank()) {
LOGGER.debug("Info : routingKey " + routingKey);
Buffer buffer = Buffer.buffer(json.toString());
webClient
.getRabbitmqClient()
.basicPublish(
resourceGroupId,
routingKey,
buffer,
resultHandler -> {
if (resultHandler.succeeded()) {
finalResponse.put(STATUS, HttpStatus.SC_OK);
LOGGER.info("Success : Message published to queue");
handler.handle(Future.succeededFuture(finalResponse));
} else {
finalResponse.put(TYPE, HttpStatus.SC_BAD_REQUEST);
LOGGER.error("Fail : " + resultHandler.cause().toString());
handler.handle(
Future.failedFuture(resultHandler.cause().getMessage()));
}
});
Buffer buffer = Buffer.buffer(request.encode());
iudxRabbitMqClient.basicPublish(
resourceGroupId,
routingKey,
buffer,
resultHandler -> {
if (resultHandler.succeeded()) {
finalResponse.put(STATUS, HttpStatus.SC_OK);
LOGGER.info("Success : Message published to queue");
handler.handle(Future.succeededFuture(finalResponse));
} else {
finalResponse.put(TYPE, HttpStatus.SC_BAD_REQUEST);
LOGGER.error("Fail : " + resultHandler.cause().toString());
handler.handle(Future.failedFuture(resultHandler.cause().getMessage()));
}
});
}
} else {
LOGGER.error("Item not found");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,28 @@
import iudx.resource.server.databroker.listeners.RevokeClientQlistener;
import iudx.resource.server.databroker.listeners.RmqListeners;
import iudx.resource.server.databroker.listeners.UniqueAttribQlistener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* The Data Broker Verticle.
*
* <h1>Data Broker Verticle</h1>
*
* <p>The Data Broker Verticle implementation in the the IUDX Resource Server exposes the {@link
* <p>The Data Broker Verticle implementation in the IUDX Resource Server exposes the {@link
* iudx.resource.server.databroker.DataBrokerService} over the Vert.x Event Bus.
*
* @version 1.0
* @since 2020-05-31
*/
public class DataBrokerVerticle extends AbstractVerticle {

private static final Logger LOGGER = LogManager.getLogger(DataBrokerVerticle.class);
/*RabbitMQOptions iudxConfig;*/
private DataBrokerService databroker;
private RabbitMQOptions config;
private String dataBrokerIp;
private int dataBrokerPort;
private int dataBrokerManagementPort;
private String dataBrokerVhost;
private String dataBrokerUserName;
private String dataBrokerPassword;
private int connectionTimeout;
Expand All @@ -66,6 +68,7 @@ public class DataBrokerVerticle extends AbstractVerticle {
private PostgresClient pgClient;
private CacheService cache;
private AsyncService asyncService;
private RabbitMQClient iudxRabbitMqClient;

/**
* This method is used to start the Verticle. It deploys a verticle in a cluster, registers the
Expand All @@ -79,7 +82,6 @@ public void start() throws Exception {
dataBrokerIp = config().getString("dataBrokerIP");
dataBrokerPort = config().getInteger("dataBrokerPort");
dataBrokerManagementPort = config().getInteger("dataBrokerManagementPort");
dataBrokerVhost = config().getString("dataBrokerVhost");
dataBrokerUserName = config().getString("dataBrokerUserName");
dataBrokerPassword = config().getString("dataBrokerPassword");
connectionTimeout = config().getInteger("connectionTimeout");
Expand All @@ -101,14 +103,18 @@ public void start() throws Exception {
config.setPassword(dataBrokerPassword);
config.setHost(dataBrokerIp);
config.setPort(dataBrokerPort);
config.setVirtualHost(dataBrokerVhost);
config.setConnectionTimeout(connectionTimeout);
config.setRequestedHeartbeat(requestedHeartbeat);
config.setHandshakeTimeout(handshakeTimeout);
config.setRequestedChannelMax(requestedChannelMax);
config.setNetworkRecoveryInterval(networkRecoveryInterval);
config.setAutomaticRecoveryEnabled(true);

RabbitMQOptions iudxConfig = new RabbitMQOptions(config);
String prodVhost = config().getString(Vhosts.IUDX_PROD.value);

iudxConfig.setVirtualHost(prodVhost);

webConfig = new WebClientOptions();
webConfig.setKeepAlive(true);
webConfig.setConnectTimeout(86400000);
Expand Down Expand Up @@ -149,7 +155,6 @@ public void start() throws Exception {

propObj.put("userName", dataBrokerUserName);
propObj.put("password", dataBrokerPassword);
propObj.put("vHost", dataBrokerVhost);
propObj.put("databaseIP", databaseIp);
propObj.put("databasePort", databasePort);
propObj.put("databaseName", databaseName);
Expand All @@ -164,7 +169,20 @@ public void start() throws Exception {
rabbitClient = new RabbitClient(vertx, config, rabbitWebClient, pgClient, config());
cache = CacheService.createProxy(vertx, CACHE_SERVICE_ADDRESS);
binder = new ServiceBinder(vertx);
databroker = new DataBrokerServiceImpl(rabbitClient, pgClient, config(), cache);
iudxRabbitMqClient = RabbitMQClient.create(vertx, iudxConfig);
iudxRabbitMqClient
.start()
.onSuccess(
iudxRabbitClientStart -> {
LOGGER.info("RMQ client started for Prod Vhost");
})
.onFailure(
iudxRabbitClientStart -> {
LOGGER.fatal("RMQ client startup failed");
});
databroker =
new DataBrokerServiceImpl(
rabbitClient, pgClient, config(), cache, /*iudxConfig, vertx,*/ iudxRabbitMqClient);
asyncService = AsyncService.createProxy(vertx, ASYNC_SERVICE_ADDRESS);

String internalVhost = config().getString(Vhosts.IUDX_INTERNAL.value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class AdapterEntitiesTest {
private static PostgresClient pgClient;
private static Configuration appConfig;
@Mock static CacheService cacheService;
@Mock
static RabbitMQClient iudxRabbitMQClient;

private static final Logger LOGGER = LogManager.getLogger(AdapterEntitiesTest.class);

Expand Down Expand Up @@ -142,7 +144,9 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) {
webConfig.setDefaultHost(dataBrokerIP);
webConfig.setDefaultPort(dataBrokerManagementPort);
webConfig.setKeepAliveTimeout(86400000);

RabbitMQOptions iudxConfig = new RabbitMQOptions(config);
String prodVhost = "IUDX";
iudxConfig.setVirtualHost(prodVhost);

/* Create a RabbitMQ Clinet with the configuration and vertx cluster instance. */
client = RabbitMQClient.create(vertx, config);
Expand Down Expand Up @@ -181,7 +185,7 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) {
rabbitMQWebClient = new RabbitWebClient(vertx, webConfig, propObj);
pgClient = new PostgresClient(vertx, connectOptions, poolOptions);
rabbitMQStreamingClient = new RabbitClient(vertx, config, rabbitMQWebClient, pgClient, brokerConfig);
databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService);
databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService, /*iudxConfig, vertx,*/ iudxRabbitMQClient);

resourceGroup = brokerConfig.getString("testResourceGroup");
resourceServer = brokerConfig.getString("testResourceServer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQOptions;
import iudx.resource.server.cache.CacheService;
import org.apache.http.HttpStatus;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -45,13 +47,17 @@ public class DBServiceImplTest {
@Mock
PostgresClient pgClient;
@Mock
RabbitMQClient rabbitMQClient;
RabbitMQClient rabbitMQClient,iudxRabbitMQClient;
@Mock
AsyncResult<Void> asyncResult1;
DataBrokerServiceImpl databrokerSpy;
JsonObject expected;
@Mock
CacheService cacheService;
@Mock
RabbitMQOptions iudxConfig;
@Mock
Vertx vertx;

@BeforeEach
public void setUp(VertxTestContext vertxTestContext) {
Expand All @@ -66,7 +72,7 @@ public void setUp(VertxTestContext vertxTestContext) {
request.put("type", HttpStatus.SC_OK);
throwableMessage = "Dummy failure message";
when(config.getString(anyString())).thenReturn("internalVhost");
databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService);
databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService,/* iudxConfig, vertx,*/ iudxRabbitMQClient);
databrokerSpy = spy(databroker);
vertxTestContext.completeNow();
}
Expand Down Expand Up @@ -118,7 +124,7 @@ public void test_publishFromAdaptor_Success(VertxTestContext vertxTestContext) {
request.put("routingKey", "routingKeyValue");
request.put("type", HttpStatus.SC_OK);
request.put("entities", new JsonArray().add("5b7556b5-0779-4c47-9cf2-3f209779aa22"));
when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient);
/*when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient);*/
when(asyncResult1.succeeded()).thenReturn(true);

JsonObject providerJson =
Expand All @@ -135,7 +141,7 @@ public AsyncResult<Void> answer(InvocationOnMock arg0) throws Throwable {
((Handler<AsyncResult<Void>>) arg0.getArgument(3)).handle(asyncResult1);
return null;
}
}).when(rabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class));
}).when(iudxRabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class));
expected.put("status", 200);

databroker.publishFromAdaptor(request, vHost, handler -> {
Expand Down Expand Up @@ -847,7 +853,7 @@ public void test_publishFromAdaptor_Failure(VertxTestContext vertxTestContext) {
request.put("routingKey", "routingKeyValue");
request.put("type", HttpStatus.SC_OK);
request.put("entities", new JsonArray().add("5b7556b5-0779-4c47-9cf2-3f209779aa22"));
when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient);
lenient().when(webClient.getRabbitmqClient()).thenReturn(rabbitMQClient);
when(asyncResult1.succeeded()).thenReturn(false);
when(asyncResult1.cause()).thenReturn(throwable);
when(throwable.getMessage()).thenReturn(throwableMessage);
Expand All @@ -858,7 +864,7 @@ public AsyncResult<Void> answer(InvocationOnMock arg0) throws Throwable {
((Handler<AsyncResult<Void>>) arg0.getArgument(3)).handle(asyncResult1);
return null;
}
}).when(rabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class));
}).when(iudxRabbitMQClient).basicPublish(anyString(), anyString(), any(Buffer.class), any(Handler.class));
expected.put("status", 200);
JsonObject providerJson =
new JsonObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQOptions;
import iudx.resource.server.cache.CacheService;
import org.apache.http.HttpStatus;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;

import static iudx.resource.server.databroker.util.Constants.ID;
import static iudx.resource.server.databroker.util.Constants.USER_ID;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -51,6 +48,12 @@ public class DataBrokerServiceImplTest {
JsonObject expected;
@Mock
CacheService cacheService;
@Mock
RabbitMQOptions iudxConfig;
@Mock
Vertx vertx;
@Mock
RabbitMQClient iudxRabbitMQClient;

@BeforeEach
public void setUp(VertxTestContext vertxTestContext) {
Expand All @@ -65,7 +68,7 @@ public void setUp(VertxTestContext vertxTestContext) {
request.put("type", HttpStatus.SC_OK);
throwableMessage = "Dummy failure message";
when(config.getString(anyString())).thenReturn("internalVhost");
databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService);
databroker = new DataBrokerServiceImpl(webClient, pgClient, config,cacheService, /*iudxConfig, vertx,*/ iudxRabbitMQClient);
databrokerSpy = spy(databroker);
vertxTestContext.completeNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static iudx.resource.server.databroker.util.Constants.USER_NAME;
import static iudx.resource.server.databroker.util.Constants.VHOST;
import static iudx.resource.server.databroker.util.Constants.VHOST_IUDX;
import static iudx.resource.server.metering.util.Constants.PROVIDER_ID;
import static org.junit.jupiter.api.Assertions.*;

import java.io.InputStream;
Expand Down Expand Up @@ -108,6 +107,8 @@ public class DataBrokerServiceTest {
@Mock
static
CacheService cacheService;
@Mock
static RabbitMQClient iudxRabbitMQClient;

private static final Logger LOGGER = LogManager.getLogger(DataBrokerServiceTest.class);

Expand Down Expand Up @@ -191,7 +192,9 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) {
webConfig.setDefaultHost(dataBrokerIP);
webConfig.setDefaultPort(dataBrokerManagementPort);
webConfig.setKeepAliveTimeout(86400000);

RabbitMQOptions iudxConfig = new RabbitMQOptions(config);
String prodVhost = "IUDX";
iudxConfig.setVirtualHost(prodVhost);

/* Create a RabbitMQ Clinet with the configuration and vertx cluster instance. */

Expand Down Expand Up @@ -233,7 +236,7 @@ static void startVertx(Vertx vertx, VertxTestContext testContext) {
rabbitMQWebClient = new RabbitWebClient(vertx, webConfig, propObj);
pgClient = new PostgresClient(vertx, connectOptions, poolOptions);
rabbitMQStreamingClient = new RabbitClient(vertx, config, rabbitMQWebClient, pgClient, brokerConfig);
databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService);
databroker = new DataBrokerServiceImpl(rabbitMQStreamingClient, pgClient, brokerConfig,cacheService, /*iudxConfig, vertx,*/ iudxRabbitMQClient);

userid = UUID.randomUUID().toString();

Expand Down

0 comments on commit 79f988b

Please sign in to comment.