Skip to content

Commit

Permalink
Feeds backpressure back to drop operator on SSE connection (#103)
Browse files Browse the repository at this point in the history

* Feeds backpressure back to drop operator on SSE connection

* Add test for serial consumption

* clean up test

Co-authored-by: Calvin Cheung <[email protected]>
  • Loading branch information
calvin681 and calvin681 authored May 10, 2021
1 parent d70af6f commit 4718691
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ public void onError(Throwable e) {
public void onNext(T t) {

if (requested.get() > 0) {
requested.decrementAndGet();
o.onNext(t);
next.increment();
requested.decrementAndGet();

} else {

dropped.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void resetConnected() {
}
}

private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response,
protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response,
final Action1<Boolean> updateDataRecvngStatus,
final long dataRecvTimeoutSecs, String delimiter) {
long interval = Math.max(1, dataRecvTimeoutSecs / 2);
Expand Down Expand Up @@ -297,7 +297,7 @@ private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Serve
return Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString()));
}
return Observable.just(t1.contentAsString());
})
}, 1)
.filter(data -> {
if (data.startsWith("ping")) {
pingCounter.increment();
Expand All @@ -308,7 +308,7 @@ private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Serve
.flatMapIterable((data) -> {
boolean useSnappy = true;
return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter);
})
}, 1)
.takeUntil(shutdownSubject)
.takeWhile((event) -> !isShutdown);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.mantisrx.server.worker.client;

import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.common.metrics.spectator.MetricId;
import io.netty.buffer.Unpooled;
import io.reactivx.mantis.operators.DropOperator;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SseWorkerConnectionTest {
private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnectionTest.class);

@Test
public void testStreamContentDrops() throws Exception {
String metricGroupString = "testmetric";
MetricGroupId metricGroupId = new MetricGroupId(metricGroupString);
SseWorkerConnection workerConnection = new SseWorkerConnection("connection_type",
"hostname",
80,
b -> {},
b -> {},
t -> {},
600,
false,
new CopyOnWriteArraySet<>(),
1,
null,
true,
metricGroupId);
HttpClientResponse<ServerSentEvent> response = mock(HttpClientResponse.class);
Metrics metrics = mock(Metrics.class);
Counter onNextCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onNext.toString()));
Counter onErrorCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onError.toString()));
Counter onCompleteCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onComplete.toString()));
Counter droppedCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.dropped.toString()));
when(metrics.getMetricGroupId()).thenReturn(metricGroupId);
when(metrics.getCounter(DropOperator.Counters.onNext.toString())).thenReturn(onNextCounter);
when(metrics.getCounter(DropOperator.Counters.onError.toString())).thenReturn(onErrorCounter);
when(metrics.getCounter(DropOperator.Counters.onComplete.toString())).thenReturn(onCompleteCounter);
when(metrics.getCounter(DropOperator.Counters.dropped.toString())).thenReturn(droppedCounter);
MetricsRegistry.getInstance().registerAndGet(metrics);
TestScheduler testScheduler = Schedulers.test();

// Events are just "0", "1", "2", ...
Observable<ServerSentEvent> contentObs = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.map(t -> new ServerSentEvent(Unpooled.copiedBuffer(Long.toString(t), Charset.defaultCharset())));

when(response.getContent()).thenReturn(contentObs);

TestSubscriber<MantisServerSentEvent> subscriber = new TestSubscriber<>(1);

workerConnection.streamContent(response, b -> {}, 600, "delimiter").subscribeOn(testScheduler).subscribe(subscriber);

testScheduler.advanceTimeBy(100, TimeUnit.SECONDS);
subscriber.assertValueCount(1);
List<MantisServerSentEvent> events = subscriber.getOnNextEvents();
assertEquals("0", events.get(0).getEventAsString());
logger.info("next: {}", onNextCounter.value());
logger.info("drop: {}", droppedCounter.value());
assertTrue(onNextCounter.value() < 10);
assertTrue(droppedCounter.value() > 90);
}

public static class CounterImpl implements Counter {
private final AtomicLong count = new AtomicLong();
private final MetricId id;

public CounterImpl(MetricId id) {
this.id = id;
}

@Override
public void increment() {
count.incrementAndGet();
}

@Override
public void increment(long x) {
count.addAndGet(x);
}

@Override
public long value() {
return count.get();
}

@Override
public long rateValue() {
return -1;
}

@Override
public long rateTimeInMilliseconds() {
return -1;
}

@Override
public String event() {
return null;
}

@Override
public MetricId id() {
return id;
}
}
}

0 comments on commit 4718691

Please sign in to comment.