diff --git a/build.gradle.kts b/build.gradle.kts
index 2bbe220..6dca13b 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -73,6 +73,8 @@ dependencies {
testImplementation("com.google.guava:guava:32.0.1-jre")
testImplementation("junit:junit:4.12")
testImplementation("org.hamcrest:hamcrest-all:1.3")
+ // Inspect private fields in tests.
+ testImplementation("org.easytesting:fest-reflect:1.4.1")
}
checkstyle {
diff --git a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
index 31b3239..af16edd 100644
--- a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
+++ b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
@@ -1,6 +1,7 @@
package com.launchdarkly.eventsource;
import com.launchdarkly.logging.LDLogger;
+import okhttp3.HttpUrl;
import java.io.Closeable;
import java.io.InputStream;
@@ -8,8 +9,6 @@
import java.net.URISyntaxException;
import java.net.URL;
-import okhttp3.HttpUrl;
-
/**
* An abstraction of how {@link EventSource} should obtain an input stream.
*
@@ -64,21 +63,39 @@ public static abstract class Client implements Closeable {
public static class Result {
private final InputStream inputStream;
private final URI origin;
- private final Closeable closer;
-
+ private final Closeable connectionCloser;
+ private final Closeable responseCloser;
+
/**
* Creates an instance.
*
* @param inputStream see {@link #getInputStream()}
* @param origin see {@link #getOrigin()}
- * @param closer see {@link #getCloser()}
+ * @param connectionCloser see {@link #getConnectionCloser()}
*/
- public Result(InputStream inputStream, URI origin, Closeable closer) {
+ public Result(InputStream inputStream, URI origin, Closeable connectionCloser) {
this.inputStream = inputStream;
this.origin = origin;
- this.closer = closer;
+ this.connectionCloser = connectionCloser;
+ this.responseCloser = null;
}
-
+
+ /**
+ * Creates an instance.
+ *
+ * @param inputStream see {@link #getInputStream()}
+ * @param origin see {@link #getOrigin()}
+ * @param connectionCloser see {@link #getConnectionCloser()}
+ * @param responseCloser see {@link #getResponseCloser()}
+ */
+ public Result(InputStream inputStream, URI origin, Closeable connectionCloser, Closeable responseCloser) {
+ this.inputStream = inputStream;
+ this.origin = origin;
+ this.connectionCloser = connectionCloser;
+ this.responseCloser = responseCloser;
+ }
+
+
/**
* The input stream that {@link EventSource} should read from.
*
@@ -115,8 +132,20 @@ public URI getOrigin() {
*
* @return a Closeable object or null
*/
- public Closeable getCloser() {
- return closer;
+ public Closeable getConnectionCloser() {
+ return connectionCloser;
+ }
+
+ /**
+ * An object that {@link EventSource} can use to close the streaming response.
+ * If this is not null, its {@link Closeable#close()} method will be
+ * called whenever the current streaming response is stopped either due to an
+ * error or because the caller explicitly closed the stream.
+ *
+ * @return a Closeable object or null
+ */
+ public Closeable getResponseCloser() {
+ return responseCloser;
}
}
diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java
index cf276d9..2690de9 100644
--- a/src/main/java/com/launchdarkly/eventsource/EventSource.java
+++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java
@@ -97,6 +97,7 @@ public class EventSource implements Closeable {
// be modified from other threads if they call stop() or interrupt(). We
// use AtomicReference because we need atomicity in updates.
private final AtomicReference connectionCloser = new AtomicReference<>();
+ private final AtomicReference responseCloser = new AtomicReference<>();
private final AtomicReference readingThread = new AtomicReference<>();
private final AtomicReference readyState;
@@ -308,12 +309,13 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException
// transparently.
continue;
}
- // The ErrorStrategy told us to THROW rather than CONTINUE.
+ // The ErrorStrategy told us to THROW rather than CONTINUE.
throw exception;
}
- connectionCloser.set(clientResult.getCloser());
+ connectionCloser.set(clientResult.getConnectionCloser());
+ responseCloser.set(clientResult.getResponseCloser());
origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin();
connectedTime = System.currentTimeMillis();
logger.debug("Connected to SSE stream");
@@ -681,7 +683,20 @@ private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shou
}
}
if (oldReadingThread == Thread.currentThread()) {
+ Closeable oldResponseCloser = this.responseCloser.getAndSet(null);
eventParser = null;
+ // Response can only be closed from reading thread. Otherwise, it will cause
+ // java.lang.IllegalStateException: Unbalanced enter/exit raised from okhttp
+ // since closing response will drain remaining chunks if exists, resulting in concurrent buffer source reading.
+ // which may conflict with reading thread.
+ if (oldResponseCloser != null) {
+ try {
+ oldResponseCloser.close();
+ logger.debug("Closed response");
+ } catch (IOException e) {
+ logger.warn("Unexpected error when closing response: {}", LogValues.exceptionSummary(e));
+ }
+ }
readyState.compareAndSet(ReadyState.OPEN, ReadyState.CLOSED);
readyState.compareAndSet(ReadyState.CONNECTING, ReadyState.CLOSED);
// If the current thread is not the reading thread, these fields will be updated the
@@ -936,7 +951,7 @@ public Builder readBufferSize(int readBufferSize) {
* // import com.launchdarkly.logging.*;
*
* builder.logger(
- * LDLogger.withAdapter(Logs.basic(), "logname")
+ * LDLogger.withAdapter(Logs.basic(), "logname")
* );
*
*
@@ -975,16 +990,16 @@ public Builder logger(LDLogger logger) {
* first and {@code event:} second, {@link MessageEvent#getEventName()} will not contain the value of
* {@code event:} but will be {@link MessageEvent#DEFAULT_EVENT_NAME} instead; similarly, an {@code id:} field will
* be ignored if it appears after {@code data:} in this mode. Therefore, you should only use this mode if the
- * server's behavior is predictable in this regard.
+ * server's behavior is predictable in this regard.
*
The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but
* in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream
* happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete
* event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws
* a {@link StreamClosedWithIncompleteMessageException}.
- *
+ *
*
* @param streamEventData true if events should be dispatched immediately with asynchronous data rather than
- * read fully before dispatch
+ * read fully before dispatch
* @return the builder
* @see #expectFields(String...)
* @since 2.6.0
diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java
index 0d535a4..520525f 100644
--- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java
+++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java
@@ -465,8 +465,11 @@ public Result connect(String lastEventId) throws StreamException {
return new Result(
responseBody.byteStream(),
uri,
- new RequestCloser(call)
- );
+ new RequestCloser(call),
+ // prevent from connection leak warning from okhttp.
+ // see: okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount
+ new ResponseCloser(response)
+ );
}
public void close() {
@@ -555,6 +558,19 @@ public void close() throws IOException {
}
}
+ private static class ResponseCloser implements Closeable {
+ private final Response response;
+
+ public ResponseCloser(Response response) {
+ this.response = response;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.response.close();
+ }
+ }
+
private static X509TrustManager defaultTrustManager() throws GeneralSecurityException {
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java
index dacba4d..cf1261b 100644
--- a/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java
@@ -345,6 +345,41 @@ public void close() throws IOException {
assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true));
}
+ @Test
+ public void exceptionFromCloserDoesNotPreventClosingResponse() throws Exception {
+ Semaphore calledClose = new Semaphore(0);
+ ConnectStrategy cs = new ConnectStrategy() {
+ @Override
+ public Client createClient(LDLogger logger) {
+ return new ClientBaseImpl() {
+ @Override
+ public Result connect(String lastEventId) throws StreamException {
+ return new Result(makeEmptyInputStream(), null, new Closeable() {
+ @Override
+ public void close() throws IOException {
+
+ }
+ }, new Closeable() {
+ @Override
+ public void close() throws IOException {
+ calledClose.release();
+ throw new IOException("fake error");
+ }
+ });
+ }
+ };
+ }
+ };
+
+ try (EventSource es = new EventSource.Builder(cs).build()) {
+ es.start();
+
+ assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(false));
+ }
+ assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true));
+ }
+
+
@Test
public void exceptionFromCloserDoesNotPreventClosingStream() throws Exception {
Semaphore calledClose = new Semaphore(0);
diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
index c2a8e9e..30b283d 100644
--- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
@@ -319,7 +319,7 @@ public void clientCanForceStreamToClose() throws Exception {
int n = stream.read(b, 0, 5);
assertThat(n, equalTo(5));
- result.getCloser().close();
+ result.getConnectionCloser().close();
// This causes us to call the OkHttp method Call.cancel(). The InputStream is
// expected to throw an IOException on the next read, but it would also be
// acceptable for it to return EOF (-1).
@@ -371,7 +371,7 @@ private RequestInfo doRequestFrom(
try (HttpServer server = HttpServer.start(Handlers.status(200))) {
try (ConnectStrategy.Client client = hcs.uri(server.getUri()).createClient(testLogger.getLogger())) {
ConnectStrategy.Client.Result result = client.connect(lastEventId);
- result.getCloser().close();
+ result.getConnectionCloser().close();
return server.getRecorder().requireRequest();
}
diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
index 3162f0c..e8b2939 100644
--- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
@@ -4,12 +4,18 @@
import com.launchdarkly.testhelpers.httptest.Handlers;
import com.launchdarkly.testhelpers.httptest.HttpServer;
+import org.fest.reflect.reference.TypeRef;
import org.junit.Rule;
import org.junit.Test;
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicReference;
+
import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThread;
+import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThreadAfterDelay;
+import static org.fest.reflect.core.Reflection.field;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.*;
/**
* Tests of basic EventSource behavior using real HTTP requests.
@@ -26,14 +32,14 @@ public void eventSourceReadsEvents() throws Exception {
Handlers.SSE.event("b", "data2"),
Handlers.hang()
);
-
+
try (HttpServer server = HttpServer.start(response)) {
try (EventSource es = new EventSource.Builder(server.getUri()).build()) {
es.start();
-
+
assertThat(es.readAnyEvent(), equalTo(
new MessageEvent("a", "data1", null, es.getOrigin())));
-
+
assertThat(es.readAnyEvent(), equalTo(
new MessageEvent("b", "data2", null, es.getOrigin())));
}
@@ -59,13 +65,13 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception {
.retryDelay(1, null)
.build()) {
es.start();
-
+
assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin())));
-
+
assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByServerException())));
-
+
assertThat(es.readAnyEvent(), equalTo(new StartedEvent()));
-
+
assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin())));
}
}
@@ -91,17 +97,80 @@ public void eventSourceReconnectsAfterExternallyInterrupted() throws Exception {
.retryDelay(1, null)
.build()) {
es.start();
-
+
assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin())));
-
+
interruptOnAnotherThread(es);
assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByCallerException())));
-
+
+ assertThat(es.readAnyEvent(), equalTo(new StartedEvent()));
+
+ assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin())));
+ }
+ }
+ }
+
+ @Test
+ public void eventSourceReconnectsAfterExternallyInterruptedWithNewEventParser() throws Exception {
+ Handler response1 = Handlers.all(
+ Handlers.SSE.start(),
+ Handlers.SSE.event("message", "first"),
+ Handlers.SSE.leaveOpen()
+ );
+ Handler response2 = Handlers.all(
+ Handlers.SSE.start(),
+ Handlers.SSE.event("message", "second"),
+ Handlers.SSE.leaveOpen()
+ );
+ Handler allResponses = Handlers.sequential(response1, response2);
+
+ try (HttpServer server = HttpServer.start(allResponses)) {
+ AtomicReference holder = new AtomicReference<>();
+ try (EventSource es = new EventSource.Builder(server.getUri())
+ .errorStrategy(ErrorStrategy.alwaysContinue())
+ .retryDelay(1, null)
+ .build()) {
+ es.start();
+ holder.set(es);
+ AtomicReference connectionCloser = field("connectionCloser").ofType(new TypeRef>() {
+ }).in(es).get();
+ AtomicReference responseCloser = field("responseCloser").ofType(new TypeRef>() {
+ }).in(es).get();
+
+ assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin())));
+
+ interruptOnAnotherThread(es).join();
+
+ assertThat(connectionCloser.get(), nullValue());
+ // Response should be closed with reading thread.
+ assertThat(responseCloser.get(), notNullValue());
+
+ assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByCallerException())));
+
+ // All closed.
+ assertThat(connectionCloser.get(), nullValue());
+
+ assertThat(responseCloser.get(), nullValue());
+
+
assertThat(es.readAnyEvent(), equalTo(new StartedEvent()));
-
+ // All recreated
+ assertThat(connectionCloser.get(), notNullValue());
+
+ assertThat(responseCloser.get(), notNullValue());
+
assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin())));
}
+ AtomicReference connectionCloser = field("connectionCloser").ofType(new TypeRef>() {
+ }).in(holder.get()).get();
+ AtomicReference responseCloser = field("responseCloser").ofType(new TypeRef>() {
+ }).in(holder.get()).get();
+
+ // All closed by try-with-resource statement
+ assertThat(connectionCloser.get(), nullValue());
+
+ assertThat(responseCloser.get(), nullValue());
}
}
}
diff --git a/src/test/java/com/launchdarkly/eventsource/TestUtils.java b/src/test/java/com/launchdarkly/eventsource/TestUtils.java
index 3103c81..165ce45 100644
--- a/src/test/java/com/launchdarkly/eventsource/TestUtils.java
+++ b/src/test/java/com/launchdarkly/eventsource/TestUtils.java
@@ -13,21 +13,24 @@ public static String makeStringOfLength(int n) {
return sb.toString();
}
- public static void interruptOnAnotherThreadAfterDelay(EventSource es, long delayMillis) {
- new Thread(new Runnable() {
+ public static Thread interruptOnAnotherThreadAfterDelay(EventSource es, long delayMillis) {
+ Thread t = new Thread(new Runnable() {
public void run() {
try {
if (delayMillis > 0) {
Thread.sleep(delayMillis);
}
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ }
es.interrupt();
}
- }).start();
+ });
+ t.start();
+ return t;
}
- public static void interruptOnAnotherThread(EventSource es) {
- interruptOnAnotherThreadAfterDelay(es, 0);
+ public static Thread interruptOnAnotherThread(EventSource es) {
+ return interruptOnAnotherThreadAfterDelay(es, 0);
}
public static void interruptThisThreadFromAnotherThreadAfterDelay(long delayMillis) {