Skip to content

Commit

Permalink
#188601341 v1.2.0 (#17)
Browse files Browse the repository at this point in the history
* v1.2.0

Improved concurrent batching with new configurable send-queue max size and max wait time

* Log update
  • Loading branch information
bakennedy authored Nov 23, 2024
1 parent a46a384 commit b88f9a3
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 113 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.moesif</groupId>
<artifactId>moesif-okhttp-interceptor</artifactId>
<version>1.1.7</version>
<version>1.2.0</version>
<packaging>jar</packaging>
<name>moesif-okhttp-interceptor</name>
<url>https://www.moesif.com</url>
Expand Down
137 changes: 137 additions & 0 deletions src/main/java/com/moesif/sdk/okhttp3client/BatchEventLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package com.moesif.sdk.okhttp3client;

import com.moesif.api.MoesifAPIClient;
import com.moesif.api.controllers.APIController;
import com.moesif.api.http.client.APICallBack;
import com.moesif.api.http.client.HttpContext;
import com.moesif.api.http.response.HttpResponse;
import com.moesif.api.models.EventModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BatchEventLogger implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(BatchEventLogger.class);

private final BlockingQueue<EventModel> queue;
private final int batchSize;
private final long maxWaitTimeMillis;
private final APIController apiController;
private final AtomicBoolean running = new AtomicBoolean(true);

public BatchEventLogger(BlockingQueue<EventModel> queue, int batchSize, long maxWaitTimeMillis, String applicationId) {
this.queue = queue;
this.batchSize = batchSize;
this.maxWaitTimeMillis = maxWaitTimeMillis;
MoesifAPIClient client = new MoesifAPIClient(applicationId);
this.apiController = client.getAPI();
}

@Override
public void run() {
try {
List<EventModel> batch = new ArrayList<>();
long batchStartTime = 0;

while (running.get() || !queue.isEmpty()) {
long timeout = maxWaitTimeMillis;

if (!batch.isEmpty()) {
long elapsedTime = System.currentTimeMillis() - batchStartTime;
timeout = maxWaitTimeMillis - elapsedTime;
if (timeout <= 0) {
// Time limit reached, send the batch
sendBatch(new ArrayList<>(batch));
batch.clear();
batchStartTime = 0;
timeout = maxWaitTimeMillis;
}
}

// Poll for the next event with the calculated timeout
EventModel event = queue.poll(timeout, TimeUnit.MILLISECONDS);

if (event != null) {
if (batch.isEmpty()) {
// Start the batch timer
batchStartTime = System.currentTimeMillis();
}
batch.add(event);

if (batch.size() >= batchSize) {
logger.debug("Seding batch of {} events after reaching batch size limit", batch.size());
// Batch size limit reached, send the batch
sendBatch(new ArrayList<>(batch));
batch.clear();
batchStartTime = 0;
}
} else {
// No event received within timeout
if (!batch.isEmpty()) {
logger.debug("Seding batch of {} events after max wait timeout", batch.size());
// Send any accumulated events
sendBatch(new ArrayList<>(batch));
batch.clear();
batchStartTime = 0;
}
if (!running.get()) {
// Exit if the running flag is false
break;
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("EventConsumer interrupted, shutting down");
} catch (Exception e) {
logger.error("Error in EventConsumer", e);
} finally {
// Process any remaining events before exiting
processRemainingEvents();
}
}

private void sendBatch(List<EventModel> batch) {
if (!batch.isEmpty()) {
try {
apiController.createEventsBatchAsync(batch, new MoesifApiCallBack());
} catch (Exception e) {
// Handle exception during sending
logger.error("Exception while sending event batch", e);
}
}
}

private void processRemainingEvents() {
List<EventModel> remainingEvents = new ArrayList<>();
queue.drainTo(remainingEvents);
if (!remainingEvents.isEmpty()) {
logger.info("Processing remaining events before shutdown");
sendBatch(remainingEvents);
}
}

public void shutdown() {
running.set(false);
}

public static class MoesifApiCallBack implements APICallBack<HttpResponse> {

public void onSuccess(HttpContext context, HttpResponse response) {
int respStatusCode = response.getStatusCode();
if (201 != respStatusCode)
logger.debug("Received status code {}", respStatusCode);
else
logger.debug("Events submitted to Moesif");
}

public void onFailure(HttpContext context, Throwable error) {
logger.debug("onFailure {} {}", context.getResponse(), error.getMessage());
}
}
}
36 changes: 0 additions & 36 deletions src/main/java/com/moesif/sdk/okhttp3client/EventModelBuffer.java

This file was deleted.

48 changes: 0 additions & 48 deletions src/main/java/com/moesif/sdk/okhttp3client/MoesifApiLogEvent.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.moesif.sdk.okhttp3client;

import com.moesif.api.models.EventModel;
import com.moesif.api.models.EventRequestModel;
import com.moesif.api.models.EventResponseModel;
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesif;
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesifImpl;
import com.moesif.sdk.okhttp3client.config.MoesifApiConnConfig;
import com.moesif.sdk.okhttp3client.models.OkHttp3RequestMapper;
import com.moesif.sdk.okhttp3client.models.OkHttp3ResponseMapper;
import com.moesif.sdk.okhttp3client.models.filter.IInterceptEventFilter;
import com.moesif.sdk.okhttp3client.util.ResponseWrap;
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesifImpl;
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesif;
import okhttp3.*;
import okio.BufferedSource;
import okio.Okio;
Expand All @@ -20,12 +21,15 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import java.time.Instant;

/**
* MoesifOkHttp3Interceptor
* This intrceptor can be used both as Application and Network interceptor
Expand All @@ -39,12 +43,14 @@
*/
public class MoesifOkHttp3Interceptor implements Interceptor {
private static final Logger logger = LoggerFactory.getLogger(
MoesifOkHttp3Interceptor.class);

MoesifOkHttp3Interceptor.class);
private static MoesifApiConnConfig connConfig;
private final NetworkEventReporterMoesif mEventReporter =
NetworkEventReporterMoesifImpl.get();
NetworkEventReporterMoesifImpl.get();
private final AtomicInteger mNextRequestId = new AtomicInteger(0);
private static MoesifApiConnConfig connConfig;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private BlockingQueue<EventModel> eventQueue;
private BatchEventLogger batchLogger;

/**
* Initialize the Interceptor
Expand All @@ -70,7 +76,7 @@ public MoesifOkHttp3Interceptor(String moesifApplicationId) {
* to collector
*/
public MoesifOkHttp3Interceptor(String moesifApplicationId, Integer eventsBufferSize) {
MoesifApiConnConfig c = new MoesifApiConnConfig(moesifApplicationId);
MoesifApiConnConfig c = new MoesifApiConnConfig(moesifApplicationId);
c.setEventsBufferSize(eventsBufferSize);
init(c);
}
Expand All @@ -82,10 +88,11 @@ public MoesifOkHttp3Interceptor(String moesifApplicationId, Integer eventsBuffer
* to collector
*/
public MoesifOkHttp3Interceptor(Integer eventsBufferSize) {
MoesifApiConnConfig c = new MoesifApiConnConfig(null);
MoesifApiConnConfig c = new MoesifApiConnConfig(null);
c.setEventsBufferSize(eventsBufferSize);
init(c);
}

/**
* Initialize the Interceptor
* @param connConfig MoesifApiConnConfig object
Expand All @@ -96,7 +103,27 @@ public MoesifOkHttp3Interceptor(MoesifApiConnConfig connConfig) {

public void init(MoesifApiConnConfig connConfig) {
MoesifOkHttp3Interceptor.connConfig = (null == connConfig)
? new MoesifApiConnConfig() : connConfig;
? new MoesifApiConnConfig() : connConfig;
eventQueue = new LinkedBlockingQueue<>(MoesifOkHttp3Interceptor.connConfig.maxQueueSize);
batchLogger = new BatchEventLogger(
eventQueue,
MoesifOkHttp3Interceptor.connConfig.eventsBufferSize,
MoesifOkHttp3Interceptor.connConfig.eventTimeoutMillis,
MoesifOkHttp3Interceptor.connConfig.getApplicationId()
);
executorService.submit(batchLogger);
// Add shutdown hook to clean up and gracefully exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
batchLogger.shutdown();
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}));
if (getConnConfig().isDebug()) {
logger.debug("MoesifOkHttp3Interceptor initialized with config: {}", getConnConfig());
}
Expand Down Expand Up @@ -173,9 +200,8 @@ public Response intercept(Chain chain) throws IOException {
loggedResponse,
outputStream,
respw.isJsonHeader(),
connConfig.getApplicationId(),
eventQueue,
connConfig.getMaxAllowedBodyBytesResponse(),
connConfig.getEventsBufferSize(),
filter.identifyUser(request, response).orElse(null),
filter.identifyCompany(request, response).orElse(null),
filter.sessionToken(request, response).orElse(null),
Expand All @@ -201,8 +227,7 @@ public Response intercept(Chain chain) throws IOException {
} catch (Exception e) {
logger.warn("Error parsing response body", e);
}
}
else {
} else {
logger.warn("Body is null");
}
return response;
Expand Down
Loading

0 comments on commit b88f9a3

Please sign in to comment.