diff --git a/build.gradle.kts b/build.gradle.kts index 2bbe220..6dca13b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -73,6 +73,8 @@ dependencies { testImplementation("com.google.guava:guava:32.0.1-jre") testImplementation("junit:junit:4.12") testImplementation("org.hamcrest:hamcrest-all:1.3") + // Inspect private fields in tests. + testImplementation("org.easytesting:fest-reflect:1.4.1") } checkstyle { diff --git a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java index 31b3239..af16edd 100644 --- a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java @@ -1,6 +1,7 @@ package com.launchdarkly.eventsource; import com.launchdarkly.logging.LDLogger; +import okhttp3.HttpUrl; import java.io.Closeable; import java.io.InputStream; @@ -8,8 +9,6 @@ import java.net.URISyntaxException; import java.net.URL; -import okhttp3.HttpUrl; - /** * An abstraction of how {@link EventSource} should obtain an input stream. *

@@ -64,21 +63,39 @@ public static abstract class Client implements Closeable { public static class Result { private final InputStream inputStream; private final URI origin; - private final Closeable closer; - + private final Closeable connectionCloser; + private final Closeable responseCloser; + /** * Creates an instance. * * @param inputStream see {@link #getInputStream()} * @param origin see {@link #getOrigin()} - * @param closer see {@link #getCloser()} + * @param connectionCloser see {@link #getConnectionCloser()} */ - public Result(InputStream inputStream, URI origin, Closeable closer) { + public Result(InputStream inputStream, URI origin, Closeable connectionCloser) { this.inputStream = inputStream; this.origin = origin; - this.closer = closer; + this.connectionCloser = connectionCloser; + this.responseCloser = null; } - + + /** + * Creates an instance. + * + * @param inputStream see {@link #getInputStream()} + * @param origin see {@link #getOrigin()} + * @param connectionCloser see {@link #getConnectionCloser()} + * @param responseCloser see {@link #getResponseCloser()} + */ + public Result(InputStream inputStream, URI origin, Closeable connectionCloser, Closeable responseCloser) { + this.inputStream = inputStream; + this.origin = origin; + this.connectionCloser = connectionCloser; + this.responseCloser = responseCloser; + } + + /** * The input stream that {@link EventSource} should read from. * @@ -115,8 +132,20 @@ public URI getOrigin() { * * @return a Closeable object or null */ - public Closeable getCloser() { - return closer; + public Closeable getConnectionCloser() { + return connectionCloser; + } + + /** + * An object that {@link EventSource} can use to close the streaming response. + * If this is not null, its {@link Closeable#close()} method will be + * called whenever the current streaming response is stopped either due to an + * error or because the caller explicitly closed the stream. + * + * @return a Closeable object or null + */ + public Closeable getResponseCloser() { + return responseCloser; } } diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index cf276d9..2690de9 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -97,6 +97,7 @@ public class EventSource implements Closeable { // be modified from other threads if they call stop() or interrupt(). We // use AtomicReference because we need atomicity in updates. private final AtomicReference connectionCloser = new AtomicReference<>(); + private final AtomicReference responseCloser = new AtomicReference<>(); private final AtomicReference readingThread = new AtomicReference<>(); private final AtomicReference readyState; @@ -308,12 +309,13 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException // transparently. continue; } - // The ErrorStrategy told us to THROW rather than CONTINUE. + // The ErrorStrategy told us to THROW rather than CONTINUE. throw exception; } - connectionCloser.set(clientResult.getCloser()); + connectionCloser.set(clientResult.getConnectionCloser()); + responseCloser.set(clientResult.getResponseCloser()); origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin(); connectedTime = System.currentTimeMillis(); logger.debug("Connected to SSE stream"); @@ -681,7 +683,20 @@ private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shou } } if (oldReadingThread == Thread.currentThread()) { + Closeable oldResponseCloser = this.responseCloser.getAndSet(null); eventParser = null; + // Response can only be closed from reading thread. Otherwise, it will cause + // java.lang.IllegalStateException: Unbalanced enter/exit raised from okhttp + // since closing response will drain remaining chunks if exists, resulting in concurrent buffer source reading. + // which may conflict with reading thread. + if (oldResponseCloser != null) { + try { + oldResponseCloser.close(); + logger.debug("Closed response"); + } catch (IOException e) { + logger.warn("Unexpected error when closing response: {}", LogValues.exceptionSummary(e)); + } + } readyState.compareAndSet(ReadyState.OPEN, ReadyState.CLOSED); readyState.compareAndSet(ReadyState.CONNECTING, ReadyState.CLOSED); // If the current thread is not the reading thread, these fields will be updated the @@ -936,7 +951,7 @@ public Builder readBufferSize(int readBufferSize) { * // import com.launchdarkly.logging.*; * * builder.logger( - * LDLogger.withAdapter(Logs.basic(), "logname") + * LDLogger.withAdapter(Logs.basic(), "logname") * ); * *

@@ -975,16 +990,16 @@ public Builder logger(LDLogger logger) { * first and {@code event:} second, {@link MessageEvent#getEventName()} will not contain the value of * {@code event:} but will be {@link MessageEvent#DEFAULT_EVENT_NAME} instead; similarly, an {@code id:} field will * be ignored if it appears after {@code data:} in this mode. Therefore, you should only use this mode if the - * server's behavior is predictable in this regard. + * server's behavior is predictable in this regard. *

  • The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but * in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream * happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete * event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws * a {@link StreamClosedWithIncompleteMessageException}.
  • - * + * * * @param streamEventData true if events should be dispatched immediately with asynchronous data rather than - * read fully before dispatch + * read fully before dispatch * @return the builder * @see #expectFields(String...) * @since 2.6.0 diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java index 0d535a4..520525f 100644 --- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java @@ -465,8 +465,11 @@ public Result connect(String lastEventId) throws StreamException { return new Result( responseBody.byteStream(), uri, - new RequestCloser(call) - ); + new RequestCloser(call), + // prevent from connection leak warning from okhttp. + // see: okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount + new ResponseCloser(response) + ); } public void close() { @@ -555,6 +558,19 @@ public void close() throws IOException { } } + private static class ResponseCloser implements Closeable { + private final Response response; + + public ResponseCloser(Response response) { + this.response = response; + } + + @Override + public void close() throws IOException { + this.response.close(); + } + } + private static X509TrustManager defaultTrustManager() throws GeneralSecurityException { TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java index dacba4d..cf1261b 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java @@ -345,6 +345,41 @@ public void close() throws IOException { assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true)); } + @Test + public void exceptionFromCloserDoesNotPreventClosingResponse() throws Exception { + Semaphore calledClose = new Semaphore(0); + ConnectStrategy cs = new ConnectStrategy() { + @Override + public Client createClient(LDLogger logger) { + return new ClientBaseImpl() { + @Override + public Result connect(String lastEventId) throws StreamException { + return new Result(makeEmptyInputStream(), null, new Closeable() { + @Override + public void close() throws IOException { + + } + }, new Closeable() { + @Override + public void close() throws IOException { + calledClose.release(); + throw new IOException("fake error"); + } + }); + } + }; + } + }; + + try (EventSource es = new EventSource.Builder(cs).build()) { + es.start(); + + assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(false)); + } + assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true)); + } + + @Test public void exceptionFromCloserDoesNotPreventClosingStream() throws Exception { Semaphore calledClose = new Semaphore(0); diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java index c2a8e9e..30b283d 100644 --- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java +++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java @@ -319,7 +319,7 @@ public void clientCanForceStreamToClose() throws Exception { int n = stream.read(b, 0, 5); assertThat(n, equalTo(5)); - result.getCloser().close(); + result.getConnectionCloser().close(); // This causes us to call the OkHttp method Call.cancel(). The InputStream is // expected to throw an IOException on the next read, but it would also be // acceptable for it to return EOF (-1). @@ -371,7 +371,7 @@ private RequestInfo doRequestFrom( try (HttpServer server = HttpServer.start(Handlers.status(200))) { try (ConnectStrategy.Client client = hcs.uri(server.getUri()).createClient(testLogger.getLogger())) { ConnectStrategy.Client.Result result = client.connect(lastEventId); - result.getCloser().close(); + result.getConnectionCloser().close(); return server.getRecorder().requireRequest(); } diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java index 3162f0c..e8b2939 100644 --- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java +++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java @@ -4,12 +4,18 @@ import com.launchdarkly.testhelpers.httptest.Handlers; import com.launchdarkly.testhelpers.httptest.HttpServer; +import org.fest.reflect.reference.TypeRef; import org.junit.Rule; import org.junit.Test; +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicReference; + import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThread; +import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThreadAfterDelay; +import static org.fest.reflect.core.Reflection.field; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.*; /** * Tests of basic EventSource behavior using real HTTP requests. @@ -26,14 +32,14 @@ public void eventSourceReadsEvents() throws Exception { Handlers.SSE.event("b", "data2"), Handlers.hang() ); - + try (HttpServer server = HttpServer.start(response)) { try (EventSource es = new EventSource.Builder(server.getUri()).build()) { es.start(); - + assertThat(es.readAnyEvent(), equalTo( new MessageEvent("a", "data1", null, es.getOrigin()))); - + assertThat(es.readAnyEvent(), equalTo( new MessageEvent("b", "data2", null, es.getOrigin()))); } @@ -59,13 +65,13 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { .retryDelay(1, null) .build()) { es.start(); - + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin()))); - + assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByServerException()))); - + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); - + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin()))); } } @@ -91,17 +97,80 @@ public void eventSourceReconnectsAfterExternallyInterrupted() throws Exception { .retryDelay(1, null) .build()) { es.start(); - + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin()))); - + interruptOnAnotherThread(es); assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByCallerException()))); - + + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); + + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin()))); + } + } + } + + @Test + public void eventSourceReconnectsAfterExternallyInterruptedWithNewEventParser() throws Exception { + Handler response1 = Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event("message", "first"), + Handlers.SSE.leaveOpen() + ); + Handler response2 = Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event("message", "second"), + Handlers.SSE.leaveOpen() + ); + Handler allResponses = Handlers.sequential(response1, response2); + + try (HttpServer server = HttpServer.start(allResponses)) { + AtomicReference holder = new AtomicReference<>(); + try (EventSource es = new EventSource.Builder(server.getUri()) + .errorStrategy(ErrorStrategy.alwaysContinue()) + .retryDelay(1, null) + .build()) { + es.start(); + holder.set(es); + AtomicReference connectionCloser = field("connectionCloser").ofType(new TypeRef>() { + }).in(es).get(); + AtomicReference responseCloser = field("responseCloser").ofType(new TypeRef>() { + }).in(es).get(); + + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin()))); + + interruptOnAnotherThread(es).join(); + + assertThat(connectionCloser.get(), nullValue()); + // Response should be closed with reading thread. + assertThat(responseCloser.get(), notNullValue()); + + assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByCallerException()))); + + // All closed. + assertThat(connectionCloser.get(), nullValue()); + + assertThat(responseCloser.get(), nullValue()); + + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); - + // All recreated + assertThat(connectionCloser.get(), notNullValue()); + + assertThat(responseCloser.get(), notNullValue()); + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin()))); } + AtomicReference connectionCloser = field("connectionCloser").ofType(new TypeRef>() { + }).in(holder.get()).get(); + AtomicReference responseCloser = field("responseCloser").ofType(new TypeRef>() { + }).in(holder.get()).get(); + + // All closed by try-with-resource statement + assertThat(connectionCloser.get(), nullValue()); + + assertThat(responseCloser.get(), nullValue()); } } } diff --git a/src/test/java/com/launchdarkly/eventsource/TestUtils.java b/src/test/java/com/launchdarkly/eventsource/TestUtils.java index 3103c81..165ce45 100644 --- a/src/test/java/com/launchdarkly/eventsource/TestUtils.java +++ b/src/test/java/com/launchdarkly/eventsource/TestUtils.java @@ -13,21 +13,24 @@ public static String makeStringOfLength(int n) { return sb.toString(); } - public static void interruptOnAnotherThreadAfterDelay(EventSource es, long delayMillis) { - new Thread(new Runnable() { + public static Thread interruptOnAnotherThreadAfterDelay(EventSource es, long delayMillis) { + Thread t = new Thread(new Runnable() { public void run() { try { if (delayMillis > 0) { Thread.sleep(delayMillis); } - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } es.interrupt(); } - }).start(); + }); + t.start(); + return t; } - public static void interruptOnAnotherThread(EventSource es) { - interruptOnAnotherThreadAfterDelay(es, 0); + public static Thread interruptOnAnotherThread(EventSource es) { + return interruptOnAnotherThreadAfterDelay(es, 0); } public static void interruptThisThreadFromAnotherThreadAfterDelay(long delayMillis) {