Skip to content

Commit

Permalink
Parameterize EventHub threadpool configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
PasanT9 committed Oct 4, 2023
1 parent a0e8dbd commit 55ccfbc
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ public class APIMgtServerStartupListener implements ServerStartupObserver, Serve

public APIMgtServerStartupListener() {

EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
EventHubConfigurationDto eventHubConfiguration =
DataHolder.getInstance().getAPIManagerConfigurationService().getAPIManagerConfiguration()
.getEventHubConfigurationDto().getEventHubReceiverConfiguration();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
.getEventHubConfigurationDto();

Check warning on line 40 in components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java#L40

Added line #L40 was not covered by tests
if (eventHubConfiguration.getEventHubReceiverConfiguration() != null) {
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(eventHubConfiguration);

Check warning on line 42 in components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java#L42

Added line #L42 was not covered by tests
}
}

Expand Down
4 changes: 4 additions & 0 deletions components/apimgt/org.wso2.carbon.apimgt.common.jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.apimgt</groupId>
<artifactId>org.wso2.carbon.apimgt.impl</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.common.jms.factory.JMSConnectionFactory;
import org.wso2.carbon.apimgt.common.jms.factory.JMSTaskManagerFactory;
import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -73,6 +74,40 @@ public JMSTransportHandler(Properties jmsConnectionProperties) {
jmsConnectionFactory = new JMSConnectionFactory(parameters, ListenerConstants.CONNECTION_FACTORY_NAME);
}

/**
* Initializes a JMSTransportHandler with the provided EventHubConfigurationDto.
*
* @param eventHubConfiguration The EventHubConfigurationDto containing configuration parameters
* for the JMSTransportHandler.
*/
public JMSTransportHandler(EventHubConfigurationDto eventHubConfiguration) {

Properties jmsConnectionProperties =
eventHubConfiguration.getEventHubReceiverConfiguration().getJmsConnectionParameters();
Properties properties;
Hashtable<String, String> parameters = new Hashtable<>();

if (jmsConnectionProperties.isEmpty()) {
properties = new Properties();
ClassLoader classLoader = getClass().getClassLoader();
try (InputStream resourceStream = classLoader.getResourceAsStream(ListenerConstants.MB_PROPERTIES)) {
properties.load(resourceStream);
} catch (IOException e) {
log.error("Cannot read properties file from resources. " + e.getMessage(), e);
}
} else {
properties = jmsConnectionProperties;
}

for (final String name : properties.stringPropertyNames()) {
parameters.put(name, properties.getProperty(name));
}
jmsConnectionFactory = new JMSConnectionFactory(parameters, ListenerConstants.CONNECTION_FACTORY_NAME);
minThreadPoolSize = eventHubConfiguration.getMinThreadPoolSize();
maxThreadPoolSize = eventHubConfiguration.getMaxThreadPoolSize();
jobQueueSize = eventHubConfiguration.getThreadPoolQueueLength();
}

/**
* This method is used to subscribe to JMS topics and receive JMS messages
* @param messageListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,10 @@ public GatewayStartupListener() {
throttleProperties.getJmsConnectionProperties();
this.jmsTransportHandlerForTrafficManager =
new JMSTransportHandler(jmsConnectionProperties.getJmsConnectionProperties());
EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
ServiceReferenceHolder.getInstance().getAPIManagerConfiguration().getEventHubConfigurationDto()
.getEventHubReceiverConfiguration();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
EventHubConfigurationDto eventHubConfiguration =
ServiceReferenceHolder.getInstance().getAPIManagerConfiguration().getEventHubConfigurationDto();

Check warning on line 104 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java#L104

Added line #L104 was not covered by tests
if (eventHubConfiguration.getEventHubReceiverConfiguration() != null) {
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(eventHubConfiguration);

Check warning on line 106 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java#L106

Added line #L106 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2513,6 +2513,9 @@ public static class KeyManager {

public static final String SERVICE_URL = "ServiceURL";
public static final String INIT_DELAY = "InitDelay";
public static final String MIN_THREAD_POOL_SIZE = "MinThreadPoolSize";
public static final String MAX_THREAD_POOL_SIZE = "MaxThreadPoolSize";
public static final String THREAD_POOL_QUEUE_LENGTH = "ThreadPoolQueueLength";
public static final String INTROSPECTION_ENDPOINT = "introspection_endpoint";
public static final String CLIENT_REGISTRATION_ENDPOINT = "client_registration_endpoint";
public static final String KEY_MANAGER_OPERATIONS_DCR_ENDPOINT = "/keymanager-operations/dcr/register";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,21 @@ private void setEventHubConfiguration(OMElement omElement) {
if (initDelay != null) {
eventHubConfigurationDto.setInitDelay(Integer.parseInt(initDelay.getText()));
}
OMElement minThreadPoolSize = omElement.getFirstChildWithName(

Check warning on line 1974 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java#L1974

Added line #L1974 was not covered by tests
new QName(APIConstants.KeyManager.MIN_THREAD_POOL_SIZE));
if (minThreadPoolSize != null) {
eventHubConfigurationDto.setMinThreadPoolSize(Integer.parseInt(minThreadPoolSize.getText()));

Check warning on line 1977 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java#L1977

Added line #L1977 was not covered by tests
}
OMElement maxThreadPoolSize = omElement.getFirstChildWithName(

Check warning on line 1979 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java#L1979

Added line #L1979 was not covered by tests
new QName(APIConstants.KeyManager.MAX_THREAD_POOL_SIZE));
if (maxThreadPoolSize != null) {
eventHubConfigurationDto.setMaxThreadPoolSize(Integer.parseInt(maxThreadPoolSize.getText()));

Check warning on line 1982 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java#L1982

Added line #L1982 was not covered by tests
}
OMElement threadPoolQueueLength = omElement.getFirstChildWithName(

Check warning on line 1984 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java#L1984

Added line #L1984 was not covered by tests
new QName(APIConstants.KeyManager.THREAD_POOL_QUEUE_LENGTH));
if (threadPoolQueueLength != null) {
eventHubConfigurationDto.setThreadPoolQueueLength(Integer.parseInt(threadPoolQueueLength.getText()));

Check warning on line 1987 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java#L1987

Added line #L1987 was not covered by tests
}
OMElement usernameElement = omElement.getFirstChildWithName(new QName(APIConstants.KeyManager.USERNAME));
if (usernameElement != null) {
eventHubConfigurationDto.setUsername(usernameElement.getText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class EventHubConfigurationDto {
private boolean enabled = false;
private String serviceUrl;
private int initDelay = 1000;
private int minThreadPoolSize;
private int maxThreadPoolSize;
private int threadPoolQueueLength;
private String username;
private char[] password;
private EventHubReceiverConfiguration eventHubReceiverConfiguration;
Expand Down Expand Up @@ -60,6 +63,30 @@ public void setInitDelay(int initDelay) {
this.initDelay = initDelay;
}

public int getMinThreadPoolSize() {
return minThreadPoolSize;

Check warning on line 67 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java#L67

Added line #L67 was not covered by tests
}

public void setMinThreadPoolSize(int minThreadPoolSize) {
this.minThreadPoolSize = minThreadPoolSize;
}

Check warning on line 72 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java#L71-L72

Added lines #L71 - L72 were not covered by tests

public int getMaxThreadPoolSize() {
return maxThreadPoolSize;

Check warning on line 75 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java#L75

Added line #L75 was not covered by tests
}

public void setMaxThreadPoolSize(int maxThreadPoolSize) {
this.maxThreadPoolSize = maxThreadPoolSize;
}

Check warning on line 80 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java#L79-L80

Added lines #L79 - L80 were not covered by tests

public int getThreadPoolQueueLength() {
return threadPoolQueueLength;

Check warning on line 83 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java#L83

Added line #L83 was not covered by tests
}

public void setThreadPoolQueueLength(int threadPoolQueueLength) {
this.threadPoolQueueLength = threadPoolQueueLength;
}

Check warning on line 88 in components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/dto/EventHubConfigurationDto.java#L87-L88

Added lines #L87 - L88 were not covered by tests

public EventHubReceiverConfiguration getEventHubReceiverConfiguration() {

return eventHubReceiverConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ public class JMSListenerStartupShutdownListener implements ServerStartupObserver

public JMSListenerStartupShutdownListener() {

EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto()
.getEventHubReceiverConfiguration();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
EventHubConfigurationDto eventHubConfiguration =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto();

Check warning on line 45 in components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java#L45

Added line #L45 was not covered by tests
if (eventHubConfiguration.getEventHubReceiverConfiguration() != null) {
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(eventHubConfiguration);

Check warning on line 47 in components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java#L47

Added line #L47 was not covered by tests
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ public class ThrottlePolicyStartupListener implements ServerStartupObserver, Ser

public ThrottlePolicyStartupListener() {

EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto()
.getEventHubReceiverConfiguration();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
EventHubConfigurationDto eventHubConfiguration =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto();

Check warning on line 42 in components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java#L42

Added line #L42 was not covered by tests
if (eventHubConfiguration.getEventHubReceiverConfiguration() != null) {
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(eventHubConfiguration);

Check warning on line 44 in components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java#L44

Added line #L44 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@
"apim.key_manager.enable_retriever": true,
"apim.event_hub.username": "$ref{apim.throttling.username}",
"apim.event_hub.password": "$ref{apim.throttling.password}",
"apim.event_hub.min_thread_pool_size": "20",
"apim.event_hub.max_thread_pool_size": "100",
"apim.event_hub.thread_pool_queue_length": "10",
"apim.key_manager.type": "default",
"apim.devportal.enable_cross_tenant_subscriptions": false,
"apim.devportal.default_reserved_username": "apim_reserved_user",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,9 @@
{% if apim.event_hub.init_delay is defined %}
<InitDelay>{{apim.event_hub.init_delay}}</InitDelay>
{% endif %}
<MinThreadPoolSize>{{apim.event_hub.min_thread_pool_size}}</MinThreadPoolSize>
<MaxThreadPoolSize>{{apim.event_hub.max_thread_pool_size}}</MaxThreadPoolSize>
<ThreadPoolQueueLength>{{apim.event_hub.thread_pool_queue_length}}</ThreadPoolQueueLength>
<EventPublisherConfiguration>
<Type>{{apim.event_hub.event_type}}</Type>
{% if apim.event_hub.publish.url_group is defined %}
Expand Down

0 comments on commit 55ccfbc

Please sign in to comment.