Skip to content

Commit

Permalink
Refactor transactional counting to a producer consumer pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
IsuruMaduranga committed Jul 26, 2023
1 parent 56140ea commit 1d16d38
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction;

import org.wso2.carbon.apimgt.impl.APIManagerConfiguration;
import org.wso2.carbon.apimgt.impl.internal.ServiceReferenceHolder;

public class TransactionCountConfig {

private static APIManagerConfiguration apiManagerConfiguration;
public TransactionCountConfig() {
apiManagerConfiguration = ServiceReferenceHolder.getInstance().getAPIManagerConfigurationService()
.getAPIManagerConfiguration();
}

Check warning on line 12 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountConfig.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountConfig.java#L9-L12

Added lines #L9 - L12 were not covered by tests


}
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,46 @@
import org.apache.synapse.MessageContext;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStore;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class TransactionCountHandler extends AbstractExtendedSynapseHandler {

// Todo: Make these parameters configurable via deployment.toml
private static final double MAX_TRANSACTION_COUNT = Integer.MAX_VALUE * 0.9;
private int MAX_RETRY_COUNT = 3;
private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10;
private int THREAD_POOL_SIZE = 5;
private int TRANSACTION_COUNT_QUEUE_SIZE = 1000;
private String TRANSACTION_COUNT_STORE_CLASS = "org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionCountStoreImpl";
private final int PRODUCER_THREAD_POOL_SIZE = 5;
private final int CONSUMER_THREAD_POOL_SIZE = 5;
private final int TRANSACTION_RECORD_QUEUE_SIZE = 1000;
private final String TRANSACTION_COUNT_STORE_CLASS = "org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStoreImpl";

Check warning on line 21 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L18-L21

Added lines #L18 - L21 were not covered by tests

private static final Log LOG = LogFactory.getLog(TransactionCountHandler.class);

Check warning on line 23 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L23

Added line #L23 was not covered by tests
private static ReentrantLock lock = new ReentrantLock();
private static AtomicInteger transactionCount = new AtomicInteger(0);

private BlockingQueue<TransactionCountRecord> transactionCountRecordQueue;
private ExecutorService transactionCountExecutor;
private ScheduledExecutorService transactionCountScheduledExecutor;
private TransactionCountStore trasactionCountStore;
private TransactionRecordQueue transactionRecordQueue;
private TransactionRecordProducer transactionRecordProducer;
private TransactionRecordConsumer transactionRecordConsumer;
private TransactionRecordStore trasactionCountStore;


public TransactionCountHandler() {

Check warning on line 30 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L30

Added line #L30 was not covered by tests

this.transactionCountRecordQueue = new LinkedBlockingDeque<>(TRANSACTION_COUNT_QUEUE_SIZE);
this.transactionCountExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
this.transactionCountScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

this.transactionRecordQueue = TransactionRecordQueue.getInstance(TRANSACTION_RECORD_QUEUE_SIZE);

Check warning on line 32 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L32

Added line #L32 was not covered by tests
// Load the transaction count store
try {
Class<?> clazz = Class.forName(TRANSACTION_COUNT_STORE_CLASS);
Constructor<?> constructor = clazz.getConstructor();
this.trasactionCountStore = (TransactionCountStore) constructor.newInstance();
this.trasactionCountStore = (TransactionRecordStore) constructor.newInstance();
} catch (Exception e) {
LOG.error("Error while loading the transaction count store.", e);
}

Check warning on line 40 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L35-L40

Added lines #L35 - L40 were not covered by tests

// Start the transaction count record scheduler
transactionCountScheduledExecutor.scheduleAtFixedRate(this::handleScheduledTransactionCountCommit,
0, TRANSACTION_COUNT_COMMIT_INTERVAL, TimeUnit.SECONDS);
this.transactionRecordProducer = TransactionRecordProducer.getInstance(transactionRecordQueue,

Check warning on line 42 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L42

Added line #L42 was not covered by tests
PRODUCER_THREAD_POOL_SIZE);
this.transactionRecordConsumer = TransactionRecordConsumer.getInstance(transactionRecordQueue,

Check warning on line 44 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L44

Added line #L44 was not covered by tests
trasactionCountStore, CONSUMER_THREAD_POOL_SIZE);

this.transactionRecordProducer.start();
this.transactionRecordConsumer.start();
}

Check warning on line 49 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L47-L49

Added lines #L47 - L49 were not covered by tests

@Override
Expand All @@ -67,7 +61,7 @@ public boolean handleRequestInFlow(MessageContext messageContext) {
String transport = axis2MessageContext.getIncomingTransportName();

Check warning on line 61 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L61

Added line #L61 was not covered by tests
if (transport.equals(APIMgtGatewayConstants.TRANSPORT_WS) || transport.equals(APIMgtGatewayConstants.TRANSPORT_WSS)){
LOG.info("Counting WebSocket message");
transactionCountExecutor.execute(this::handleTransactionCountCommit);
this.transactionRecordProducer.addTransaction();

Check warning on line 64 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L63-L64

Added lines #L63 - L64 were not covered by tests
}
} catch (RejectedExecutionException e) {
LOG.error("Transaction could not be counted.", e);
Expand All @@ -86,7 +80,7 @@ public boolean handleRequestOutFlow(MessageContext messageContext) {
// Counting outgoing messages that are not related to any request-response pair
if (isThereAnAssociatedIncomingRequest == null) {
LOG.info("Counting async outgoing message");
transactionCountExecutor.execute(this::handleTransactionCountCommit);
this.transactionRecordProducer.addTransaction();

Check warning on line 83 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L82-L83

Added lines #L82 - L83 were not covered by tests
}
} catch (RejectedExecutionException e) {
LOG.error("Transaction could not be counted.", e);
Expand All @@ -109,67 +103,11 @@ public boolean handleResponseOutFlow(MessageContext messageContext) {
// Counting request-response pairs
if (isThereAnAssociatedIncomingRequest instanceof Boolean) {
LOG.info("Counting request-response pair");
transactionCountExecutor.execute(this::handleTransactionCountCommit);
this.transactionRecordProducer.addTransaction();

Check warning on line 106 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L105-L106

Added lines #L105 - L106 were not covered by tests
}
return true;

Check warning on line 108 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L108

Added line #L108 was not covered by tests
}

private void handleTransactionCountCommit() {
lock.lock();
try {
if (transactionCount.incrementAndGet() >= MAX_TRANSACTION_COUNT) {
TransactionCountRecord transactionCountRecord = new TransactionCountRecord(transactionCount.get());
transactionCountRecordQueue.add(transactionCountRecord);
transactionCount.set(0);
}
} catch (Exception e) {
LOG.error("Error while handling transaction count.", e);
} finally {
lock.unlock();
}
this.commitWithRetries();
}

private void handleScheduledTransactionCountCommit() {
lock.lock();
try {
int transactionCountValue = transactionCount.get();
if (transactionCountValue != 0) {
TransactionCountRecord transactionCountRecord = new TransactionCountRecord(transactionCountValue);
transactionCountRecordQueue.add(transactionCountRecord);
transactionCount.set(0);
}
} catch (Exception e) {
LOG.error("Error while handling transaction count.", e);
} finally {
lock.unlock();
}
this.commitWithRetries();
}

private void commitWithRetries() {
LOG.info("Local transaction count: " + transactionCount.get());
// Arraylist of transaction count records will be committed to the store
ArrayList<TransactionCountRecord> transactionCountRecordList = new ArrayList<>();
transactionCountRecordQueue.drainTo(transactionCountRecordList);

if (transactionCountRecordList.isEmpty()) {
return;
}

// Committing the transaction count records to the store with retries
// If failed to commit after MAX_RETRY_COUNT, the transaction count records will be added to the queue again
boolean commited = false;
int retryCount = 0;
while (!commited && retryCount < MAX_RETRY_COUNT) {
commited = trasactionCountStore.commit(transactionCountRecordList);
retryCount++;
}
if (!commited) {
transactionCountRecordQueue.addAll(transactionCountRecordList);
}
}

@Override
public boolean handleServerInit() {
// Nothing to implement
Expand All @@ -179,8 +117,9 @@ public boolean handleServerInit() {
@Override
public boolean handleServerShutDown() {
// Clen up resources
transactionCountExecutor.shutdown();
transactionCountScheduledExecutor.shutdown();
transactionRecordProducer.shutdown();
transactionRecordConsumer.shutdown();
transactionRecordQueue.clenUp();
trasactionCountStore.clenUp();
return true;

Check warning on line 124 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java#L120-L124

Added lines #L120 - L124 were not covered by tests
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class TransactionCounter {

private static TransactionCounter instance = null;
private static AtomicInteger transactionCount = new AtomicInteger(0);
private static ReentrantLock lock = new ReentrantLock();

Check warning on line 10 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L8-L10

Added lines #L8 - L10 were not covered by tests
private static int MAX_TRANSACTION_COUNT;
private TransactionCounter(int maxTransactionCount) {
this.MAX_TRANSACTION_COUNT = maxTransactionCount;
}

Check warning on line 14 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L12-L14

Added lines #L12 - L14 were not covered by tests

public static TransactionCounter getInstance(int maxTransactionCount) {
if(instance == null) {
instance = new TransactionCounter(maxTransactionCount);

Check warning on line 18 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L18

Added line #L18 was not covered by tests
}
return instance;

Check warning on line 20 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L20

Added line #L20 was not covered by tests
}
public static void increment() {
lock.lock();

Check warning on line 23 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L23

Added line #L23 was not covered by tests
try {
transactionCount.incrementAndGet();

Check warning on line 25 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L25

Added line #L25 was not covered by tests
} finally {
lock.unlock();

Check warning on line 27 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L27

Added line #L27 was not covered by tests
}
}

Check warning on line 29 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounter.java#L29

Added line #L29 was not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import java.time.Instant;
import java.util.UUID;

public class TransactionCountRecord {
public class TransactionRecord {

private final String id = UUID.randomUUID().toString();
private static final String id = UUID.randomUUID().toString();

Check warning on line 8 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java#L8

Added line #L8 was not covered by tests
private Integer count;
private final String recordedTime;

public TransactionCountRecord(Integer count) {
public TransactionRecord(Integer count) {
this.count = count;
this.recordedTime = Instant.now().toString();
}

Check warning on line 15 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecord.java#L12-L15

Added lines #L12 - L15 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TransactionRecordConsumer {

// Todo: Make these parameters configurable via deployment.toml
private int MAX_RETRY_COUNT = 3;
private int TRANSACTION_COUNT_COMMIT_INTERVAL = 10;
private int MAX_TRANSACTION_RECORDS_PER_COMMIT = 10;

Check warning on line 17 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L15-L17

Added lines #L15 - L17 were not covered by tests

private static final Log LOG = LogFactory.getLog(TransactionRecordConsumer.class);
private static TransactionRecordConsumer instance = null;

Check warning on line 20 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L19-L20

Added lines #L19 - L20 were not covered by tests
private TransactionRecordStore transactionRecordStore;
private TransactionRecordQueue transactionRecordQueue;
private ExecutorService executorService;
private final int threadPoolSize;

private TransactionRecordConsumer(TransactionRecordStore transactionRecordStore,
TransactionRecordQueue transactionRecordQueue, int threadPoolSize) {
this.transactionRecordStore = transactionRecordStore;
this.transactionRecordQueue = transactionRecordQueue;
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
this.threadPoolSize = threadPoolSize;
}

Check warning on line 32 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L27-L32

Added lines #L27 - L32 were not covered by tests

public static TransactionRecordConsumer getInstance(TransactionRecordQueue transactionRecordQueue,
TransactionRecordStore transactionRecordStore,
int threadPoolSize) {
if(instance == null) {
instance = new TransactionRecordConsumer(transactionRecordStore, transactionRecordQueue, threadPoolSize);

Check warning on line 38 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L38

Added line #L38 was not covered by tests
}
return instance;

Check warning on line 40 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L40

Added line #L40 was not covered by tests
}

public void start() {
LOG.info("Transaction record consumer started");

Check warning on line 44 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L44

Added line #L44 was not covered by tests
// execute the startCommitting method in all the threads
for (int i = 0; i < threadPoolSize; i++) {
executorService.execute(this::startCommitting);

Check warning on line 47 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L47

Added line #L47 was not covered by tests
}
}

Check warning on line 49 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L49

Added line #L49 was not covered by tests

private void startCommitting() {
try {
while (true) {
commitWithRetries();

Check warning on line 54 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L54

Added line #L54 was not covered by tests
}
} catch (InterruptedException ex) {
LOG.debug("Transaction record consumer interrupted");

Check warning on line 57 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L56-L57

Added lines #L56 - L57 were not covered by tests
}
}

Check warning on line 59 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L59

Added line #L59 was not covered by tests

private void commitWithRetries() throws InterruptedException {

// Arraylist of transaction count records will be committed to the store
ArrayList<TransactionRecord> transactionRecordList = new ArrayList<>();
TransactionRecord transactionRecord = null;
transactionRecord = transactionRecordQueue.take();

Check warning on line 66 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L64-L66

Added lines #L64 - L66 were not covered by tests

transactionRecordList.add(transactionRecord);
transactionRecordQueue.drain(transactionRecordList, MAX_TRANSACTION_RECORDS_PER_COMMIT);

Check warning on line 69 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L68-L69

Added lines #L68 - L69 were not covered by tests

// Committing the transaction count records to the store with retries
// If failed to commit after MAX_RETRY_COUNT, the transaction count records will be added to the queue again
boolean commited = false;
int retryCount = 0;

Check warning on line 74 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L73-L74

Added lines #L73 - L74 were not covered by tests
while (!commited && retryCount < MAX_RETRY_COUNT) {
commited = this.transactionRecordStore.commit(transactionRecordList);
retryCount++;

Check warning on line 77 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L76-L77

Added lines #L76 - L77 were not covered by tests
}
if (!commited) {
transactionRecordQueue.addAll(transactionRecordList);

Check warning on line 80 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L80

Added line #L80 was not covered by tests
}
}

Check warning on line 82 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L82

Added line #L82 was not covered by tests

public void shutdown() {
this.executorService.shutdownNow();
}

Check warning on line 86 in components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java

View check run for this annotation

Codecov / codecov/patch

components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionRecordConsumer.java#L85-L86

Added lines #L85 - L86 were not covered by tests

}
Loading

0 comments on commit 1d16d38

Please sign in to comment.