From 1d16d38d4dca0fea0551b6af644b9a8d966a8c3a Mon Sep 17 00:00:00 2001 From: Isuru Wijesiri Date: Wed, 26 Jul 2023 14:04:47 +0530 Subject: [PATCH] Refactor transactional counting to a producer consumer pattern --- .../transaction/TransactionCountConfig.java | 15 +++ .../transaction/TransactionCountHandler.java | 111 ++++-------------- .../transaction/TransactionCounter.java | 31 +++++ ...ountRecord.java => TransactionRecord.java} | 6 +- .../TransactionRecordConsumer.java | 88 ++++++++++++++ .../TransactionRecordProducer.java | 90 ++++++++++++++ .../queue/TransactionRecordQueue.java | 47 ++++++++ .../store/TransactionCountStoreImpl.java | 19 --- ...Store.java => TransactionRecordStore.java} | 6 +- .../store/TransactionRecordStoreImpl.java | 33 ++++++ 10 files changed, 335 insertions(+), 111 deletions(-) create mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountConfig.java create mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java rename components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/{TransactionCountRecord.java => TransactionRecord.java} (76%) create mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java create mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordProducer.java create mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java delete mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStoreImpl.java rename components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/{TransactionCountStore.java => TransactionRecordStore.java} (56%) create mode 100644 components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountConfig.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountConfig.java new file mode 100644 index 000000000000..00afeac9e06e --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountConfig.java @@ -0,0 +1,15 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +import org.wso2.carbon.apimgt.impl.APIManagerConfiguration; +import org.wso2.carbon.apimgt.impl.internal.ServiceReferenceHolder; + +public class TransactionCountConfig { + + private static APIManagerConfiguration apiManagerConfiguration; + public TransactionCountConfig() { + apiManagerConfiguration = ServiceReferenceHolder.getInstance().getAPIManagerConfigurationService() + .getAPIManagerConfiguration(); + } + + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java index 3ff44347c0e3..b9df5fc9363a 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java @@ -6,52 +6,46 @@ import org.apache.synapse.MessageContext; import org.apache.synapse.core.axis2.Axis2MessageContext; import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants; -import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStore; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore; import java.lang.reflect.Constructor; -import java.util.ArrayList; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; public class TransactionCountHandler extends AbstractExtendedSynapseHandler { // Todo: Make these parameters configurable via deployment.toml - private static final double MAX_TRANSACTION_COUNT = Integer.MAX_VALUE * 0.9; - private int MAX_RETRY_COUNT = 3; - private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10; - private int THREAD_POOL_SIZE = 5; - private int TRANSACTION_COUNT_QUEUE_SIZE = 1000; - private String TRANSACTION_COUNT_STORE_CLASS = "org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStoreImpl"; + private final int PRODUCER_THREAD_POOL_SIZE = 5; + private final int CONSUMER_THREAD_POOL_SIZE = 5; + private final int TRANSACTION_RECORD_QUEUE_SIZE = 1000; + private final String TRANSACTION_COUNT_STORE_CLASS = "org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStoreImpl"; private static final Log LOG = LogFactory.getLog(TransactionCountHandler.class); - private static ReentrantLock lock = new ReentrantLock(); - private static AtomicInteger transactionCount = new AtomicInteger(0); - - private BlockingQueue transactionCountRecordQueue; - private ExecutorService transactionCountExecutor; - private ScheduledExecutorService transactionCountScheduledExecutor; - private TransactionCountStore trasactionCountStore; + private TransactionRecordQueue transactionRecordQueue; + private TransactionRecordProducer transactionRecordProducer; + private TransactionRecordConsumer transactionRecordConsumer; + private TransactionRecordStore trasactionCountStore; public TransactionCountHandler() { - this.transactionCountRecordQueue = new LinkedBlockingDeque<>(TRANSACTION_COUNT_QUEUE_SIZE); - this.transactionCountExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - this.transactionCountScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - + this.transactionRecordQueue = TransactionRecordQueue.getInstance(TRANSACTION_RECORD_QUEUE_SIZE); // Load the transaction count store try { Class clazz = Class.forName(TRANSACTION_COUNT_STORE_CLASS); Constructor constructor = clazz.getConstructor(); - this.trasactionCountStore = (TransactionCountStore) constructor.newInstance(); + this.trasactionCountStore = (TransactionRecordStore) constructor.newInstance(); } catch (Exception e) { LOG.error("Error while loading the transaction count store.", e); } - // Start the transaction count record scheduler - transactionCountScheduledExecutor.scheduleAtFixedRate(this::handleScheduledTransactionCountCommit, - 0, TRANSACTION_COUNT_COMMIT_INTERVAL, TimeUnit.SECONDS); + this.transactionRecordProducer = TransactionRecordProducer.getInstance(transactionRecordQueue, + PRODUCER_THREAD_POOL_SIZE); + this.transactionRecordConsumer = TransactionRecordConsumer.getInstance(transactionRecordQueue, + trasactionCountStore, CONSUMER_THREAD_POOL_SIZE); + + this.transactionRecordProducer.start(); + this.transactionRecordConsumer.start(); } @Override @@ -67,7 +61,7 @@ public boolean handleRequestInFlow(MessageContext messageContext) { String transport = axis2MessageContext.getIncomingTransportName(); if (transport.equals(APIMgtGatewayConstants.TRANSPORT_WS) || transport.equals(APIMgtGatewayConstants.TRANSPORT_WSS)){ LOG.info("Counting WebSocket message"); - transactionCountExecutor.execute(this::handleTransactionCountCommit); + this.transactionRecordProducer.addTransaction(); } } catch (RejectedExecutionException e) { LOG.error("Transaction could not be counted.", e); @@ -86,7 +80,7 @@ public boolean handleRequestOutFlow(MessageContext messageContext) { // Counting outgoing messages that are not related to any request-response pair if (isThereAnAssociatedIncomingRequest == null) { LOG.info("Counting async outgoing message"); - transactionCountExecutor.execute(this::handleTransactionCountCommit); + this.transactionRecordProducer.addTransaction(); } } catch (RejectedExecutionException e) { LOG.error("Transaction could not be counted.", e); @@ -109,67 +103,11 @@ public boolean handleResponseOutFlow(MessageContext messageContext) { // Counting request-response pairs if (isThereAnAssociatedIncomingRequest instanceof Boolean) { LOG.info("Counting request-response pair"); - transactionCountExecutor.execute(this::handleTransactionCountCommit); + this.transactionRecordProducer.addTransaction(); } return true; } - private void handleTransactionCountCommit() { - lock.lock(); - try { - if (transactionCount.incrementAndGet() >= MAX_TRANSACTION_COUNT) { - TransactionCountRecord transactionCountRecord = new TransactionCountRecord(transactionCount.get()); - transactionCountRecordQueue.add(transactionCountRecord); - transactionCount.set(0); - } - } catch (Exception e) { - LOG.error("Error while handling transaction count.", e); - } finally { - lock.unlock(); - } - this.commitWithRetries(); - } - - private void handleScheduledTransactionCountCommit() { - lock.lock(); - try { - int transactionCountValue = transactionCount.get(); - if (transactionCountValue != 0) { - TransactionCountRecord transactionCountRecord = new TransactionCountRecord(transactionCountValue); - transactionCountRecordQueue.add(transactionCountRecord); - transactionCount.set(0); - } - } catch (Exception e) { - LOG.error("Error while handling transaction count.", e); - } finally { - lock.unlock(); - } - this.commitWithRetries(); - } - - private void commitWithRetries() { - LOG.info("Local transaction count: " + transactionCount.get()); - // Arraylist of transaction count records will be committed to the store - ArrayList transactionCountRecordList = new ArrayList<>(); - transactionCountRecordQueue.drainTo(transactionCountRecordList); - - if (transactionCountRecordList.isEmpty()) { - return; - } - - // Committing the transaction count records to the store with retries - // If failed to commit after MAX_RETRY_COUNT, the transaction count records will be added to the queue again - boolean commited = false; - int retryCount = 0; - while (!commited && retryCount < MAX_RETRY_COUNT) { - commited = trasactionCountStore.commit(transactionCountRecordList); - retryCount++; - } - if (!commited) { - transactionCountRecordQueue.addAll(transactionCountRecordList); - } - } - @Override public boolean handleServerInit() { // Nothing to implement @@ -179,8 +117,9 @@ public boolean handleServerInit() { @Override public boolean handleServerShutDown() { // Clen up resources - transactionCountExecutor.shutdown(); - transactionCountScheduledExecutor.shutdown(); + transactionRecordProducer.shutdown(); + transactionRecordConsumer.shutdown(); + transactionRecordQueue.clenUp(); trasactionCountStore.clenUp(); return true; } diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java new file mode 100644 index 000000000000..2ae53aaea1eb --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java @@ -0,0 +1,31 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class TransactionCounter { + + private static TransactionCounter instance = null; + private static AtomicInteger transactionCount = new AtomicInteger(0); + private static ReentrantLock lock = new ReentrantLock(); + private static int MAX_TRANSACTION_COUNT; + private TransactionCounter(int maxTransactionCount) { + this.MAX_TRANSACTION_COUNT = maxTransactionCount; + } + + public static TransactionCounter getInstance(int maxTransactionCount) { + if(instance == null) { + instance = new TransactionCounter(maxTransactionCount); + } + return instance; + } + public static void increment() { + lock.lock(); + try { + transactionCount.incrementAndGet(); + } finally { + lock.unlock(); + } + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountRecord.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java similarity index 76% rename from components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountRecord.java rename to components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java index 606a76f9a936..ceef7bc65ad0 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountRecord.java +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java @@ -3,13 +3,13 @@ import java.time.Instant; import java.util.UUID; -public class TransactionCountRecord { +public class TransactionRecord { - private final String id = UUID.randomUUID().toString(); + private static final String id = UUID.randomUUID().toString(); private Integer count; private final String recordedTime; - public TransactionCountRecord(Integer count) { + public TransactionRecord(Integer count) { this.count = count; this.recordedTime = Instant.now().toString(); } diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java new file mode 100644 index 000000000000..ee5514e944f5 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java @@ -0,0 +1,88 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore; + +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class TransactionRecordConsumer { + + // Todo: Make these parameters configurable via deployment.toml + private int MAX_RETRY_COUNT = 3; + private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10; + private int MAX_TRANSACTION_RECORDS_PER_COMMIT = 10; + + private static final Log LOG = LogFactory.getLog(TransactionRecordConsumer.class); + private static TransactionRecordConsumer instance = null; + private TransactionRecordStore transactionRecordStore; + private TransactionRecordQueue transactionRecordQueue; + private ExecutorService executorService; + private final int threadPoolSize; + + private TransactionRecordConsumer(TransactionRecordStore transactionRecordStore, + TransactionRecordQueue transactionRecordQueue, int threadPoolSize) { + this.transactionRecordStore = transactionRecordStore; + this.transactionRecordQueue = transactionRecordQueue; + this.executorService = Executors.newFixedThreadPool(threadPoolSize); + this.threadPoolSize = threadPoolSize; + } + + public static TransactionRecordConsumer getInstance(TransactionRecordQueue transactionRecordQueue, + TransactionRecordStore transactionRecordStore, + int threadPoolSize) { + if(instance == null) { + instance = new TransactionRecordConsumer(transactionRecordStore, transactionRecordQueue, threadPoolSize); + } + return instance; + } + + public void start() { + LOG.info("Transaction record consumer started"); + // execute the startCommitting method in all the threads + for (int i = 0; i < threadPoolSize; i++) { + executorService.execute(this::startCommitting); + } + } + + private void startCommitting() { + try { + while (true) { + commitWithRetries(); + } + } catch (InterruptedException ex) { + LOG.debug("Transaction record consumer interrupted"); + } + } + + private void commitWithRetries() throws InterruptedException { + + // Arraylist of transaction count records will be committed to the store + ArrayList transactionRecordList = new ArrayList<>(); + TransactionRecord transactionRecord = null; + transactionRecord = transactionRecordQueue.take(); + + transactionRecordList.add(transactionRecord); + transactionRecordQueue.drain(transactionRecordList, MAX_TRANSACTION_RECORDS_PER_COMMIT); + + // Committing the transaction count records to the store with retries + // If failed to commit after MAX_RETRY_COUNT, the transaction count records will be added to the queue again + boolean commited = false; + int retryCount = 0; + while (!commited && retryCount < MAX_RETRY_COUNT) { + commited = this.transactionRecordStore.commit(transactionRecordList); + retryCount++; + } + if (!commited) { + transactionRecordQueue.addAll(transactionRecordList); + } + } + + public void shutdown() { + this.executorService.shutdownNow(); + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordProducer.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordProducer.java new file mode 100644 index 000000000000..57bebac782d1 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordProducer.java @@ -0,0 +1,90 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class TransactionRecordProducer { + + // Todo: Make these parameters configurable via deployment.toml + private static final double MAX_TRANSACTION_COUNT = Integer.MAX_VALUE * 0.9; + private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10; + + private static final Log LOG = LogFactory.getLog(TransactionRecordProducer.class); + private static TransactionRecordProducer instance = null; + private TransactionRecordQueue transactionRecordQueue; + private ExecutorService executorService; + private ScheduledExecutorService scheduledExecutorService; + private static ReentrantLock lock = new ReentrantLock(); + private static AtomicInteger transactionCount = new AtomicInteger(0); + private TransactionRecordProducer(TransactionRecordQueue transactionRecordQueue, int threadPoolSize) { + this.transactionRecordQueue = transactionRecordQueue; + this.executorService = Executors.newFixedThreadPool(threadPoolSize); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + } + + public static TransactionRecordProducer getInstance(TransactionRecordQueue transactionRecordQueue, + int threadPoolSize) { + if(instance == null) { + instance = new TransactionRecordProducer(transactionRecordQueue, threadPoolSize); + } + return instance; + } + + public void start() { + LOG.info("Transaction record producer started"); + // Start the transaction count record scheduler + scheduledExecutorService.scheduleAtFixedRate(this::produceRecordScheduled, + 0, TRANSACTION_COUNT_COMMIT_INTERVAL, TimeUnit.SECONDS); + } + + public void addTransaction() { + executorService.execute(this::produceRecord); + } + + private void produceRecord() { + lock.lock(); + try { + if (transactionCount.incrementAndGet() >= MAX_TRANSACTION_COUNT) { + TransactionRecord transactionRecord = new TransactionRecord(transactionCount.get()); + LOG.info("Transaction count is added to the queue from producer"); + transactionRecordQueue.add(transactionRecord); + transactionCount.set(0); + } + } catch (Exception e) { + LOG.error("Error while handling transaction count.", e); + } finally { + lock.unlock(); + } + } + + private void produceRecordScheduled() { + lock.lock(); + try { + int transactionCountValue = transactionCount.get(); + if (transactionCountValue != 0) { + TransactionRecord transactionRecord = new TransactionRecord(transactionCountValue); + LOG.info("Transaction count is added to the queue from scheduled producer"); + transactionRecordQueue.add(transactionRecord); + transactionCount.set(0); + } + } catch (Exception e) { + LOG.error("Error while handling transaction count.", e); + } finally { + lock.unlock(); + } + } + + public void shutdown() { + scheduledExecutorService.shutdownNow(); + executorService.shutdownNow(); + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java new file mode 100644 index 000000000000..46b3cd392f81 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java @@ -0,0 +1,47 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionRecord; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionRecordProducer; + +import java.util.ArrayList; +import java.util.concurrent.ArrayBlockingQueue; + +public class TransactionRecordQueue { + + private static final Log LOG = LogFactory.getLog(TransactionRecordQueue.class); + private static TransactionRecordQueue instance = null; + private static ArrayBlockingQueue transactionRecordQueue; + + private TransactionRecordQueue(int size) { + transactionRecordQueue = new ArrayBlockingQueue<>(size); + } + + public static TransactionRecordQueue getInstance(int size) { + if(instance == null) { + instance = new TransactionRecordQueue(size); + } + return instance; + } + + public void add(TransactionRecord transactionRecord) { + LOG.info("Transaction count is added to the queue"); + transactionRecordQueue.add(transactionRecord); + } + public void addAll(ArrayList transactionRecordList) { + transactionRecordQueue.addAll(transactionRecordList); + } + + public TransactionRecord take() throws InterruptedException { + return transactionRecordQueue.take(); + } + + public void drain(ArrayList transactionRecordList, int maxRecords) { + transactionRecordQueue.drainTo(transactionRecordList, maxRecords); + } + + public void clenUp() { + transactionRecordQueue.clear(); + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStoreImpl.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStoreImpl.java deleted file mode 100644 index da808653ffc4..000000000000 --- a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStoreImpl.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.wso2.carbon.apimgt.gateway.handlers.transaction.store; - -import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionCountRecord; - -import java.util.ArrayList; - -public class TransactionCountStoreImpl implements TransactionCountStore { - - @Override - public boolean commit(ArrayList transactionCountRecordList) { - // To be implemented - return true; - } - - @Override - public void clenUp() { - // To be implemented - } -} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStore.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStore.java similarity index 56% rename from components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStore.java rename to components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStore.java index 3f555c786301..c668d1ac4527 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionCountStore.java +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStore.java @@ -1,10 +1,10 @@ package org.wso2.carbon.apimgt.gateway.handlers.transaction.store; -import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionCountRecord; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionRecord; import java.util.ArrayList; -public interface TransactionCountStore { - boolean commit(ArrayList transactionCountRecordList); +public interface TransactionRecordStore { + boolean commit(ArrayList transactionRecordList); void clenUp(); } diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java new file mode 100644 index 000000000000..2165e292691f --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java @@ -0,0 +1,33 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.store; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionRecord; + +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +public class TransactionRecordStoreImpl implements TransactionRecordStore { + + private static final Log LOG = LogFactory.getLog(TransactionRecordStoreImpl.class); + private static final AtomicInteger transactionCount = new AtomicInteger(0); + + public TransactionRecordStoreImpl() { + LOG.info("Transaction store loaded"); + } + + @Override + public boolean commit(ArrayList transactionRecordList) { + LOG.info("Transaction count is commited"); + transactionRecordList.forEach(transactionRecord -> { + transactionCount.addAndGet(transactionRecord.getCount()); + }); + LOG.info("Global Transaction count: " + transactionCount.get()); + return true; + } + + @Override + public void clenUp() { + // To be implemented + } +}