diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java index bd2bc32b8ef7..3ff44347c0e3 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java @@ -7,6 +7,7 @@ import org.apache.synapse.core.axis2.Axis2MessageContext; import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants; import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStore; + import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.concurrent.*; @@ -21,6 +22,7 @@ public class TransactionCountHandler extends AbstractExtendedSynapseHandler { private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10; private int THREAD_POOL_SIZE = 5; private int TRANSACTION_COUNT_QUEUE_SIZE = 1000; + private String TRANSACTION_COUNT_STORE_CLASS = "org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStoreImpl"; private static final Log LOG = LogFactory.getLog(TransactionCountHandler.class); private static ReentrantLock lock = new ReentrantLock(); @@ -33,13 +35,14 @@ public class TransactionCountHandler extends AbstractExtendedSynapseHandler { public TransactionCountHandler() { + this.transactionCountRecordQueue = new LinkedBlockingDeque<>(TRANSACTION_COUNT_QUEUE_SIZE); this.transactionCountExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - this.transactionCountScheduledExecutor = Executors.newScheduledThreadPool(1); + this.transactionCountScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); // Load the transaction count store try { - Class clazz = Class.forName("org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStoreImpl"); + Class clazz = Class.forName(TRANSACTION_COUNT_STORE_CLASS); Constructor constructor = clazz.getConstructor(); this.trasactionCountStore = (TransactionCountStore) constructor.newInstance(); } catch (Exception e) { @@ -63,6 +66,7 @@ public boolean handleRequestInFlow(MessageContext messageContext) { // Counting message received via an open WebSocket String transport = axis2MessageContext.getIncomingTransportName(); if (transport.equals(APIMgtGatewayConstants.TRANSPORT_WS) || transport.equals(APIMgtGatewayConstants.TRANSPORT_WSS)){ + LOG.info("Counting WebSocket message"); transactionCountExecutor.execute(this::handleTransactionCountCommit); } } catch (RejectedExecutionException e) { @@ -81,6 +85,7 @@ public boolean handleRequestOutFlow(MessageContext messageContext) { // Counting outgoing messages that are not related to any request-response pair if (isThereAnAssociatedIncomingRequest == null) { + LOG.info("Counting async outgoing message"); transactionCountExecutor.execute(this::handleTransactionCountCommit); } } catch (RejectedExecutionException e) { @@ -103,6 +108,7 @@ public boolean handleResponseOutFlow(MessageContext messageContext) { // Counting request-response pairs if (isThereAnAssociatedIncomingRequest instanceof Boolean) { + LOG.info("Counting request-response pair"); transactionCountExecutor.execute(this::handleTransactionCountCommit); } return true; @@ -142,6 +148,7 @@ private void handleScheduledTransactionCountCommit() { } private void commitWithRetries() { + LOG.info("Local transaction count: " + transactionCount.get()); // Arraylist of transaction count records will be committed to the store ArrayList transactionCountRecordList = new ArrayList<>(); transactionCountRecordQueue.drainTo(transactionCountRecordList);