Skip to content

Commit

Permalink
Use threadpool for subscription checking (#465)
Browse files Browse the repository at this point in the history
- Add thread pool for SubscriptionHandler
- Setup threadpool settings for SubscriptionHandler in SpringAsyncConfig
- Add new properties for subscription handler thread pool
  with same values as event handler thread pool
- Update test and info to use new properties
- Set org.mongodb.driver.protocol.command log out to error
- Fix of docker-compose correct path to rules
- Add config for subscription handler execution pool in tests
* Step patch version to 2.0.1
* Update default values to match eventhandler threadpool

Co-authored-by: Mattias Linner <[email protected]>
Co-authored-by: Christoffer Cortes Sjöwall <[email protected]>
Co-authored-by: SantoshNC68 <[email protected]>
  • Loading branch information
4 people authored Jun 12, 2020
1 parent 01a661f commit 5435c7b
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 94 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.ericsson</groupId>
<artifactId>eiffel-intelligence</artifactId>
<version>2.2.0</version>
<version>2.2.1</version>
<packaging>war</packaging>

<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
"threads.corePoolSize= 3",
"threads.queueCapacity= 1",
"threads.maxPoolSize= 4",
"subscription-handler.threads.corePoolSize= 3",
"subscription-handler.threads.queueCapacity= 1",
"subscription-handler.threads.maxPoolSize= 4",
"waitlist.collection.ttlValue: 60",
"waitlist.initialDelayResend= 500",
"waitlist.fixedRateResend= 1000",
Expand All @@ -46,12 +49,7 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase {
@Autowired
private Environment environment;

@Value("${threads.corePoolSize}")
private int corePoolSize;
@Value("${threads.queueCapacity}")
private int queueCapacity;
@Value("${threads.maxPoolSize}")
private int maxPoolSize;

@Value("${waitlist.collection.ttlValue}")
private int waitlistTtl;

Expand All @@ -64,41 +62,41 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase {

@Given("^that eiffel events are sent$")
public void that_eiffel_events_are_sent() throws Throwable {
List<String> eventNamesToSend = getEventNamesToSend();
final List<String> eventNamesToSend = getEventNamesToSend();
eventManager.sendEiffelEvents(EIFFEL_EVENTS_JSON_PATH, eventNamesToSend);
}

@Then("^waitlist should not be empty$")
public void waitlist_should_not_be_empty() throws Throwable {
TimeUnit.SECONDS.sleep(5);
int waitListSize = dbManager.waitListSize();
final int waitListSize = dbManager.waitListSize();
assertNotEquals(0, waitListSize);
}

@Given("^no event is aggregated$")
public void no_event_is_aggregated() throws Throwable {
boolean aggregatedObjectExists = dbManager.verifyAggregatedObjectExistsInDB();
final boolean aggregatedObjectExists = dbManager.verifyAggregatedObjectExistsInDB();
assertEquals("aggregatedObjectExists was true, should be false, ", false, aggregatedObjectExists);
}

@Then("^event-to-object-map is manipulated to include the sent events$")
public void event_to_object_map_is_manipulated_to_include_the_sent_events() throws Throwable {
JsonNode parsedJSON = eventManager.getJSONFromFile(EIFFEL_EVENTS_JSON_PATH);
ObjectMapper objectMapper = new ObjectMapper();
final JsonNode parsedJSON = eventManager.getJSONFromFile(EIFFEL_EVENTS_JSON_PATH);
final ObjectMapper objectMapper = new ObjectMapper();
rulesJson = objectMapper.readTree(ID_RULE);
rulesObject = new RulesObject(rulesJson);

String dummyObjectID = "1234abcd-12ab-12ab-12ab-123456abcdef";
List<String> eventNames = getEventNamesToSend();
for (String eventName : eventNames) {
JsonNode eventJson = parsedJSON.get(eventName);
final String dummyObjectID = "1234abcd-12ab-12ab-12ab-123456abcdef";
final List<String> eventNames = getEventNamesToSend();
for (final String eventName : eventNames) {
final JsonNode eventJson = parsedJSON.get(eventName);
eventToObjectMapHanler.updateEventToObjectMapInMemoryDB(rulesObject, eventJson.toString(), dummyObjectID);
}
}

@Then("^when waitlist has resent events they should have been deleted$")
public void when_waitlist_has_resent_events_they_should_have_been_deleted() throws Throwable {
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3);
final long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3);
while (dbManager.waitListSize() > 0 && stopTime > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(100);
}
Expand All @@ -108,19 +106,19 @@ public void when_waitlist_has_resent_events_they_should_have_been_deleted() thro

@Then("^after the time to live has ended, the waitlist should be empty$")
public void after_the_time_to_live_has_ended_the_waitlist_should_be_empty() throws Throwable {
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60);
final long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60);
while (dbManager.waitListSize() > 0 && stopTime > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(10000);
}
int waitListSize = dbManager.waitListSize();
final int waitListSize = dbManager.waitListSize();
assertEquals(0, waitListSize);
}

/**
* Events used in the aggregation.
*/
protected List<String> getEventNamesToSend() {
List<String> eventNames = new ArrayList<>();
final List<String> eventNames = new ArrayList<>();
eventNames.add("event_EiffelConfidenceLevelModifiedEvent_3_2");
eventNames.add("event_EiffelArtifactPublishedEvent_3");
eventNames.add("event_EiffelTestCaseTriggeredEvent_3");
Expand Down
2 changes: 1 addition & 1 deletion src/main/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ services:
environment: # Overrides settings in application config file
- SpringApplicationName=eiffel-intelligence-backend
- server.port=8080
- rules.path=src/main/resources/ArtifactRules.json
- rules.path=/rules/ArtifactRules-Eiffel-Agen-Version.json
- rabbitmq.host=rabbitmq
- rabbitmq.port=${RABBITMQ_AMQP_PORT}
- rabbitmq.domainId=ei-domain
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/ericsson/ei/config/ConfigurationLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ private void logConfiguration() {
+ "threads.corePoolSize: " + env.getProperty("threads.corePoolSize") + "\n"
+ "threads.queueCapacity: " + env.getProperty("threads.queueCapacity") + "\n"
+ "threads.maxPoolSize: " + env.getProperty("threads.maxPoolSize") + "\n"
+ "subscription-handler.threads.corePoolSize: " + env.getProperty("subscription-handler.threads.corePoolSize") + "\n"
+ "subscription-handler.threads.queueCapacity: " + env.getProperty("subscription-handler.threads.queueCapacity") + "\n"
+ "subscription-handler.threads.maxPoolSize: " + env.getProperty("subscription-handler.threads.maxPoolSize") + "\n"
+ "missedNotificationCollectionName: " + env.getProperty("missedNotificationCollectionName") + "\n"
+ "missedNotificationDataBaseName: " + env.getProperty("missedNotificationDataBaseName") + "\n"
+ "email.sender: " + env.getProperty("email.sender") + "\n"
Expand Down
47 changes: 37 additions & 10 deletions src/main/java/com/ericsson/ei/config/SpringAsyncConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
Expand All @@ -29,23 +30,49 @@
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer{

@Value("${threads.corePoolSize}") private int corePoolSize;
@Value("${threads.queueCapacity}") private int queueCapacity;
@Value("${threads.maxPoolSize}") private int maxPoolSize;

@Value("${threads.corePoolSize}")
private int eventHandlerCorePoolSize;

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setMaxPoolSize(maxPoolSize);
executor.setThreadNamePrefix("EventHandler-");
@Value("${threads.queueCapacity}")
private int eventHandlerQueueCapacity;

@Value("${threads.maxPoolSize}")
private int eventHandlerMaxPoolSize;

@Value("${subscription-handler.threads.corePoolSize:50}")
private int subscriptionHandlerCorePoolSize;

@Value("${subscription-handler.threads.queueCapacity:5000}")
private int subscriptionHandlerQueueCapacity;

@Value("${subscription-handler.threads.maxPoolSize:50}")
private int subscriptionHandlerMaxPoolSize;


@Bean("subscriptionHandlerExecutor")
public Executor subscriptionHandlerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(subscriptionHandlerCorePoolSize);
executor.setQueueCapacity(subscriptionHandlerQueueCapacity);
executor.setMaxPoolSize(subscriptionHandlerMaxPoolSize);
executor.setThreadNamePrefix("SubscriptionHandler-");
executor.initialize();
return executor;
}


@Bean("eventHandlerExecutor")
public Executor eventHandlerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(eventHandlerCorePoolSize);
executor.setQueueCapacity(eventHandlerQueueCapacity);
executor.setMaxPoolSize(eventHandlerMaxPoolSize);
executor.setThreadNamePrefix("EventHandler-");
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class ParseInstanceInfoEI {

@PostConstruct
public void init() throws IOException {
Properties properties = new Properties();
final Properties properties = new Properties();
properties.load(
ParseInstanceInfoEI.class.getResourceAsStream("/default-application.properties"));
version = properties.getProperty("version");
Expand Down Expand Up @@ -184,14 +184,26 @@ public void init() throws IOException {
private class ThreadsValue {
@Getter
@Value("${threads.corePoolSize}")
private int corePoolSize;
private int eventHandlerCorePoolSize;

@Getter
@Value("${threads.queueCapacity}")
private int queueCapacity;
private int eventHandlerQueueCapacity;

@Getter
@Value("${threads.maxPoolSize}")
private int maxPoolSize;
private int eventHandlerMaxPoolSize;

@Getter
@Value("${subscription-handler.threads.corePoolSize}")
private int subscriptionHandlerCorePoolSize;

@Getter
@Value("${subscription-handler.threads.queueCapacity}")
private int subscriptionHandlerQueueCapacity;

@Getter
@Value("${subscription-handler.threads.maxPoolSize}")
private int subscriptionHandlerMaxPoolSize;
}
}
18 changes: 9 additions & 9 deletions src/main/java/com/ericsson/ei/handlers/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@ public RulesHandler getRulesHandler() {
return rulesHandler;
}

public void eventReceived(String event) {
RulesObject eventRules = rulesHandler.getRulesForEvent(event);
public void eventReceived(final String event) {
final RulesObject eventRules = rulesHandler.getRulesForEvent(event);
idRulesHandler.runIdRules(eventRules, event);
}

@Async
public void onMessage(Message message, Channel channel) throws Exception {
String messageBody = new String(message.getBody());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(messageBody);
String id = node.get("meta").get("id").toString();
@Async("eventHandlerExecutor")
public void onMessage(final Message message, final Channel channel) throws Exception {
final String messageBody = new String(message.getBody());
final ObjectMapper objectMapper = new ObjectMapper();
final JsonNode node = objectMapper.readTree(messageBody);
final String id = node.get("meta").get("id").toString();
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
LOGGER.debug("Event {} received", id);

eventReceived(messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);

LOGGER.debug("Event {} processed", id);
Expand Down
28 changes: 14 additions & 14 deletions src/main/java/com/ericsson/ei/handlers/RmqHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public ConnectionFactory connectionFactory() {
try {
LOGGER.debug("Using SSL/TLS version {} connection to RabbitMQ.", tlsVersion);
cachingConnectionFactory.getRabbitConnectionFactory().useSslProtocol(tlsVersion);
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Failed to set SSL/TLS version.", e);
}
}
Expand Down Expand Up @@ -172,19 +172,19 @@ Binding binding() {

@Bean
public List<Binding> bindings() {
String[] bingingKeysArray = splitBindingKeys(bindingKeys);
List<Binding> bindingList = new ArrayList<Binding>();
for (String bindingKey : bingingKeysArray) {
final String[] bingingKeysArray = splitBindingKeys(bindingKeys);
final List<Binding> bindingList = new ArrayList<>();
for (final String bindingKey : bingingKeysArray) {
bindingList.add(BindingBuilder.bind(externalQueue()).to(exchange()).with(bindingKey));
}
return bindingList;
}

@Bean
public SimpleMessageListenerContainer bindToQueueForRecentEvents(
ConnectionFactory springConnectionFactory,
EventHandler eventHandler) {
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
final ConnectionFactory springConnectionFactory,
final EventHandler eventHandler) {
final MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
container = new SimpleMessageListenerContainer();
container.setConnectionFactory(springConnectionFactory);
container.setQueueNames(getQueueName(), getWaitlistQueueName());
Expand Down Expand Up @@ -214,7 +214,7 @@ public RabbitTemplate rabbitMqTemplate() {
rabbitTemplate.setRoutingKey(getWaitlistQueueName());
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) {
LOGGER.info("Received confirm with result : {}", ack);
}
});
Expand All @@ -223,18 +223,18 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
}

public String getQueueName() {
String durableName = queueDurable ? "durable" : "transient";
final String durableName = queueDurable ? "durable" : "transient";
return domainId + "." + componentName + "." + consumerName + "." + durableName;
}

public String getWaitlistQueueName() {

String durableName = queueDurable ? "durable" : "transient";
final String durableName = queueDurable ? "durable" : "transient";
return domainId + "." + componentName + "." + consumerName + "." + durableName + "."
+ waitlistSufix;
}

public void publishObjectToWaitlistQueue(String message) {
public void publishObjectToWaitlistQueue(final String message) {
LOGGER.debug("Publishing message to message bus...");
rabbitTemplate.convertAndSend(message);
}
Expand All @@ -243,13 +243,13 @@ public void close() {
try {
container.destroy();
cachingConnectionFactory.destroy();
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception occurred while closing connections.", e);
}
}

private String[] splitBindingKeys(String bindingKeys) {
String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", "");
private String[] splitBindingKeys(final String bindingKeys) {
final String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", "");
return bindingKeysWithoutWhitespace.split(",");
}
}
Loading

0 comments on commit 5435c7b

Please sign in to comment.