From 5435c7b00da54a5187ed0e92ab710943cc082a79 Mon Sep 17 00:00:00 2001 From: e-pettersson-ericsson <38855976+e-pettersson-ericsson@users.noreply.github.com> Date: Fri, 12 Jun 2020 14:15:18 +0200 Subject: [PATCH] Use threadpool for subscription checking (#465) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add thread pool for SubscriptionHandler - Setup threadpool settings for SubscriptionHandler in SpringAsyncConfig - Add new properties for subscription handler thread pool with same values as event handler thread pool - Update test and info to use new properties - Set org.mongodb.driver.protocol.command log out to error - Fix of docker-compose correct path to rules - Add config for subscription handler execution pool in tests * Step patch version to 2.0.1 * Update default values to match eventhandler threadpool Co-authored-by: Mattias Linner Co-authored-by: Christoffer Cortes Sjöwall <35095827+Christoffer-Cortes@users.noreply.github.com> Co-authored-by: SantoshNC68 --- pom.xml | 2 +- .../ThreadingAndWaitlistRepeatSteps.java | 36 ++++++----- src/main/docker/docker-compose.yml | 2 +- .../ei/config/ConfigurationLogger.java | 3 + .../ericsson/ei/config/SpringAsyncConfig.java | 47 +++++++++++---- .../controller/model/ParseInstanceInfoEI.java | 20 +++++-- .../ericsson/ei/handlers/EventHandler.java | 18 +++--- .../com/ericsson/ei/handlers/RmqHandler.java | 28 ++++----- .../ei/subscription/SubscriptionHandler.java | 60 ++++++++----------- src/main/resources/application.properties | 9 ++- src/test/resources/logback.xml | 1 + 11 files changed, 132 insertions(+), 94 deletions(-) diff --git a/pom.xml b/pom.xml index 0ca79892e..9911409f2 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.github.ericsson eiffel-intelligence - 2.2.0 + 2.2.1 war diff --git a/src/functionaltests/java/com/ericsson/ei/threadingAndWaitlistRepeat/ThreadingAndWaitlistRepeatSteps.java b/src/functionaltests/java/com/ericsson/ei/threadingAndWaitlistRepeat/ThreadingAndWaitlistRepeatSteps.java index e51afe390..f9e457dfd 100644 --- a/src/functionaltests/java/com/ericsson/ei/threadingAndWaitlistRepeat/ThreadingAndWaitlistRepeatSteps.java +++ b/src/functionaltests/java/com/ericsson/ei/threadingAndWaitlistRepeat/ThreadingAndWaitlistRepeatSteps.java @@ -29,6 +29,9 @@ "threads.corePoolSize= 3", "threads.queueCapacity= 1", "threads.maxPoolSize= 4", + "subscription-handler.threads.corePoolSize= 3", + "subscription-handler.threads.queueCapacity= 1", + "subscription-handler.threads.maxPoolSize= 4", "waitlist.collection.ttlValue: 60", "waitlist.initialDelayResend= 500", "waitlist.fixedRateResend= 1000", @@ -46,12 +49,7 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase { @Autowired private Environment environment; - @Value("${threads.corePoolSize}") - private int corePoolSize; - @Value("${threads.queueCapacity}") - private int queueCapacity; - @Value("${threads.maxPoolSize}") - private int maxPoolSize; + @Value("${waitlist.collection.ttlValue}") private int waitlistTtl; @@ -64,41 +62,41 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase { @Given("^that eiffel events are sent$") public void that_eiffel_events_are_sent() throws Throwable { - List eventNamesToSend = getEventNamesToSend(); + final List eventNamesToSend = getEventNamesToSend(); eventManager.sendEiffelEvents(EIFFEL_EVENTS_JSON_PATH, eventNamesToSend); } @Then("^waitlist should not be empty$") public void waitlist_should_not_be_empty() throws Throwable { TimeUnit.SECONDS.sleep(5); - int waitListSize = dbManager.waitListSize(); + final int waitListSize = dbManager.waitListSize(); assertNotEquals(0, waitListSize); } @Given("^no event is aggregated$") public void no_event_is_aggregated() throws Throwable { - boolean aggregatedObjectExists = dbManager.verifyAggregatedObjectExistsInDB(); + final boolean aggregatedObjectExists = dbManager.verifyAggregatedObjectExistsInDB(); assertEquals("aggregatedObjectExists was true, should be false, ", false, aggregatedObjectExists); } @Then("^event-to-object-map is manipulated to include the sent events$") public void event_to_object_map_is_manipulated_to_include_the_sent_events() throws Throwable { - JsonNode parsedJSON = eventManager.getJSONFromFile(EIFFEL_EVENTS_JSON_PATH); - ObjectMapper objectMapper = new ObjectMapper(); + final JsonNode parsedJSON = eventManager.getJSONFromFile(EIFFEL_EVENTS_JSON_PATH); + final ObjectMapper objectMapper = new ObjectMapper(); rulesJson = objectMapper.readTree(ID_RULE); rulesObject = new RulesObject(rulesJson); - String dummyObjectID = "1234abcd-12ab-12ab-12ab-123456abcdef"; - List eventNames = getEventNamesToSend(); - for (String eventName : eventNames) { - JsonNode eventJson = parsedJSON.get(eventName); + final String dummyObjectID = "1234abcd-12ab-12ab-12ab-123456abcdef"; + final List eventNames = getEventNamesToSend(); + for (final String eventName : eventNames) { + final JsonNode eventJson = parsedJSON.get(eventName); eventToObjectMapHanler.updateEventToObjectMapInMemoryDB(rulesObject, eventJson.toString(), dummyObjectID); } } @Then("^when waitlist has resent events they should have been deleted$") public void when_waitlist_has_resent_events_they_should_have_been_deleted() throws Throwable { - long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3); + final long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3); while (dbManager.waitListSize() > 0 && stopTime > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } @@ -108,11 +106,11 @@ public void when_waitlist_has_resent_events_they_should_have_been_deleted() thro @Then("^after the time to live has ended, the waitlist should be empty$") public void after_the_time_to_live_has_ended_the_waitlist_should_be_empty() throws Throwable { - long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60); + final long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60); while (dbManager.waitListSize() > 0 && stopTime > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(10000); } - int waitListSize = dbManager.waitListSize(); + final int waitListSize = dbManager.waitListSize(); assertEquals(0, waitListSize); } @@ -120,7 +118,7 @@ public void after_the_time_to_live_has_ended_the_waitlist_should_be_empty() thro * Events used in the aggregation. */ protected List getEventNamesToSend() { - List eventNames = new ArrayList<>(); + final List eventNames = new ArrayList<>(); eventNames.add("event_EiffelConfidenceLevelModifiedEvent_3_2"); eventNames.add("event_EiffelArtifactPublishedEvent_3"); eventNames.add("event_EiffelTestCaseTriggeredEvent_3"); diff --git a/src/main/docker/docker-compose.yml b/src/main/docker/docker-compose.yml index d100f19cf..20989d70e 100644 --- a/src/main/docker/docker-compose.yml +++ b/src/main/docker/docker-compose.yml @@ -183,7 +183,7 @@ services: environment: # Overrides settings in application config file - SpringApplicationName=eiffel-intelligence-backend - server.port=8080 - - rules.path=src/main/resources/ArtifactRules.json + - rules.path=/rules/ArtifactRules-Eiffel-Agen-Version.json - rabbitmq.host=rabbitmq - rabbitmq.port=${RABBITMQ_AMQP_PORT} - rabbitmq.domainId=ei-domain diff --git a/src/main/java/com/ericsson/ei/config/ConfigurationLogger.java b/src/main/java/com/ericsson/ei/config/ConfigurationLogger.java index d229fcdf2..4f4642a35 100644 --- a/src/main/java/com/ericsson/ei/config/ConfigurationLogger.java +++ b/src/main/java/com/ericsson/ei/config/ConfigurationLogger.java @@ -57,6 +57,9 @@ private void logConfiguration() { + "threads.corePoolSize: " + env.getProperty("threads.corePoolSize") + "\n" + "threads.queueCapacity: " + env.getProperty("threads.queueCapacity") + "\n" + "threads.maxPoolSize: " + env.getProperty("threads.maxPoolSize") + "\n" + + "subscription-handler.threads.corePoolSize: " + env.getProperty("subscription-handler.threads.corePoolSize") + "\n" + + "subscription-handler.threads.queueCapacity: " + env.getProperty("subscription-handler.threads.queueCapacity") + "\n" + + "subscription-handler.threads.maxPoolSize: " + env.getProperty("subscription-handler.threads.maxPoolSize") + "\n" + "missedNotificationCollectionName: " + env.getProperty("missedNotificationCollectionName") + "\n" + "missedNotificationDataBaseName: " + env.getProperty("missedNotificationDataBaseName") + "\n" + "email.sender: " + env.getProperty("email.sender") + "\n" diff --git a/src/main/java/com/ericsson/ei/config/SpringAsyncConfig.java b/src/main/java/com/ericsson/ei/config/SpringAsyncConfig.java index 3bb62c6d2..bc96b1d8f 100644 --- a/src/main/java/com/ericsson/ei/config/SpringAsyncConfig.java +++ b/src/main/java/com/ericsson/ei/config/SpringAsyncConfig.java @@ -20,6 +20,7 @@ import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; @@ -29,23 +30,49 @@ @EnableAsync public class SpringAsyncConfig implements AsyncConfigurer{ - @Value("${threads.corePoolSize}") private int corePoolSize; - @Value("${threads.queueCapacity}") private int queueCapacity; - @Value("${threads.maxPoolSize}") private int maxPoolSize; + @Value("${threads.corePoolSize}") + private int eventHandlerCorePoolSize; - @Override - public Executor getAsyncExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(corePoolSize); - executor.setQueueCapacity(queueCapacity); - executor.setMaxPoolSize(maxPoolSize); - executor.setThreadNamePrefix("EventHandler-"); + @Value("${threads.queueCapacity}") + private int eventHandlerQueueCapacity; + + @Value("${threads.maxPoolSize}") + private int eventHandlerMaxPoolSize; + + @Value("${subscription-handler.threads.corePoolSize:50}") + private int subscriptionHandlerCorePoolSize; + + @Value("${subscription-handler.threads.queueCapacity:5000}") + private int subscriptionHandlerQueueCapacity; + + @Value("${subscription-handler.threads.maxPoolSize:50}") + private int subscriptionHandlerMaxPoolSize; + + + @Bean("subscriptionHandlerExecutor") + public Executor subscriptionHandlerExecutor() { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(subscriptionHandlerCorePoolSize); + executor.setQueueCapacity(subscriptionHandlerQueueCapacity); + executor.setMaxPoolSize(subscriptionHandlerMaxPoolSize); + executor.setThreadNamePrefix("SubscriptionHandler-"); executor.initialize(); return executor; } + @Bean("eventHandlerExecutor") + public Executor eventHandlerExecutor() { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(eventHandlerCorePoolSize); + executor.setQueueCapacity(eventHandlerQueueCapacity); + executor.setMaxPoolSize(eventHandlerMaxPoolSize); + executor.setThreadNamePrefix("EventHandler-"); + executor.initialize(); + return executor; + } + @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { // TODO Auto-generated method stub diff --git a/src/main/java/com/ericsson/ei/controller/model/ParseInstanceInfoEI.java b/src/main/java/com/ericsson/ei/controller/model/ParseInstanceInfoEI.java index 8be22a144..eae68a377 100644 --- a/src/main/java/com/ericsson/ei/controller/model/ParseInstanceInfoEI.java +++ b/src/main/java/com/ericsson/ei/controller/model/ParseInstanceInfoEI.java @@ -109,7 +109,7 @@ public class ParseInstanceInfoEI { @PostConstruct public void init() throws IOException { - Properties properties = new Properties(); + final Properties properties = new Properties(); properties.load( ParseInstanceInfoEI.class.getResourceAsStream("/default-application.properties")); version = properties.getProperty("version"); @@ -184,14 +184,26 @@ public void init() throws IOException { private class ThreadsValue { @Getter @Value("${threads.corePoolSize}") - private int corePoolSize; + private int eventHandlerCorePoolSize; @Getter @Value("${threads.queueCapacity}") - private int queueCapacity; + private int eventHandlerQueueCapacity; @Getter @Value("${threads.maxPoolSize}") - private int maxPoolSize; + private int eventHandlerMaxPoolSize; + + @Getter + @Value("${subscription-handler.threads.corePoolSize}") + private int subscriptionHandlerCorePoolSize; + + @Getter + @Value("${subscription-handler.threads.queueCapacity}") + private int subscriptionHandlerQueueCapacity; + + @Getter + @Value("${subscription-handler.threads.maxPoolSize}") + private int subscriptionHandlerMaxPoolSize; } } diff --git a/src/main/java/com/ericsson/ei/handlers/EventHandler.java b/src/main/java/com/ericsson/ei/handlers/EventHandler.java index 18a8caff2..1a93b25be 100644 --- a/src/main/java/com/ericsson/ei/handlers/EventHandler.java +++ b/src/main/java/com/ericsson/ei/handlers/EventHandler.java @@ -51,22 +51,22 @@ public RulesHandler getRulesHandler() { return rulesHandler; } - public void eventReceived(String event) { - RulesObject eventRules = rulesHandler.getRulesForEvent(event); + public void eventReceived(final String event) { + final RulesObject eventRules = rulesHandler.getRulesForEvent(event); idRulesHandler.runIdRules(eventRules, event); } - @Async - public void onMessage(Message message, Channel channel) throws Exception { - String messageBody = new String(message.getBody()); - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode node = objectMapper.readTree(messageBody); - String id = node.get("meta").get("id").toString(); + @Async("eventHandlerExecutor") + public void onMessage(final Message message, final Channel channel) throws Exception { + final String messageBody = new String(message.getBody()); + final ObjectMapper objectMapper = new ObjectMapper(); + final JsonNode node = objectMapper.readTree(messageBody); + final String id = node.get("meta").get("id").toString(); LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId()); LOGGER.debug("Event {} received", id); eventReceived(messageBody); - long deliveryTag = message.getMessageProperties().getDeliveryTag(); + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); LOGGER.debug("Event {} processed", id); diff --git a/src/main/java/com/ericsson/ei/handlers/RmqHandler.java b/src/main/java/com/ericsson/ei/handlers/RmqHandler.java index 90cd49ac6..64bd006fd 100644 --- a/src/main/java/com/ericsson/ei/handlers/RmqHandler.java +++ b/src/main/java/com/ericsson/ei/handlers/RmqHandler.java @@ -135,7 +135,7 @@ public ConnectionFactory connectionFactory() { try { LOGGER.debug("Using SSL/TLS version {} connection to RabbitMQ.", tlsVersion); cachingConnectionFactory.getRabbitConnectionFactory().useSslProtocol(tlsVersion); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Failed to set SSL/TLS version.", e); } } @@ -172,9 +172,9 @@ Binding binding() { @Bean public List bindings() { - String[] bingingKeysArray = splitBindingKeys(bindingKeys); - List bindingList = new ArrayList(); - for (String bindingKey : bingingKeysArray) { + final String[] bingingKeysArray = splitBindingKeys(bindingKeys); + final List bindingList = new ArrayList<>(); + for (final String bindingKey : bingingKeysArray) { bindingList.add(BindingBuilder.bind(externalQueue()).to(exchange()).with(bindingKey)); } return bindingList; @@ -182,9 +182,9 @@ public List bindings() { @Bean public SimpleMessageListenerContainer bindToQueueForRecentEvents( - ConnectionFactory springConnectionFactory, - EventHandler eventHandler) { - MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler); + final ConnectionFactory springConnectionFactory, + final EventHandler eventHandler) { + final MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler); container = new SimpleMessageListenerContainer(); container.setConnectionFactory(springConnectionFactory); container.setQueueNames(getQueueName(), getWaitlistQueueName()); @@ -214,7 +214,7 @@ public RabbitTemplate rabbitMqTemplate() { rabbitTemplate.setRoutingKey(getWaitlistQueueName()); rabbitTemplate.setConfirmCallback(new ConfirmCallback() { @Override - public void confirm(CorrelationData correlationData, boolean ack, String cause) { + public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) { LOGGER.info("Received confirm with result : {}", ack); } }); @@ -223,18 +223,18 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause) } public String getQueueName() { - String durableName = queueDurable ? "durable" : "transient"; + final String durableName = queueDurable ? "durable" : "transient"; return domainId + "." + componentName + "." + consumerName + "." + durableName; } public String getWaitlistQueueName() { - String durableName = queueDurable ? "durable" : "transient"; + final String durableName = queueDurable ? "durable" : "transient"; return domainId + "." + componentName + "." + consumerName + "." + durableName + "." + waitlistSufix; } - public void publishObjectToWaitlistQueue(String message) { + public void publishObjectToWaitlistQueue(final String message) { LOGGER.debug("Publishing message to message bus..."); rabbitTemplate.convertAndSend(message); } @@ -243,13 +243,13 @@ public void close() { try { container.destroy(); cachingConnectionFactory.destroy(); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Exception occurred while closing connections.", e); } } - private String[] splitBindingKeys(String bindingKeys) { - String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", ""); + private String[] splitBindingKeys(final String bindingKeys) { + final String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", ""); return bindingKeysWithoutWhitespace.split(","); } } diff --git a/src/main/java/com/ericsson/ei/subscription/SubscriptionHandler.java b/src/main/java/com/ericsson/ei/subscription/SubscriptionHandler.java index 7dec495b1..d985b844a 100644 --- a/src/main/java/com/ericsson/ei/subscription/SubscriptionHandler.java +++ b/src/main/java/com/ericsson/ei/subscription/SubscriptionHandler.java @@ -16,17 +16,15 @@ import java.util.Iterator; import java.util.List; -import javax.annotation.PostConstruct; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import com.ericsson.ei.jmespath.JmesPathInterface; -import com.ericsson.ei.notifications.InformSubscriber; import com.ericsson.ei.handlers.MongoDBHandler; +import com.ericsson.ei.notifications.InformSubscriber; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -35,10 +33,9 @@ import lombok.Setter; /** - * This class is responsible to take a aggregatedObject and match it with all - * the subscription object, to check ALL Conditions/requirement for - * notification. (AND between conditions in requirements, "OR" between - * requirements with conditions) + * This class is responsible to take a aggregatedObject and match it with all the subscription + * object, to check ALL Conditions/requirement for notification. (AND between conditions in + * requirements, "OR" between requirements with conditions) * * @author xjibbal */ @@ -68,57 +65,52 @@ public class SubscriptionHandler { private RunSubscription runSubscription; /** - * The method takes a aggregatedObject as argument and fetches all the - * subscriber from the database in order to match the subscription - * conditions in a separate thread. + * The method takes a aggregatedObject as argument and fetches all the subscriber from the + * database in order to match the subscription conditions in a separate thread. * * @param aggregatedObject * @param id */ + @Async("subscriptionHandlerExecutor") public void checkSubscriptionForObject(final String aggregatedObject, - final String id) { - Thread subscriptionThread = new Thread(() -> { - List subscriptions = mongoDBHandler.getAllDocuments( + final String id) { + final List subscriptions = mongoDBHandler.getAllDocuments( subscriptionDataBaseName, subscriptionCollectionName); - subscriptions.forEach( + subscriptions.forEach( subscription -> extractConditions(aggregatedObject, - subscription, id)); - }); - subscriptionThread.setName("SubscriptionHandler"); - subscriptionThread.start(); + subscription, id)); } /** - * This method takes both aggregatedObject and a Subscription object as - * arguments and fetches the subscription conditions from the - * subscription object and matches these conditions with + * This method takes both aggregatedObject and a Subscription object as arguments and fetches + * the subscription conditions from the subscription object and matches these conditions with * the aggregatedObject. * * @param aggregatedObject * @param subscriptionData * @param id */ - private void extractConditions(String aggregatedObject, - String subscriptionData, String id) { + private void extractConditions(final String aggregatedObject, + final String subscriptionData, final String id) { try { - JsonNode subscriptionJson = new ObjectMapper().readTree( - subscriptionData); + final JsonNode subscriptionJson = new ObjectMapper().readTree( + subscriptionData); LOGGER.debug("SubscriptionJson : " + subscriptionJson.toString()); LOGGER.debug("Aggregated Object : " + aggregatedObject); - ArrayNode requirementNode = (ArrayNode) subscriptionJson.get( - "requirements"); + final ArrayNode requirementNode = (ArrayNode) subscriptionJson.get( + "requirements"); LOGGER.debug("Requirements : " + requirementNode.toString()); - Iterator requirementIterator = requirementNode.elements(); + final Iterator requirementIterator = requirementNode.elements(); if (runSubscription.runSubscriptionOnObject(aggregatedObject, - requirementIterator, subscriptionJson, id)) { + requirementIterator, subscriptionJson, id)) { LOGGER.debug( - "The subscription conditions match for the aggregatedObject"); + "The subscription conditions match for the aggregatedObject"); informSubscriber.informSubscriber(aggregatedObject, - subscriptionJson); + subscriptionJson); } - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Subscription: {}, failed for aggregated object: {}", - subscriptionData, aggregatedObject, e); + subscriptionData, aggregatedObject, e); } } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 84095a04c..bce1a1e42 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -66,14 +66,14 @@ aggregated.collection.name: aggregated_objects # this needs to be set if using all_events_rules to avoid a copy of # event repository. Recommended setting is 10 minutes aggregated.collection.ttlValue: -# the name of the collection where to store the maping between +# the name of the collection where to store the mapping between # aggregated objects and id of the events that contributed to them event_object_map.collection.name: event_object_map # name of the collection where unprocessed events are stored waitlist.collection.name: wait_list # time to live value in database in seconds for unprocessed events waitlist.collection.ttlValue: 600 -# resend information for unprocessed events in wait list, time in miliseconds +# resend information for unprocessed events in wait list, time in milliseconds waitlist.initialDelayResend: 2000 waitlist.fixedRateResend: 15000 # name of the collection where subscriptions are stored @@ -92,6 +92,11 @@ threads.corePoolSize: 100 threads.queueCapacity: 5000 threads.maxPoolSize: 150 +subscription-handler.threads.corePoolSize: 100 +subscription-handler.threads.queueCapacity: 5000 +subscription-handler.threads.maxPoolSize: 150 + + # WaitList-worker and Scheduling task executor thread settings scheduled.threadpool.size: 100 diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index b95b27f52..c0c689bf2 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -9,4 +9,5 @@ + \ No newline at end of file