Skip to content

Commit

Permalink
Code refactoring in TransactionCountHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
IsuruMaduranga committed Jul 19, 2023
1 parent 801b881 commit 56140ea
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStore;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.concurrent.*;
Expand All @@ -21,6 +22,7 @@ public class TransactionCountHandler extends AbstractExtendedSynapseHandler {
private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10;
private int THREAD_POOL_SIZE = 5;
private int TRANSACTION_COUNT_QUEUE_SIZE = 1000;
private String TRANSACTION_COUNT_STORE_CLASS = "org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStoreImpl";

private static final Log LOG = LogFactory.getLog(TransactionCountHandler.class);
private static ReentrantLock lock = new ReentrantLock();
Expand All @@ -33,13 +35,14 @@ public class TransactionCountHandler extends AbstractExtendedSynapseHandler {


public TransactionCountHandler() {

this.transactionCountRecordQueue = new LinkedBlockingDeque<>(TRANSACTION_COUNT_QUEUE_SIZE);
this.transactionCountExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
this.transactionCountScheduledExecutor = Executors.newScheduledThreadPool(1);
this.transactionCountScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

// Load the transaction count store
try {
Class<?> clazz = Class.forName("org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStoreImpl");
Class<?> clazz = Class.forName(TRANSACTION_COUNT_STORE_CLASS);
Constructor<?> constructor = clazz.getConstructor();
this.trasactionCountStore = (TransactionCountStore) constructor.newInstance();
} catch (Exception e) {
Expand All @@ -63,6 +66,7 @@ public boolean handleRequestInFlow(MessageContext messageContext) {
// Counting message received via an open WebSocket
String transport = axis2MessageContext.getIncomingTransportName();
if (transport.equals(APIMgtGatewayConstants.TRANSPORT_WS) || transport.equals(APIMgtGatewayConstants.TRANSPORT_WSS)){
LOG.info("Counting WebSocket message");
transactionCountExecutor.execute(this::handleTransactionCountCommit);
}
} catch (RejectedExecutionException e) {
Expand All @@ -81,6 +85,7 @@ public boolean handleRequestOutFlow(MessageContext messageContext) {

// Counting outgoing messages that are not related to any request-response pair
if (isThereAnAssociatedIncomingRequest == null) {
LOG.info("Counting async outgoing message");
transactionCountExecutor.execute(this::handleTransactionCountCommit);
}
} catch (RejectedExecutionException e) {
Expand All @@ -103,6 +108,7 @@ public boolean handleResponseOutFlow(MessageContext messageContext) {

// Counting request-response pairs
if (isThereAnAssociatedIncomingRequest instanceof Boolean) {
LOG.info("Counting request-response pair");
transactionCountExecutor.execute(this::handleTransactionCountCommit);
}
return true;
Expand Down Expand Up @@ -142,6 +148,7 @@ private void handleScheduledTransactionCountCommit() {
}

private void commitWithRetries() {
LOG.info("Local transaction count: " + transactionCount.get());
// Arraylist of transaction count records will be committed to the store
ArrayList<TransactionCountRecord> transactionCountRecordList = new ArrayList<>();
transactionCountRecordQueue.drainTo(transactionCountRecordList);
Expand Down

0 comments on commit 56140ea

Please sign in to comment.