Skip to content

Commit

Permalink
Close correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
fmeum committed Jan 11, 2025
1 parent 33b8cfa commit 3adf3ed
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
Expand Down Expand Up @@ -141,7 +142,6 @@ public void reset() throws IOException {
close();
offset = 0;
initialized = false;
dataSupplier.close();
}

/**
Expand Down Expand Up @@ -184,6 +184,10 @@ private void close() throws IOException {
chunkCache = null;
}

public void closeQuietly() {
dataSupplier.close();
}

/** Attempts reading at most a full chunk and stores it in the chunkCache buffer */
private int read() throws IOException {
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ ListenableFuture<Void> uploadChunker(
() -> {
try {
chunker.reset();
chunker.closeQuietly();
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ private ListenableFuture<Void> uploadAsync(
public void close() {
// Ensure that the InputStream can't be closed somewhere in the Netty
// pipeline, so that we can support retries. The InputStream is closed in
// the finally block below.
// the listener block below.
}
};
UploadCommand upload = new UploadCommand(uri, casUpload, key, wrappedIn, length);
Expand Down Expand Up @@ -720,8 +720,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture<V
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier in) {
return retrier.executeAsync(
() ->
uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true));
() -> {
var result =
uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true);
result.addListener(in::close, MoreExecutors.directExecutor());
return result;
});
}

@Override
Expand Down

0 comments on commit 3adf3ed

Please sign in to comment.