diff --git a/pom.xml b/pom.xml index 9c1e641..b0b3665 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.moesif moesif-okhttp-interceptor - 1.1.7 + 1.2.0 jar moesif-okhttp-interceptor https://www.moesif.com diff --git a/src/main/java/com/moesif/sdk/okhttp3client/BatchEventLogger.java b/src/main/java/com/moesif/sdk/okhttp3client/BatchEventLogger.java new file mode 100644 index 0000000..0e36a98 --- /dev/null +++ b/src/main/java/com/moesif/sdk/okhttp3client/BatchEventLogger.java @@ -0,0 +1,137 @@ +package com.moesif.sdk.okhttp3client; + +import com.moesif.api.MoesifAPIClient; +import com.moesif.api.controllers.APIController; +import com.moesif.api.http.client.APICallBack; +import com.moesif.api.http.client.HttpContext; +import com.moesif.api.http.response.HttpResponse; +import com.moesif.api.models.EventModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BatchEventLogger implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(BatchEventLogger.class); + + private final BlockingQueue queue; + private final int batchSize; + private final long maxWaitTimeMillis; + private final APIController apiController; + private final AtomicBoolean running = new AtomicBoolean(true); + + public BatchEventLogger(BlockingQueue queue, int batchSize, long maxWaitTimeMillis, String applicationId) { + this.queue = queue; + this.batchSize = batchSize; + this.maxWaitTimeMillis = maxWaitTimeMillis; + MoesifAPIClient client = new MoesifAPIClient(applicationId); + this.apiController = client.getAPI(); + } + + @Override + public void run() { + try { + List batch = new ArrayList<>(); + long batchStartTime = 0; + + while (running.get() || !queue.isEmpty()) { + long timeout = maxWaitTimeMillis; + + if (!batch.isEmpty()) { + long elapsedTime = System.currentTimeMillis() - batchStartTime; + timeout = maxWaitTimeMillis - elapsedTime; + if (timeout <= 0) { + // Time limit reached, send the batch + sendBatch(new ArrayList<>(batch)); + batch.clear(); + batchStartTime = 0; + timeout = maxWaitTimeMillis; + } + } + + // Poll for the next event with the calculated timeout + EventModel event = queue.poll(timeout, TimeUnit.MILLISECONDS); + + if (event != null) { + if (batch.isEmpty()) { + // Start the batch timer + batchStartTime = System.currentTimeMillis(); + } + batch.add(event); + + if (batch.size() >= batchSize) { + logger.debug("Seding batch of {} events after reaching batch size limit", batch.size()); + // Batch size limit reached, send the batch + sendBatch(new ArrayList<>(batch)); + batch.clear(); + batchStartTime = 0; + } + } else { + // No event received within timeout + if (!batch.isEmpty()) { + logger.debug("Seding batch of {} events after max wait timeout", batch.size()); + // Send any accumulated events + sendBatch(new ArrayList<>(batch)); + batch.clear(); + batchStartTime = 0; + } + if (!running.get()) { + // Exit if the running flag is false + break; + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info("EventConsumer interrupted, shutting down"); + } catch (Exception e) { + logger.error("Error in EventConsumer", e); + } finally { + // Process any remaining events before exiting + processRemainingEvents(); + } + } + + private void sendBatch(List batch) { + if (!batch.isEmpty()) { + try { + apiController.createEventsBatchAsync(batch, new MoesifApiCallBack()); + } catch (Exception e) { + // Handle exception during sending + logger.error("Exception while sending event batch", e); + } + } + } + + private void processRemainingEvents() { + List remainingEvents = new ArrayList<>(); + queue.drainTo(remainingEvents); + if (!remainingEvents.isEmpty()) { + logger.info("Processing remaining events before shutdown"); + sendBatch(remainingEvents); + } + } + + public void shutdown() { + running.set(false); + } + + public static class MoesifApiCallBack implements APICallBack { + + public void onSuccess(HttpContext context, HttpResponse response) { + int respStatusCode = response.getStatusCode(); + if (201 != respStatusCode) + logger.debug("Received status code {}", respStatusCode); + else + logger.debug("Events submitted to Moesif"); + } + + public void onFailure(HttpContext context, Throwable error) { + logger.debug("onFailure {} {}", context.getResponse(), error.getMessage()); + } + } +} diff --git a/src/main/java/com/moesif/sdk/okhttp3client/EventModelBuffer.java b/src/main/java/com/moesif/sdk/okhttp3client/EventModelBuffer.java deleted file mode 100644 index 343aa82..0000000 --- a/src/main/java/com/moesif/sdk/okhttp3client/EventModelBuffer.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.moesif.sdk.okhttp3client; - -import com.moesif.api.models.EventModel; - -import java.util.ArrayList; -import java.util.List; - -public class EventModelBuffer { - private List buffer; - private int maxSize; - - public EventModelBuffer(int maxSize) { - this.maxSize = maxSize; - init(); - } - - private void init(){ - buffer = new ArrayList(); - - } - - public boolean isFull() { - return buffer.size() >= this.maxSize; - } - - public void add(EventModel loggedEvent) { - buffer.add(loggedEvent); - } - - public List empty(){ - List b = this.buffer; - init(); - return b; - } - -} diff --git a/src/main/java/com/moesif/sdk/okhttp3client/MoesifApiLogEvent.java b/src/main/java/com/moesif/sdk/okhttp3client/MoesifApiLogEvent.java deleted file mode 100644 index 08799dd..0000000 --- a/src/main/java/com/moesif/sdk/okhttp3client/MoesifApiLogEvent.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.moesif.sdk.okhttp3client; - -import com.moesif.api.MoesifAPIClient; -import com.moesif.api.controllers.APIController; -import com.moesif.api.http.client.APICallBack; -import com.moesif.api.http.client.HttpContext; -import com.moesif.api.http.response.HttpResponse; -import com.moesif.api.models.EventModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - - -public class MoesifApiLogEvent { - - private static final Logger logger = LoggerFactory.getLogger( - MoesifApiLogEvent.class); - - public static void sendEventsAsync(String moesifApplicationId, - List loggedEvents) - throws IOException { - MoesifAPIClient client = new MoesifAPIClient(moesifApplicationId); - final APIController apiController = client.getAPI(); - MoesifApiCallBack callBack = new MoesifApiCallBack(); - apiController.createEventsBatchAsync(loggedEvents, callBack); - } - - public static class MoesifApiCallBack implements APICallBack{ - - public void onSuccess(HttpContext context, HttpResponse response) { - inspectStatusCode(context.getResponse().getStatusCode()); - } - - private static void inspectStatusCode(int respStatusCode){ - if (201 != respStatusCode) - logger.debug("Received status code " + respStatusCode); - else - logger.debug("Event submitted to Moesif"); - } - - public void onFailure(HttpContext context, Throwable error) { - logger.debug("onFailure " + context.getResponse() - + " " + error.getMessage()); - } - } -} diff --git a/src/main/java/com/moesif/sdk/okhttp3client/MoesifOkHttp3Interceptor.java b/src/main/java/com/moesif/sdk/okhttp3client/MoesifOkHttp3Interceptor.java index dcfc8a8..06c0890 100644 --- a/src/main/java/com/moesif/sdk/okhttp3client/MoesifOkHttp3Interceptor.java +++ b/src/main/java/com/moesif/sdk/okhttp3client/MoesifOkHttp3Interceptor.java @@ -1,14 +1,15 @@ package com.moesif.sdk.okhttp3client; +import com.moesif.api.models.EventModel; import com.moesif.api.models.EventRequestModel; import com.moesif.api.models.EventResponseModel; +import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesif; +import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesifImpl; import com.moesif.sdk.okhttp3client.config.MoesifApiConnConfig; import com.moesif.sdk.okhttp3client.models.OkHttp3RequestMapper; import com.moesif.sdk.okhttp3client.models.OkHttp3ResponseMapper; import com.moesif.sdk.okhttp3client.models.filter.IInterceptEventFilter; import com.moesif.sdk.okhttp3client.util.ResponseWrap; -import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesifImpl; -import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesif; import okhttp3.*; import okio.BufferedSource; import okio.Okio; @@ -20,12 +21,15 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.Collection; import java.util.Date; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.time.Instant; - /** * MoesifOkHttp3Interceptor * This intrceptor can be used both as Application and Network interceptor @@ -39,12 +43,14 @@ */ public class MoesifOkHttp3Interceptor implements Interceptor { private static final Logger logger = LoggerFactory.getLogger( - MoesifOkHttp3Interceptor.class); - + MoesifOkHttp3Interceptor.class); + private static MoesifApiConnConfig connConfig; private final NetworkEventReporterMoesif mEventReporter = - NetworkEventReporterMoesifImpl.get(); + NetworkEventReporterMoesifImpl.get(); private final AtomicInteger mNextRequestId = new AtomicInteger(0); - private static MoesifApiConnConfig connConfig; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private BlockingQueue eventQueue; + private BatchEventLogger batchLogger; /** * Initialize the Interceptor @@ -70,7 +76,7 @@ public MoesifOkHttp3Interceptor(String moesifApplicationId) { * to collector */ public MoesifOkHttp3Interceptor(String moesifApplicationId, Integer eventsBufferSize) { - MoesifApiConnConfig c = new MoesifApiConnConfig(moesifApplicationId); + MoesifApiConnConfig c = new MoesifApiConnConfig(moesifApplicationId); c.setEventsBufferSize(eventsBufferSize); init(c); } @@ -82,10 +88,11 @@ public MoesifOkHttp3Interceptor(String moesifApplicationId, Integer eventsBuffer * to collector */ public MoesifOkHttp3Interceptor(Integer eventsBufferSize) { - MoesifApiConnConfig c = new MoesifApiConnConfig(null); + MoesifApiConnConfig c = new MoesifApiConnConfig(null); c.setEventsBufferSize(eventsBufferSize); init(c); } + /** * Initialize the Interceptor * @param connConfig MoesifApiConnConfig object @@ -96,7 +103,27 @@ public MoesifOkHttp3Interceptor(MoesifApiConnConfig connConfig) { public void init(MoesifApiConnConfig connConfig) { MoesifOkHttp3Interceptor.connConfig = (null == connConfig) - ? new MoesifApiConnConfig() : connConfig; + ? new MoesifApiConnConfig() : connConfig; + eventQueue = new LinkedBlockingQueue<>(MoesifOkHttp3Interceptor.connConfig.maxQueueSize); + batchLogger = new BatchEventLogger( + eventQueue, + MoesifOkHttp3Interceptor.connConfig.eventsBufferSize, + MoesifOkHttp3Interceptor.connConfig.eventTimeoutMillis, + MoesifOkHttp3Interceptor.connConfig.getApplicationId() + ); + executorService.submit(batchLogger); + // Add shutdown hook to clean up and gracefully exit + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + batchLogger.shutdown(); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + })); if (getConnConfig().isDebug()) { logger.debug("MoesifOkHttp3Interceptor initialized with config: {}", getConnConfig()); } @@ -173,9 +200,8 @@ public Response intercept(Chain chain) throws IOException { loggedResponse, outputStream, respw.isJsonHeader(), - connConfig.getApplicationId(), + eventQueue, connConfig.getMaxAllowedBodyBytesResponse(), - connConfig.getEventsBufferSize(), filter.identifyUser(request, response).orElse(null), filter.identifyCompany(request, response).orElse(null), filter.sessionToken(request, response).orElse(null), @@ -201,8 +227,7 @@ public Response intercept(Chain chain) throws IOException { } catch (Exception e) { logger.warn("Error parsing response body", e); } - } - else { + } else { logger.warn("Body is null"); } return response; diff --git a/src/main/java/com/moesif/sdk/okhttp3client/MoesifResponseHandler.java b/src/main/java/com/moesif/sdk/okhttp3client/MoesifResponseHandler.java index 02fd68f..173a52a 100644 --- a/src/main/java/com/moesif/sdk/okhttp3client/MoesifResponseHandler.java +++ b/src/main/java/com/moesif/sdk/okhttp3client/MoesifResponseHandler.java @@ -14,6 +14,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; +import java.util.concurrent.BlockingQueue; public class MoesifResponseHandler implements ResponseHandler { @@ -23,9 +24,8 @@ public class MoesifResponseHandler implements ResponseHandler { private final EventResponseModel loggedResponse; private final ByteArrayOutputStream outputStream; private final Boolean jsonHeader; - private final String moesifApplicationId; + private final BlockingQueue eventQueue; private final Long maxAllowedBodySize; - private final EventModelBuffer buffer; private final String userId; private final String companyId; private final String sessionToken; @@ -36,9 +36,8 @@ public MoesifResponseHandler(EventRequestModel loggedRequest, EventResponseModel loggedResponse, ByteArrayOutputStream outputStream, Boolean jsonHeader, - String moesifApplicationId, + BlockingQueue eventQueue, Long maxAllowedBodyBytes, - Integer maxSendBufferSize, String userId, String companyId, String sessionToken, @@ -48,9 +47,8 @@ public MoesifResponseHandler(EventRequestModel loggedRequest, this.loggedResponse = loggedResponse; this.outputStream = outputStream; this.jsonHeader = jsonHeader; - this.moesifApplicationId = moesifApplicationId; + this.eventQueue = eventQueue; this.maxAllowedBodySize = maxAllowedBodyBytes; - this.buffer = new EventModelBuffer(maxSendBufferSize); this.userId = userId; this.companyId = companyId; this.sessionToken = sessionToken; @@ -97,12 +95,11 @@ private void sendEvent() { if( null != moesifEventFilter) { loggedEvent = moesifEventFilter.maskContent(loggedEvent); } - buffer.add(loggedEvent); - if (buffer.isFull()){ - List loggedEvents = buffer.empty(); - MoesifApiLogEvent.sendEventsAsync( - moesifApplicationId, - loggedEvents); + // Send the event to Moesif asynchronously and drop if queue is full instead of blocking + if (eventQueue.offer(loggedEvent)){ + logger.debug("Event Queued: {}", loggedEvent); + } else { + logger.warn("Event Queue is full, dropping event: {}", loggedEvent); } } catch (IllegalArgumentException e) { logger.warn("Is Moesif Application ID configured? {}", e.getMessage()); diff --git a/src/main/java/com/moesif/sdk/okhttp3client/config/MoesifApiConnConfig.java b/src/main/java/com/moesif/sdk/okhttp3client/config/MoesifApiConnConfig.java index 8efa91a..a2cddc7 100755 --- a/src/main/java/com/moesif/sdk/okhttp3client/config/MoesifApiConnConfig.java +++ b/src/main/java/com/moesif/sdk/okhttp3client/config/MoesifApiConnConfig.java @@ -13,7 +13,9 @@ public class MoesifApiConnConfig { public static String DEFAULT_BASE_URI = "https://api.moesif.net"; public String baseUri; - public Integer eventsBufferSize = 5; + public Integer eventsBufferSize = 100; + public Integer maxQueueSize = 100000; + public Integer eventTimeoutMillis = 2000; public Collection bodyContentTypesBlackList = DefaultDomainData.bodyContentTypesBlackList; private String applicationId;