From 4718691dd0666d842decd957f150107ba69de49c Mon Sep 17 00:00:00 2001 From: calvin681 Date: Mon, 10 May 2021 10:16:28 -0700 Subject: [PATCH] Feeds backpressure back to drop operator on SSE connection (#103) * Feeds backpressure back to drop operator on SSE connection * Add test for serial consumption * clean up test Co-authored-by: Calvin Cheung --- .../mantis/operators/DropOperator.java | 3 +- .../worker/client/SseWorkerConnection.java | 6 +- .../client/SseWorkerConnectionTest.java | 129 ++++++++++++++++++ 3 files changed, 133 insertions(+), 5 deletions(-) create mode 100644 mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java diff --git a/mantis-common/src/main/java/io/reactivx/mantis/operators/DropOperator.java b/mantis-common/src/main/java/io/reactivx/mantis/operators/DropOperator.java index c30a8ce2d..911917641 100644 --- a/mantis-common/src/main/java/io/reactivx/mantis/operators/DropOperator.java +++ b/mantis-common/src/main/java/io/reactivx/mantis/operators/DropOperator.java @@ -113,10 +113,9 @@ public void onError(Throwable e) { public void onNext(T t) { if (requested.get() > 0) { + requested.decrementAndGet(); o.onNext(t); next.increment(); - requested.decrementAndGet(); - } else { dropped.increment(); diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java index 67a4742ad..9dbd8a31e 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java @@ -252,7 +252,7 @@ private void resetConnected() { } } - private Observable streamContent(HttpClientResponse response, + protected Observable streamContent(HttpClientResponse response, final Action1 updateDataRecvngStatus, final long dataRecvTimeoutSecs, String delimiter) { long interval = Math.max(1, dataRecvTimeoutSecs / 2); @@ -297,7 +297,7 @@ private Observable streamContent(HttpClientResponse { if (data.startsWith("ping")) { pingCounter.increment(); @@ -308,7 +308,7 @@ private Observable streamContent(HttpClientResponse { boolean useSnappy = true; return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter); - }) + }, 1) .takeUntil(shutdownSubject) .takeWhile((event) -> !isShutdown); } diff --git a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java new file mode 100644 index 000000000..52870c01c --- /dev/null +++ b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java @@ -0,0 +1,129 @@ +package io.mantisrx.server.worker.client; + +import io.mantisrx.common.MantisServerSentEvent; +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.common.metrics.spectator.MetricGroupId; +import io.mantisrx.common.metrics.spectator.MetricId; +import io.netty.buffer.Unpooled; +import io.reactivx.mantis.operators.DropOperator; +import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse; +import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SseWorkerConnectionTest { + private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnectionTest.class); + + @Test + public void testStreamContentDrops() throws Exception { + String metricGroupString = "testmetric"; + MetricGroupId metricGroupId = new MetricGroupId(metricGroupString); + SseWorkerConnection workerConnection = new SseWorkerConnection("connection_type", + "hostname", + 80, + b -> {}, + b -> {}, + t -> {}, + 600, + false, + new CopyOnWriteArraySet<>(), + 1, + null, + true, + metricGroupId); + HttpClientResponse response = mock(HttpClientResponse.class); + Metrics metrics = mock(Metrics.class); + Counter onNextCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onNext.toString())); + Counter onErrorCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onError.toString())); + Counter onCompleteCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onComplete.toString())); + Counter droppedCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.dropped.toString())); + when(metrics.getMetricGroupId()).thenReturn(metricGroupId); + when(metrics.getCounter(DropOperator.Counters.onNext.toString())).thenReturn(onNextCounter); + when(metrics.getCounter(DropOperator.Counters.onError.toString())).thenReturn(onErrorCounter); + when(metrics.getCounter(DropOperator.Counters.onComplete.toString())).thenReturn(onCompleteCounter); + when(metrics.getCounter(DropOperator.Counters.dropped.toString())).thenReturn(droppedCounter); + MetricsRegistry.getInstance().registerAndGet(metrics); + TestScheduler testScheduler = Schedulers.test(); + + // Events are just "0", "1", "2", ... + Observable contentObs = Observable.interval(1, TimeUnit.SECONDS, testScheduler) + .map(t -> new ServerSentEvent(Unpooled.copiedBuffer(Long.toString(t), Charset.defaultCharset()))); + + when(response.getContent()).thenReturn(contentObs); + + TestSubscriber subscriber = new TestSubscriber<>(1); + + workerConnection.streamContent(response, b -> {}, 600, "delimiter").subscribeOn(testScheduler).subscribe(subscriber); + + testScheduler.advanceTimeBy(100, TimeUnit.SECONDS); + subscriber.assertValueCount(1); + List events = subscriber.getOnNextEvents(); + assertEquals("0", events.get(0).getEventAsString()); + logger.info("next: {}", onNextCounter.value()); + logger.info("drop: {}", droppedCounter.value()); + assertTrue(onNextCounter.value() < 10); + assertTrue(droppedCounter.value() > 90); + } + + public static class CounterImpl implements Counter { + private final AtomicLong count = new AtomicLong(); + private final MetricId id; + + public CounterImpl(MetricId id) { + this.id = id; + } + + @Override + public void increment() { + count.incrementAndGet(); + } + + @Override + public void increment(long x) { + count.addAndGet(x); + } + + @Override + public long value() { + return count.get(); + } + + @Override + public long rateValue() { + return -1; + } + + @Override + public long rateTimeInMilliseconds() { + return -1; + } + + @Override + public String event() { + return null; + } + + @Override + public MetricId id() { + return id; + } + } +}