Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connection leaks after closing non-exhaustive EventSource. #91

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ dependencies {
testImplementation("com.google.guava:guava:32.0.1-jre")
testImplementation("junit:junit:4.12")
testImplementation("org.hamcrest:hamcrest-all:1.3")
// Inspect private fields in tests.
testImplementation("org.easytesting:fest-reflect:1.4.1")
}

checkstyle {
Expand Down
49 changes: 39 additions & 10 deletions src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.launchdarkly.eventsource;

import com.launchdarkly.logging.LDLogger;
import okhttp3.HttpUrl;

import java.io.Closeable;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

import okhttp3.HttpUrl;

/**
* An abstraction of how {@link EventSource} should obtain an input stream.
* <p>
Expand Down Expand Up @@ -64,21 +63,39 @@ public static abstract class Client implements Closeable {
public static class Result {
private final InputStream inputStream;
private final URI origin;
private final Closeable closer;

private final Closeable connectionCloser;
private final Closeable responseCloser;

/**
* Creates an instance.
*
* @param inputStream see {@link #getInputStream()}
* @param origin see {@link #getOrigin()}
* @param closer see {@link #getCloser()}
* @param connectionCloser see {@link #getConnectionCloser()}
*/
public Result(InputStream inputStream, URI origin, Closeable closer) {
public Result(InputStream inputStream, URI origin, Closeable connectionCloser) {
this.inputStream = inputStream;
this.origin = origin;
this.closer = closer;
this.connectionCloser = connectionCloser;
this.responseCloser = null;
}


/**
* Creates an instance.
*
* @param inputStream see {@link #getInputStream()}
* @param origin see {@link #getOrigin()}
* @param connectionCloser see {@link #getConnectionCloser()}
* @param responseCloser see {@link #getResponseCloser()}
*/
public Result(InputStream inputStream, URI origin, Closeable connectionCloser, Closeable responseCloser) {
this.inputStream = inputStream;
this.origin = origin;
this.connectionCloser = connectionCloser;
this.responseCloser = responseCloser;
}


/**
* The input stream that {@link EventSource} should read from.
*
Expand Down Expand Up @@ -115,8 +132,20 @@ public URI getOrigin() {
*
* @return a Closeable object or null
*/
public Closeable getCloser() {
return closer;
public Closeable getConnectionCloser() {
return connectionCloser;
}

/**
* An object that {@link EventSource} can use to close the streaming response.
* If this is not null, its {@link Closeable#close()} method will be
* called whenever the current streaming response is stopped either due to an
* error or because the caller explicitly closed the stream.
*
* @return a Closeable object or null
*/
public Closeable getResponseCloser() {
return responseCloser;
}
}

Expand Down
27 changes: 21 additions & 6 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class EventSource implements Closeable {
// be modified from other threads if they call stop() or interrupt(). We
// use AtomicReference because we need atomicity in updates.
private final AtomicReference<Closeable> connectionCloser = new AtomicReference<>();
private final AtomicReference<Closeable> responseCloser = new AtomicReference<>();
private final AtomicReference<Thread> readingThread = new AtomicReference<>();
private final AtomicReference<ReadyState> readyState;

Expand Down Expand Up @@ -308,12 +309,13 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException
// transparently.
continue;
}
// The ErrorStrategy told us to THROW rather than CONTINUE.
// The ErrorStrategy told us to THROW rather than CONTINUE.
throw exception;
}


connectionCloser.set(clientResult.getCloser());
connectionCloser.set(clientResult.getConnectionCloser());
responseCloser.set(clientResult.getResponseCloser());
origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin();
connectedTime = System.currentTimeMillis();
logger.debug("Connected to SSE stream");
Expand Down Expand Up @@ -681,7 +683,20 @@ private boolean closeCurrentStream(boolean deliberatelyInterrupted, boolean shou
}
}
if (oldReadingThread == Thread.currentThread()) {
Closeable oldResponseCloser = this.responseCloser.getAndSet(null);
eventParser = null;
// Response can only be closed from reading thread. Otherwise, it will cause
// java.lang.IllegalStateException: Unbalanced enter/exit raised from okhttp
// since closing response will drain remaining chunks if exists, resulting in concurrent buffer source reading.
// which may conflict with reading thread.
if (oldResponseCloser != null) {
try {
oldResponseCloser.close();
logger.debug("Closed response");
} catch (IOException e) {
logger.warn("Unexpected error when closing response: {}", LogValues.exceptionSummary(e));
}
}
readyState.compareAndSet(ReadyState.OPEN, ReadyState.CLOSED);
readyState.compareAndSet(ReadyState.CONNECTING, ReadyState.CLOSED);
// If the current thread is not the reading thread, these fields will be updated the
Expand Down Expand Up @@ -936,7 +951,7 @@ public Builder readBufferSize(int readBufferSize) {
* // import com.launchdarkly.logging.*;
*
* builder.logger(
* LDLogger.withAdapter(Logs.basic(), "logname")
* LDLogger.withAdapter(Logs.basic(), "logname")
* );
* </code></pre>
* <p>
Expand Down Expand Up @@ -975,16 +990,16 @@ public Builder logger(LDLogger logger) {
* first and {@code event:} second, {@link MessageEvent#getEventName()} will <i>not</i> contain the value of
* {@code event:} but will be {@link MessageEvent#DEFAULT_EVENT_NAME} instead; similarly, an {@code id:} field will
* be ignored if it appears after {@code data:} in this mode. Therefore, you should only use this mode if the
* server's behavior is predictable in this regard.</li>
* server's behavior is predictable in this regard.</li>
* <li> The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but
* in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream
* happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete
* event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws
* a {@link StreamClosedWithIncompleteMessageException}.</li>
* </ul>
* </ul>
*
* @param streamEventData true if events should be dispatched immediately with asynchronous data rather than
* read fully before dispatch
* read fully before dispatch
* @return the builder
* @see #expectFields(String...)
* @since 2.6.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,11 @@ public Result connect(String lastEventId) throws StreamException {
return new Result(
responseBody.byteStream(),
uri,
new RequestCloser(call)
);
new RequestCloser(call),
// prevent from connection leak warning from okhttp.
// see: okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount
new ResponseCloser(response)
);
}

public void close() {
Expand Down Expand Up @@ -555,6 +558,19 @@ public void close() throws IOException {
}
}

private static class ResponseCloser implements Closeable {
private final Response response;

public ResponseCloser(Response response) {
this.response = response;
}

@Override
public void close() throws IOException {
this.response.close();
}
}

private static X509TrustManager defaultTrustManager() throws GeneralSecurityException {
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,41 @@ public void close() throws IOException {
assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true));
}

@Test
public void exceptionFromCloserDoesNotPreventClosingResponse() throws Exception {
Semaphore calledClose = new Semaphore(0);
ConnectStrategy cs = new ConnectStrategy() {
@Override
public Client createClient(LDLogger logger) {
return new ClientBaseImpl() {
@Override
public Result connect(String lastEventId) throws StreamException {
return new Result(makeEmptyInputStream(), null, new Closeable() {
@Override
public void close() throws IOException {

}
}, new Closeable() {
@Override
public void close() throws IOException {
calledClose.release();
throw new IOException("fake error");
}
});
}
};
}
};

try (EventSource es = new EventSource.Builder(cs).build()) {
es.start();

assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(false));
}
assertThat(calledClose.tryAcquire(1, TimeUnit.MILLISECONDS), is(true));
}


@Test
public void exceptionFromCloserDoesNotPreventClosingStream() throws Exception {
Semaphore calledClose = new Semaphore(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public void clientCanForceStreamToClose() throws Exception {
int n = stream.read(b, 0, 5);
assertThat(n, equalTo(5));

result.getCloser().close();
result.getConnectionCloser().close();
// This causes us to call the OkHttp method Call.cancel(). The InputStream is
// expected to throw an IOException on the next read, but it would also be
// acceptable for it to return EOF (-1).
Expand Down Expand Up @@ -371,7 +371,7 @@ private RequestInfo doRequestFrom(
try (HttpServer server = HttpServer.start(Handlers.status(200))) {
try (ConnectStrategy.Client client = hcs.uri(server.getUri()).createClient(testLogger.getLogger())) {
ConnectStrategy.Client.Result result = client.connect(lastEventId);
result.getCloser().close();
result.getConnectionCloser().close();

return server.getRecorder().requireRequest();
}
Expand Down
Loading