From d38a511591beecd679432614f5b8991d0ccfc2d9 Mon Sep 17 00:00:00 2001 From: LinZong Date: Sun, 7 Jan 2024 01:48:05 +0800 Subject: [PATCH 1/6] bugfix: Fix connection leak warning from okhttp connection pool. --- .../eventsource/ConnectStrategy.java | 49 +++++-- .../launchdarkly/eventsource/EventSource.java | 133 ++++++++++-------- .../eventsource/HttpConnectStrategy.java | 16 ++- .../eventsource/HttpConnectStrategyTest.java | 4 +- 4 files changed, 131 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java index 31b3239..af16edd 100644 --- a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java @@ -1,6 +1,7 @@ package com.launchdarkly.eventsource; import com.launchdarkly.logging.LDLogger; +import okhttp3.HttpUrl; import java.io.Closeable; import java.io.InputStream; @@ -8,8 +9,6 @@ import java.net.URISyntaxException; import java.net.URL; -import okhttp3.HttpUrl; - /** * An abstraction of how {@link EventSource} should obtain an input stream. *

@@ -64,21 +63,39 @@ public static abstract class Client implements Closeable { public static class Result { private final InputStream inputStream; private final URI origin; - private final Closeable closer; - + private final Closeable connectionCloser; + private final Closeable responseCloser; + /** * Creates an instance. * * @param inputStream see {@link #getInputStream()} * @param origin see {@link #getOrigin()} - * @param closer see {@link #getCloser()} + * @param connectionCloser see {@link #getConnectionCloser()} */ - public Result(InputStream inputStream, URI origin, Closeable closer) { + public Result(InputStream inputStream, URI origin, Closeable connectionCloser) { this.inputStream = inputStream; this.origin = origin; - this.closer = closer; + this.connectionCloser = connectionCloser; + this.responseCloser = null; } - + + /** + * Creates an instance. + * + * @param inputStream see {@link #getInputStream()} + * @param origin see {@link #getOrigin()} + * @param connectionCloser see {@link #getConnectionCloser()} + * @param responseCloser see {@link #getResponseCloser()} + */ + public Result(InputStream inputStream, URI origin, Closeable connectionCloser, Closeable responseCloser) { + this.inputStream = inputStream; + this.origin = origin; + this.connectionCloser = connectionCloser; + this.responseCloser = responseCloser; + } + + /** * The input stream that {@link EventSource} should read from. * @@ -115,8 +132,20 @@ public URI getOrigin() { * * @return a Closeable object or null */ - public Closeable getCloser() { - return closer; + public Closeable getConnectionCloser() { + return connectionCloser; + } + + /** + * An object that {@link EventSource} can use to close the streaming response. + * If this is not null, its {@link Closeable#close()} method will be + * called whenever the current streaming response is stopped either due to an + * error or because the caller explicitly closed the stream. + * + * @return a Closeable object or null + */ + public Closeable getResponseCloser() { + return responseCloser; } } diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index cf276d9..6b94edb 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -32,7 +32,7 @@ * The client uses a pull model where the caller starts the EventSource and then requests * data from it synchronously on a single thread. The initial connection attempt is made * when you call {@link #start()}, or when you first attempt to read an event. - * + * * To read events from the stream, you can either request them one at a time by calling * {@link #readMessage()} or {@link #readAnyEvent()}, or consume them in a loop by calling * {@link #messages()} or {@link #anyEvents()}. The "message" methods assume you are only @@ -70,11 +70,11 @@ public class EventSource implements Closeable { * The default value for {@link Builder#readBufferSize(int)}. */ public static final int DEFAULT_READ_BUFFER_SIZE = 1000; - + // Note that some fields have package-private visibility for tests. - + private final Object sleepNotifier = new Object(); - + // The following final fields are set from the configuration builder. private final ConnectStrategy.Client client; final int readBufferSize; @@ -83,7 +83,7 @@ public class EventSource implements Closeable { final long retryDelayResetThresholdMillis; final boolean streamEventData; final Set expectFields; - + // The following mutable fields are not volatile because they should only be // accessed from the thread that is reading from EventSource. private EventParser eventParser; @@ -97,6 +97,7 @@ public class EventSource implements Closeable { // be modified from other threads if they call stop() or interrupt(). We // use AtomicReference because we need atomicity in updates. private final AtomicReference connectionCloser = new AtomicReference<>(); + private final AtomicReference responseCloser = new AtomicReference<>(); private final AtomicReference readingThread = new AtomicReference<>(); private final AtomicReference readyState; @@ -104,7 +105,7 @@ public class EventSource implements Closeable { // and are read by the thread that is reading the stream. private volatile boolean deliberatelyClosedConnection; private volatile boolean calledStop; - + // These fields are written by the thread that is reading the stream, and can // be read by other threads to inspect the state of the stream. volatile long baseRetryDelayMillis; // set at config time but may be changed by a "retry:" value @@ -139,7 +140,7 @@ public class EventSource implements Closeable { public URI getOrigin() { return origin; } - + /** * Returns the logger that this EventSource is using. * @@ -166,7 +167,7 @@ public ReadyState getState() { * This can be set initially with {@link Builder#lastEventId(String)}, and is updated whenever an event * is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this * value. - * + * * @return the last known event ID, or null * @see Builder#lastEventId(String) * @since 2.0.0 @@ -192,7 +193,7 @@ public String getLastEventId() { public long getBaseRetryDelayMillis() { return baseRetryDelayMillis; } - + /** * Returns the retry delay that will be used for the next reconnection, if the * stream has failed. @@ -212,7 +213,7 @@ public long getBaseRetryDelayMillis() { public long getNextRetryDelayMillis() { return nextReconnectDelayMillis; } - + /** * Attempts to start the stream if it is not already active. *

@@ -244,7 +245,7 @@ public long getNextRetryDelayMillis() { public void start() throws StreamException { tryStart(false); } - + private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException { if (eventParser != null) { return null; @@ -253,7 +254,7 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException while (true) { StreamException exception = null; - + if (nextReconnectDelayMillis > 0) { long delayNow = disconnectedTime == 0 ? nextReconnectDelayMillis : (nextReconnectDelayMillis - (System.currentTimeMillis() - disconnectedTime)); @@ -279,22 +280,22 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException } } } - + ConnectStrategy.Client.Result clientResult = null; - + if (exception == null) { readyState.set(ReadyState.CONNECTING); - + connectedTime = 0; deliberatelyClosedConnection = calledStop = false; - + try { clientResult = client.connect(lastEventId); } catch (StreamException e) { exception = e; } } - + if (exception != null) { disconnectedTime = System.currentTimeMillis(); computeReconnectDelay(); @@ -308,16 +309,18 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException // transparently. continue; } - // The ErrorStrategy told us to THROW rather than CONTINUE. + // The ErrorStrategy told us to THROW rather than CONTINUE. throw exception; } - - - connectionCloser.set(clientResult.getCloser()); + + + connectionCloser.set(clientResult.getConnectionCloser()); + responseCloser.set(clientResult.getResponseCloser()); + origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin(); connectedTime = System.currentTimeMillis(); logger.debug("Connected to SSE stream"); - + eventParser = new EventParser( clientResult.getInputStream(), clientResult.getOrigin(), @@ -326,14 +329,14 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException expectFields, logger ); - + readyState.set(ReadyState.OPEN); currentErrorStrategy = baseErrorStrategy; return null; } } - + /** * Attempts to receive a message from the stream. *

@@ -347,7 +350,7 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException *

* This method must be called from the same thread that first started using the * stream (that is, the thread that called {@link #start()} or read the first event). - * + * * @return an SSE message * @throws StreamException if there is an error and retry is not enabled * @see #readAnyEvent() @@ -362,7 +365,7 @@ public MessageEvent readMessage() throws StreamException { } } } - + /** * Attempts to receive an event of any kind from the stream. *

@@ -417,7 +420,7 @@ public Iterator iterator() { } }; } - + /** * Returns an iterable sequence of events. *

@@ -528,7 +531,7 @@ public void close() { if (currentState == SHUTDOWN) { return; } - + closeCurrentStream(true, true); try { client.close(); @@ -552,12 +555,12 @@ public boolean awaitClosed(long timeout, TimeUnit timeUnit) throws InterruptedEx // Iterator implementation used by messages() and anyEvents() private class IteratorImpl implements Iterator { private final Class filterClass; - + IteratorImpl(Class filterClass) { this.filterClass = filterClass; calledStop = false; } - + public boolean hasNext() { while (true) { if (nextEvent != null && filterClass.isAssignableFrom(nextEvent.getClass())) { @@ -574,7 +577,7 @@ public boolean hasNext() { } } } - + public T next() { while (nextEvent == null || !filterClass.isAssignableFrom(nextEvent.getClass()) && hasNext()) {} @SuppressWarnings("unchecked") @@ -586,7 +589,7 @@ public T next() { private StreamEvent requireEvent() throws StreamException { readingThread.set(Thread.currentThread()); - + try { while (true) { // Reading an event implies starting the stream if it isn't already started. @@ -629,7 +632,7 @@ private StreamEvent requireEvent() throws StreamException { throw e; } } - + private void resetRetryDelayStrategy() { logger.debug("Resetting retry delay strategy to initial state"); currentRetryDelayStrategy = baseRetryDelayStrategy; @@ -642,7 +645,7 @@ private ErrorStrategy.Action applyErrorStrategy(StreamException e) { } return errorStrategyResult.getAction(); } - + private void computeReconnectDelay() { if (retryDelayResetThresholdMillis > 0 && connectedTime != 0) { long connectionDurationMillis = System.currentTimeMillis() - connectedTime; @@ -660,11 +663,12 @@ private void computeReconnectDelay() { private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shouldStopIterating) { Closeable oldConnectionCloser = this.connectionCloser.getAndSet(null); + Thread oldReadingThread = readingThread.getAndSet(null); if (oldConnectionCloser == null && oldReadingThread == null) { return false; } - + synchronized (sleepNotifier) { // this synchronization prevents a race condition in start() if (deliberatelyInterrupted) { this.deliberatelyClosedConnection = true; @@ -681,18 +685,31 @@ private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shou } } if (oldReadingThread == Thread.currentThread()) { + Closeable oldResponseCloser = this.responseCloser.getAndSet(null); eventParser = null; + // Response can only be closed from reading thread. Otherwise, it will cause + // java.lang.IllegalStateException: Unbalanced enter/exit raised from okhttp + // since closing response will drain remaining chunks if exists, resulting in concurrent buffer source reading. + // which may conflict with reading thread. + if (oldResponseCloser != null) { + try { + oldResponseCloser.close(); + logger.debug("Closed response"); + } catch (IOException e) { + logger.warn("Unexpected error when closing response: {}", LogValues.exceptionSummary(e)); + } + } readyState.compareAndSet(ReadyState.OPEN, ReadyState.CLOSED); readyState.compareAndSet(ReadyState.CONNECTING, ReadyState.CLOSED); // If the current thread is not the reading thread, these fields will be updated the // next time the reading thread tries to do a read. } - + sleepNotifier.notify(); // in case we're sleeping in a reconnect delay, wake us up } return true; } - + /** * Builder for configuring {@link EventSource}. */ @@ -725,7 +742,7 @@ public static final class Builder { *

* Or, if you want to consume an input stream from some other source, you can * create your own subclass of {@link ConnectStrategy}. - * + * * @param connectStrategy the object that will manage the input stream; * must not be null * @since 4.0.0 @@ -779,11 +796,11 @@ public Builder(URL url) { *

* This is the same as {@link #Builder(URI)}, but using the OkHttp type * {@link HttpUrl}. - * + * * @param url the stream URL * @throws IllegalArgumentException if the argument is null, or if the endpoint * is not HTTP or HTTPS - * + * * @since 1.9.0 * @see #Builder(ConnectStrategy) * @see #Builder(URI) @@ -792,7 +809,7 @@ public Builder(URL url) { public Builder(HttpUrl url) { this(ConnectStrategy.http(url)); } - + /** * Specifies a strategy for determining whether to handle errors transparently * or throw them as exceptions. @@ -802,7 +819,7 @@ public Builder(HttpUrl url) { * may instead use alternate {@link ErrorStrategy} implementations, such as * {@link ErrorStrategy#alwaysContinue()}, or a custom implementation, to allow * EventSource to continue after an error. - * + * * @param errorStrategy the object that will control error handling; if null, * defaults to {@link ErrorStrategy#alwaysThrow()} * @return the builder @@ -812,7 +829,7 @@ public Builder errorStrategy(ErrorStrategy errorStrategy) { this.errorStrategy = errorStrategy; return this; } - + /** * Sets the ID value of the last event received. *

@@ -820,7 +837,7 @@ public Builder errorStrategy(ErrorStrategy errorStrategy) { * skip past previously sent events if it supports this behavior. Once the connection is established, * this value will be updated whenever an event is received that has an ID. Whether event IDs are * supported depends on the server; it may ignore this value. - * + * * @param lastEventId the last event identifier * @return the builder * @since 2.0.0 @@ -841,7 +858,7 @@ public Builder lastEventId(String lastEventId) { * If you set the base delay to zero, the backoff logic will not apply-- multiplying by * zero gives zero every time. Therefore, use a zero delay with caution since it could * cause a reconnect storm during a service interruption. - * + * * @param retryDelay the base delay, in whatever time unit is specified by {@code timeUnit} * @param timeUnit the time unit, or {@code TimeUnit.MILLISECONDS} if null * @return the builder @@ -864,7 +881,7 @@ public Builder retryDelay(long retryDelay, TimeUnit timeUnit) { * is to apply an exponential backoff and jitter. You may instead use a modified * version of {@link DefaultRetryDelayStrategy} to customize the backoff and * jitter, or a custom implementation with any other logic. - * + * * @param retryDelayStrategy the object that will control retry delays; if null, * defaults to {@link RetryDelayStrategy#defaultStrategy()} * @return the builder @@ -886,7 +903,7 @@ public Builder retryDelayStrategy(RetryDelayStrategy retryDelayStrategy) { * connection lasted longer than the threshold, in which case the delay will start over at the * initial minimum value. This prevents long delays from occurring on connections that are only * rarely restarted. - * + * * @param retryDelayResetThreshold the minimum time that a connection must stay open to avoid resetting * the delay, in whatever time unit is specified by {@code timeUnit} * @param timeUnit the time unit, or {@code TimeUnit.MILLISECONDS} if null @@ -909,7 +926,7 @@ public Builder retryDelayResetThreshold(long retryDelayResetThreshold, TimeUnit * Therefore, if an application expects to see many lines in the stream that are longer * than {@link EventSource#DEFAULT_READ_BUFFER_SIZE}, it can specify a larger buffer size * to avoid unnecessary heap allocations. - * + * * @param readBufferSize the buffer size * @return the builder * @throws IllegalArgumentException if the size is less than or equal to zero @@ -923,7 +940,7 @@ public Builder readBufferSize(int readBufferSize) { this.readBufferSize = readBufferSize; return this; } - + /** * Specifies a custom logger to receive EventSource logging. *

@@ -934,14 +951,14 @@ public Builder readBufferSize(int readBufferSize) { * the basic console logging implementation, and to tag the output with the name "logname": *


      *   // import com.launchdarkly.logging.*;
-     *   
+     *
      *   builder.logger(
-     *      LDLogger.withAdapter(Logs.basic(), "logname") 
+     *      LDLogger.withAdapter(Logs.basic(), "logname")
      *   );
      * 
*

* If you do not provide a logger, the default is there is no log output. - * + * * @param logger an {@link LDLogger} implementation, or null for no logging * @return the builder * @since 2.7.0 @@ -975,16 +992,16 @@ public Builder logger(LDLogger logger) { * first and {@code event:} second, {@link MessageEvent#getEventName()} will not contain the value of * {@code event:} but will be {@link MessageEvent#DEFAULT_EVENT_NAME} instead; similarly, an {@code id:} field will * be ignored if it appears after {@code data:} in this mode. Therefore, you should only use this mode if the - * server's behavior is predictable in this regard. + * server's behavior is predictable in this regard. *

  • The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but * in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream * happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete * event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws * a {@link StreamClosedWithIncompleteMessageException}.
  • - * - * + * + * * @param streamEventData true if events should be dispatched immediately with asynchronous data rather than - * read fully before dispatch + * read fully before dispatch * @return the builder * @see #expectFields(String...) * @since 2.6.0 @@ -1017,7 +1034,7 @@ public Builder streamEventData(boolean streamEventData) { *

    * Such behavior is not automatic because in some applications, there might never be an {@code event:} field, * and EventSource has no way to anticipate this. - * + * * @param fieldNames a list of SSE field names (case-sensitive; any names other than "event" and "id" are ignored) * @return the builder * @see #streamEventData(boolean) @@ -1036,7 +1053,7 @@ public Builder expectFields(String... fieldNames) { } return this; } - + /** * Constructs an {@link EventSource} using the builder's current properties. * @return the new EventSource instance diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java index 0d535a4..975f787 100644 --- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java @@ -465,7 +465,8 @@ public Result connect(String lastEventId) throws StreamException { return new Result( responseBody.byteStream(), uri, - new RequestCloser(call) + new RequestCloser(call), + new ResponseCloser(response) ); } @@ -555,6 +556,19 @@ public void close() throws IOException { } } + private static class ResponseCloser implements Closeable { + private final Response response; + + public ResponseCloser(Response response) { + this.response = response; + } + + @Override + public void close() throws IOException { + this.response.close(); + } + } + private static X509TrustManager defaultTrustManager() throws GeneralSecurityException { TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java index c2a8e9e..30b283d 100644 --- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java +++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java @@ -319,7 +319,7 @@ public void clientCanForceStreamToClose() throws Exception { int n = stream.read(b, 0, 5); assertThat(n, equalTo(5)); - result.getCloser().close(); + result.getConnectionCloser().close(); // This causes us to call the OkHttp method Call.cancel(). The InputStream is // expected to throw an IOException on the next read, but it would also be // acceptable for it to return EOF (-1). @@ -371,7 +371,7 @@ private RequestInfo doRequestFrom( try (HttpServer server = HttpServer.start(Handlers.status(200))) { try (ConnectStrategy.Client client = hcs.uri(server.getUri()).createClient(testLogger.getLogger())) { ConnectStrategy.Client.Result result = client.connect(lastEventId); - result.getCloser().close(); + result.getConnectionCloser().close(); return server.getRecorder().requireRequest(); } From 6cc637fec0a674948fa594d0131c0773ef46f871 Mon Sep 17 00:00:00 2001 From: LinZong Date: Sun, 7 Jan 2024 01:58:16 +0800 Subject: [PATCH 2/6] chore: Add comment for ResponseCloser describes its meaning --- .../java/com/launchdarkly/eventsource/HttpConnectStrategy.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java index 975f787..d2a62e9 100644 --- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java @@ -466,6 +466,8 @@ public Result connect(String lastEventId) throws StreamException { responseBody.byteStream(), uri, new RequestCloser(call), + // prevent from connection leak warning from okhttp. + // see: okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount new ResponseCloser(response) ); } From d02e0edb7d396061cbb8b74b6566758b6bfcedef Mon Sep 17 00:00:00 2001 From: LinZong Date: Sun, 7 Jan 2024 02:17:30 +0800 Subject: [PATCH 3/6] test: Add test for responseCloser --- build.gradle.kts | 1 + ...ttpConnectStrategyWithEventSourceTest.java | 93 ++++++++++++++++--- .../launchdarkly/eventsource/TestUtils.java | 15 +-- 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 2bbe220..f4d6567 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -73,6 +73,7 @@ dependencies { testImplementation("com.google.guava:guava:32.0.1-jre") testImplementation("junit:junit:4.12") testImplementation("org.hamcrest:hamcrest-all:1.3") + testImplementation("org.easytesting:fest-reflect:1.4.1") } checkstyle { diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java index 3162f0c..e8b2939 100644 --- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java +++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java @@ -4,12 +4,18 @@ import com.launchdarkly.testhelpers.httptest.Handlers; import com.launchdarkly.testhelpers.httptest.HttpServer; +import org.fest.reflect.reference.TypeRef; import org.junit.Rule; import org.junit.Test; +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicReference; + import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThread; +import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThreadAfterDelay; +import static org.fest.reflect.core.Reflection.field; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.*; /** * Tests of basic EventSource behavior using real HTTP requests. @@ -26,14 +32,14 @@ public void eventSourceReadsEvents() throws Exception { Handlers.SSE.event("b", "data2"), Handlers.hang() ); - + try (HttpServer server = HttpServer.start(response)) { try (EventSource es = new EventSource.Builder(server.getUri()).build()) { es.start(); - + assertThat(es.readAnyEvent(), equalTo( new MessageEvent("a", "data1", null, es.getOrigin()))); - + assertThat(es.readAnyEvent(), equalTo( new MessageEvent("b", "data2", null, es.getOrigin()))); } @@ -59,13 +65,13 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { .retryDelay(1, null) .build()) { es.start(); - + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin()))); - + assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByServerException()))); - + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); - + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin()))); } } @@ -91,17 +97,80 @@ public void eventSourceReconnectsAfterExternallyInterrupted() throws Exception { .retryDelay(1, null) .build()) { es.start(); - + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin()))); - + interruptOnAnotherThread(es); assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByCallerException()))); - + + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); + + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin()))); + } + } + } + + @Test + public void eventSourceReconnectsAfterExternallyInterruptedWithNewEventParser() throws Exception { + Handler response1 = Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event("message", "first"), + Handlers.SSE.leaveOpen() + ); + Handler response2 = Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event("message", "second"), + Handlers.SSE.leaveOpen() + ); + Handler allResponses = Handlers.sequential(response1, response2); + + try (HttpServer server = HttpServer.start(allResponses)) { + AtomicReference holder = new AtomicReference<>(); + try (EventSource es = new EventSource.Builder(server.getUri()) + .errorStrategy(ErrorStrategy.alwaysContinue()) + .retryDelay(1, null) + .build()) { + es.start(); + holder.set(es); + AtomicReference connectionCloser = field("connectionCloser").ofType(new TypeRef>() { + }).in(es).get(); + AtomicReference responseCloser = field("responseCloser").ofType(new TypeRef>() { + }).in(es).get(); + + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "first", null, es.getOrigin()))); + + interruptOnAnotherThread(es).join(); + + assertThat(connectionCloser.get(), nullValue()); + // Response should be closed with reading thread. + assertThat(responseCloser.get(), notNullValue()); + + assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByCallerException()))); + + // All closed. + assertThat(connectionCloser.get(), nullValue()); + + assertThat(responseCloser.get(), nullValue()); + + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); - + // All recreated + assertThat(connectionCloser.get(), notNullValue()); + + assertThat(responseCloser.get(), notNullValue()); + assertThat(es.readAnyEvent(), equalTo(new MessageEvent("message", "second", null, es.getOrigin()))); } + AtomicReference connectionCloser = field("connectionCloser").ofType(new TypeRef>() { + }).in(holder.get()).get(); + AtomicReference responseCloser = field("responseCloser").ofType(new TypeRef>() { + }).in(holder.get()).get(); + + // All closed by try-with-resource statement + assertThat(connectionCloser.get(), nullValue()); + + assertThat(responseCloser.get(), nullValue()); } } } diff --git a/src/test/java/com/launchdarkly/eventsource/TestUtils.java b/src/test/java/com/launchdarkly/eventsource/TestUtils.java index 3103c81..165ce45 100644 --- a/src/test/java/com/launchdarkly/eventsource/TestUtils.java +++ b/src/test/java/com/launchdarkly/eventsource/TestUtils.java @@ -13,21 +13,24 @@ public static String makeStringOfLength(int n) { return sb.toString(); } - public static void interruptOnAnotherThreadAfterDelay(EventSource es, long delayMillis) { - new Thread(new Runnable() { + public static Thread interruptOnAnotherThreadAfterDelay(EventSource es, long delayMillis) { + Thread t = new Thread(new Runnable() { public void run() { try { if (delayMillis > 0) { Thread.sleep(delayMillis); } - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } es.interrupt(); } - }).start(); + }); + t.start(); + return t; } - public static void interruptOnAnotherThread(EventSource es) { - interruptOnAnotherThreadAfterDelay(es, 0); + public static Thread interruptOnAnotherThread(EventSource es) { + return interruptOnAnotherThreadAfterDelay(es, 0); } public static void interruptThisThreadFromAnotherThreadAfterDelay(long delayMillis) { From 5b3a022422810c7c1cb28deda9ba44fb702283e4 Mon Sep 17 00:00:00 2001 From: LinZong Date: Sun, 7 Jan 2024 09:50:42 +0800 Subject: [PATCH 4/6] chore: Revert unnecessary indent changes --- .../launchdarkly/eventsource/EventSource.java | 120 +++++++++--------- .../eventsource/HttpConnectStrategy.java | 8 +- 2 files changed, 63 insertions(+), 65 deletions(-) diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index 6b94edb..5295d03 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -32,7 +32,7 @@ * The client uses a pull model where the caller starts the EventSource and then requests * data from it synchronously on a single thread. The initial connection attempt is made * when you call {@link #start()}, or when you first attempt to read an event. - * + * * To read events from the stream, you can either request them one at a time by calling * {@link #readMessage()} or {@link #readAnyEvent()}, or consume them in a loop by calling * {@link #messages()} or {@link #anyEvents()}. The "message" methods assume you are only @@ -70,11 +70,11 @@ public class EventSource implements Closeable { * The default value for {@link Builder#readBufferSize(int)}. */ public static final int DEFAULT_READ_BUFFER_SIZE = 1000; - + // Note that some fields have package-private visibility for tests. - + private final Object sleepNotifier = new Object(); - + // The following final fields are set from the configuration builder. private final ConnectStrategy.Client client; final int readBufferSize; @@ -83,7 +83,7 @@ public class EventSource implements Closeable { final long retryDelayResetThresholdMillis; final boolean streamEventData; final Set expectFields; - + // The following mutable fields are not volatile because they should only be // accessed from the thread that is reading from EventSource. private EventParser eventParser; @@ -105,7 +105,7 @@ public class EventSource implements Closeable { // and are read by the thread that is reading the stream. private volatile boolean deliberatelyClosedConnection; private volatile boolean calledStop; - + // These fields are written by the thread that is reading the stream, and can // be read by other threads to inspect the state of the stream. volatile long baseRetryDelayMillis; // set at config time but may be changed by a "retry:" value @@ -140,7 +140,7 @@ public class EventSource implements Closeable { public URI getOrigin() { return origin; } - + /** * Returns the logger that this EventSource is using. * @@ -167,7 +167,7 @@ public ReadyState getState() { * This can be set initially with {@link Builder#lastEventId(String)}, and is updated whenever an event * is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this * value. - * + * * @return the last known event ID, or null * @see Builder#lastEventId(String) * @since 2.0.0 @@ -193,7 +193,7 @@ public String getLastEventId() { public long getBaseRetryDelayMillis() { return baseRetryDelayMillis; } - + /** * Returns the retry delay that will be used for the next reconnection, if the * stream has failed. @@ -213,7 +213,7 @@ public long getBaseRetryDelayMillis() { public long getNextRetryDelayMillis() { return nextReconnectDelayMillis; } - + /** * Attempts to start the stream if it is not already active. *

    @@ -245,7 +245,7 @@ public long getNextRetryDelayMillis() { public void start() throws StreamException { tryStart(false); } - + private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException { if (eventParser != null) { return null; @@ -254,7 +254,7 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException while (true) { StreamException exception = null; - + if (nextReconnectDelayMillis > 0) { long delayNow = disconnectedTime == 0 ? nextReconnectDelayMillis : (nextReconnectDelayMillis - (System.currentTimeMillis() - disconnectedTime)); @@ -280,22 +280,22 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException } } } - + ConnectStrategy.Client.Result clientResult = null; - + if (exception == null) { readyState.set(ReadyState.CONNECTING); - + connectedTime = 0; deliberatelyClosedConnection = calledStop = false; - + try { clientResult = client.connect(lastEventId); } catch (StreamException e) { exception = e; } } - + if (exception != null) { disconnectedTime = System.currentTimeMillis(); computeReconnectDelay(); @@ -309,18 +309,16 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException // transparently. continue; } - // The ErrorStrategy told us to THROW rather than CONTINUE. + // The ErrorStrategy told us to THROW rather than CONTINUE. throw exception; } - - - connectionCloser.set(clientResult.getConnectionCloser()); - responseCloser.set(clientResult.getResponseCloser()); - + + + connectionCloser.set(clientResult.getCloser()); origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin(); connectedTime = System.currentTimeMillis(); logger.debug("Connected to SSE stream"); - + eventParser = new EventParser( clientResult.getInputStream(), clientResult.getOrigin(), @@ -329,14 +327,14 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException expectFields, logger ); - + readyState.set(ReadyState.OPEN); currentErrorStrategy = baseErrorStrategy; return null; } } - + /** * Attempts to receive a message from the stream. *

    @@ -350,7 +348,7 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException *

    * This method must be called from the same thread that first started using the * stream (that is, the thread that called {@link #start()} or read the first event). - * + * * @return an SSE message * @throws StreamException if there is an error and retry is not enabled * @see #readAnyEvent() @@ -365,7 +363,7 @@ public MessageEvent readMessage() throws StreamException { } } } - + /** * Attempts to receive an event of any kind from the stream. *

    @@ -420,7 +418,7 @@ public Iterator iterator() { } }; } - + /** * Returns an iterable sequence of events. *

    @@ -531,7 +529,7 @@ public void close() { if (currentState == SHUTDOWN) { return; } - + closeCurrentStream(true, true); try { client.close(); @@ -555,12 +553,12 @@ public boolean awaitClosed(long timeout, TimeUnit timeUnit) throws InterruptedEx // Iterator implementation used by messages() and anyEvents() private class IteratorImpl implements Iterator { private final Class filterClass; - + IteratorImpl(Class filterClass) { this.filterClass = filterClass; calledStop = false; } - + public boolean hasNext() { while (true) { if (nextEvent != null && filterClass.isAssignableFrom(nextEvent.getClass())) { @@ -577,7 +575,7 @@ public boolean hasNext() { } } } - + public T next() { while (nextEvent == null || !filterClass.isAssignableFrom(nextEvent.getClass()) && hasNext()) {} @SuppressWarnings("unchecked") @@ -589,7 +587,7 @@ public T next() { private StreamEvent requireEvent() throws StreamException { readingThread.set(Thread.currentThread()); - + try { while (true) { // Reading an event implies starting the stream if it isn't already started. @@ -632,7 +630,7 @@ private StreamEvent requireEvent() throws StreamException { throw e; } } - + private void resetRetryDelayStrategy() { logger.debug("Resetting retry delay strategy to initial state"); currentRetryDelayStrategy = baseRetryDelayStrategy; @@ -645,7 +643,7 @@ private ErrorStrategy.Action applyErrorStrategy(StreamException e) { } return errorStrategyResult.getAction(); } - + private void computeReconnectDelay() { if (retryDelayResetThresholdMillis > 0 && connectedTime != 0) { long connectionDurationMillis = System.currentTimeMillis() - connectedTime; @@ -663,12 +661,11 @@ private void computeReconnectDelay() { private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shouldStopIterating) { Closeable oldConnectionCloser = this.connectionCloser.getAndSet(null); - Thread oldReadingThread = readingThread.getAndSet(null); if (oldConnectionCloser == null && oldReadingThread == null) { return false; } - + synchronized (sleepNotifier) { // this synchronization prevents a race condition in start() if (deliberatelyInterrupted) { this.deliberatelyClosedConnection = true; @@ -699,17 +696,18 @@ private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shou logger.warn("Unexpected error when closing response: {}", LogValues.exceptionSummary(e)); } } + eventParser = null; readyState.compareAndSet(ReadyState.OPEN, ReadyState.CLOSED); readyState.compareAndSet(ReadyState.CONNECTING, ReadyState.CLOSED); // If the current thread is not the reading thread, these fields will be updated the // next time the reading thread tries to do a read. } - + sleepNotifier.notify(); // in case we're sleeping in a reconnect delay, wake us up } return true; } - + /** * Builder for configuring {@link EventSource}. */ @@ -742,7 +740,7 @@ public static final class Builder { *

    * Or, if you want to consume an input stream from some other source, you can * create your own subclass of {@link ConnectStrategy}. - * + * * @param connectStrategy the object that will manage the input stream; * must not be null * @since 4.0.0 @@ -796,11 +794,11 @@ public Builder(URL url) { *

    * This is the same as {@link #Builder(URI)}, but using the OkHttp type * {@link HttpUrl}. - * + * * @param url the stream URL * @throws IllegalArgumentException if the argument is null, or if the endpoint * is not HTTP or HTTPS - * + * * @since 1.9.0 * @see #Builder(ConnectStrategy) * @see #Builder(URI) @@ -809,7 +807,7 @@ public Builder(URL url) { public Builder(HttpUrl url) { this(ConnectStrategy.http(url)); } - + /** * Specifies a strategy for determining whether to handle errors transparently * or throw them as exceptions. @@ -819,7 +817,7 @@ public Builder(HttpUrl url) { * may instead use alternate {@link ErrorStrategy} implementations, such as * {@link ErrorStrategy#alwaysContinue()}, or a custom implementation, to allow * EventSource to continue after an error. - * + * * @param errorStrategy the object that will control error handling; if null, * defaults to {@link ErrorStrategy#alwaysThrow()} * @return the builder @@ -829,7 +827,7 @@ public Builder errorStrategy(ErrorStrategy errorStrategy) { this.errorStrategy = errorStrategy; return this; } - + /** * Sets the ID value of the last event received. *

    @@ -837,7 +835,7 @@ public Builder errorStrategy(ErrorStrategy errorStrategy) { * skip past previously sent events if it supports this behavior. Once the connection is established, * this value will be updated whenever an event is received that has an ID. Whether event IDs are * supported depends on the server; it may ignore this value. - * + * * @param lastEventId the last event identifier * @return the builder * @since 2.0.0 @@ -858,7 +856,7 @@ public Builder lastEventId(String lastEventId) { * If you set the base delay to zero, the backoff logic will not apply-- multiplying by * zero gives zero every time. Therefore, use a zero delay with caution since it could * cause a reconnect storm during a service interruption. - * + * * @param retryDelay the base delay, in whatever time unit is specified by {@code timeUnit} * @param timeUnit the time unit, or {@code TimeUnit.MILLISECONDS} if null * @return the builder @@ -881,7 +879,7 @@ public Builder retryDelay(long retryDelay, TimeUnit timeUnit) { * is to apply an exponential backoff and jitter. You may instead use a modified * version of {@link DefaultRetryDelayStrategy} to customize the backoff and * jitter, or a custom implementation with any other logic. - * + * * @param retryDelayStrategy the object that will control retry delays; if null, * defaults to {@link RetryDelayStrategy#defaultStrategy()} * @return the builder @@ -903,7 +901,7 @@ public Builder retryDelayStrategy(RetryDelayStrategy retryDelayStrategy) { * connection lasted longer than the threshold, in which case the delay will start over at the * initial minimum value. This prevents long delays from occurring on connections that are only * rarely restarted. - * + * * @param retryDelayResetThreshold the minimum time that a connection must stay open to avoid resetting * the delay, in whatever time unit is specified by {@code timeUnit} * @param timeUnit the time unit, or {@code TimeUnit.MILLISECONDS} if null @@ -926,7 +924,7 @@ public Builder retryDelayResetThreshold(long retryDelayResetThreshold, TimeUnit * Therefore, if an application expects to see many lines in the stream that are longer * than {@link EventSource#DEFAULT_READ_BUFFER_SIZE}, it can specify a larger buffer size * to avoid unnecessary heap allocations. - * + * * @param readBufferSize the buffer size * @return the builder * @throws IllegalArgumentException if the size is less than or equal to zero @@ -940,7 +938,7 @@ public Builder readBufferSize(int readBufferSize) { this.readBufferSize = readBufferSize; return this; } - + /** * Specifies a custom logger to receive EventSource logging. *

    @@ -951,14 +949,14 @@ public Builder readBufferSize(int readBufferSize) { * the basic console logging implementation, and to tag the output with the name "logname": *

    
          *   // import com.launchdarkly.logging.*;
    -     *
    +     *   
          *   builder.logger(
    -     *      LDLogger.withAdapter(Logs.basic(), "logname")
    +     *      LDLogger.withAdapter(Logs.basic(), "logname") 
          *   );
          * 
    *

    * If you do not provide a logger, the default is there is no log output. - * + * * @param logger an {@link LDLogger} implementation, or null for no logging * @return the builder * @since 2.7.0 @@ -992,16 +990,16 @@ public Builder logger(LDLogger logger) { * first and {@code event:} second, {@link MessageEvent#getEventName()} will not contain the value of * {@code event:} but will be {@link MessageEvent#DEFAULT_EVENT_NAME} instead; similarly, an {@code id:} field will * be ignored if it appears after {@code data:} in this mode. Therefore, you should only use this mode if the - * server's behavior is predictable in this regard. + * server's behavior is predictable in this regard. *

  • The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but * in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream * happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete * event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws * a {@link StreamClosedWithIncompleteMessageException}.
  • - * - * + * + * * @param streamEventData true if events should be dispatched immediately with asynchronous data rather than - * read fully before dispatch + * read fully before dispatch * @return the builder * @see #expectFields(String...) * @since 2.6.0 @@ -1034,7 +1032,7 @@ public Builder streamEventData(boolean streamEventData) { *

    * Such behavior is not automatic because in some applications, there might never be an {@code event:} field, * and EventSource has no way to anticipate this. - * + * * @param fieldNames a list of SSE field names (case-sensitive; any names other than "event" and "id" are ignored) * @return the builder * @see #streamEventData(boolean) @@ -1053,7 +1051,7 @@ public Builder expectFields(String... fieldNames) { } return this; } - + /** * Constructs an {@link EventSource} using the builder's current properties. * @return the new EventSource instance diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java index d2a62e9..520525f 100644 --- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java @@ -466,10 +466,10 @@ public Result connect(String lastEventId) throws StreamException { responseBody.byteStream(), uri, new RequestCloser(call), - // prevent from connection leak warning from okhttp. - // see: okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount - new ResponseCloser(response) - ); + // prevent from connection leak warning from okhttp. + // see: okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount + new ResponseCloser(response) + ); } public void close() { From 448640b39b39a71c089c8430a27dccbfcb427917 Mon Sep 17 00:00:00 2001 From: LinZong Date: Sun, 7 Jan 2024 09:55:14 +0800 Subject: [PATCH 5/6] chore: Revert unnecessary indent changes --- build.gradle.kts | 1 + .../com/launchdarkly/eventsource/EventSource.java | 14 +++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index f4d6567..6dca13b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -73,6 +73,7 @@ dependencies { testImplementation("com.google.guava:guava:32.0.1-jre") testImplementation("junit:junit:4.12") testImplementation("org.hamcrest:hamcrest-all:1.3") + // Inspect private fields in tests. testImplementation("org.easytesting:fest-reflect:1.4.1") } diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index 5295d03..2690de9 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -309,12 +309,13 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException // transparently. continue; } - // The ErrorStrategy told us to THROW rather than CONTINUE. + // The ErrorStrategy told us to THROW rather than CONTINUE. throw exception; } - connectionCloser.set(clientResult.getCloser()); + connectionCloser.set(clientResult.getConnectionCloser()); + responseCloser.set(clientResult.getResponseCloser()); origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin(); connectedTime = System.currentTimeMillis(); logger.debug("Connected to SSE stream"); @@ -696,7 +697,6 @@ private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shou logger.warn("Unexpected error when closing response: {}", LogValues.exceptionSummary(e)); } } - eventParser = null; readyState.compareAndSet(ReadyState.OPEN, ReadyState.CLOSED); readyState.compareAndSet(ReadyState.CONNECTING, ReadyState.CLOSED); // If the current thread is not the reading thread, these fields will be updated the @@ -951,7 +951,7 @@ public Builder readBufferSize(int readBufferSize) { * // import com.launchdarkly.logging.*; * * builder.logger( - * LDLogger.withAdapter(Logs.basic(), "logname") + * LDLogger.withAdapter(Logs.basic(), "logname") * ); * *

    @@ -990,16 +990,16 @@ public Builder logger(LDLogger logger) { * first and {@code event:} second, {@link MessageEvent#getEventName()} will not contain the value of * {@code event:} but will be {@link MessageEvent#DEFAULT_EVENT_NAME} instead; similarly, an {@code id:} field will * be ignored if it appears after {@code data:} in this mode. Therefore, you should only use this mode if the - * server's behavior is predictable in this regard. + * server's behavior is predictable in this regard. *

  • The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but * in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream * happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete * event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws * a {@link StreamClosedWithIncompleteMessageException}.
  • - * + * * * @param streamEventData true if events should be dispatched immediately with asynchronous data rather than - * read fully before dispatch + * read fully before dispatch * @return the builder * @see #expectFields(String...) * @since 2.6.0 From c94682ca593eb24cd5ebdc231e78b02ea48afc41 Mon Sep 17 00:00:00 2001 From: LinZong Date: Sun, 7 Jan 2024 10:11:28 +0800 Subject: [PATCH 6/6] test: Add test for exception occurred while closing response --- .../EventSourceConnectStrategyUsageTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java index dacba4d..cf1261b 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceConnectStrategyUsageTest.java @@ -345,6 +345,41 @@ public void close() throws IOException { assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true)); } + @Test + public void exceptionFromCloserDoesNotPreventClosingResponse() throws Exception { + Semaphore calledClose = new Semaphore(0); + ConnectStrategy cs = new ConnectStrategy() { + @Override + public Client createClient(LDLogger logger) { + return new ClientBaseImpl() { + @Override + public Result connect(String lastEventId) throws StreamException { + return new Result(makeEmptyInputStream(), null, new Closeable() { + @Override + public void close() throws IOException { + + } + }, new Closeable() { + @Override + public void close() throws IOException { + calledClose.release(); + throw new IOException("fake error"); + } + }); + } + }; + } + }; + + try (EventSource es = new EventSource.Builder(cs).build()) { + es.start(); + + assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(false)); + } + assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true)); + } + + @Test public void exceptionFromCloserDoesNotPreventClosingStream() throws Exception { Semaphore calledClose = new Semaphore(0);