Skip to content

Commit

Permalink
Switch to Supplier<InputStream>
Browse files Browse the repository at this point in the history
  • Loading branch information
fmeum committed Jan 10, 2025
1 parent 28935f1 commit 14916d4
Show file tree
Hide file tree
Showing 17 changed files with 63 additions and 75 deletions.
20 changes: 6 additions & 14 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream;
import com.google.devtools.build.lib.vfs.Path;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -294,27 +293,20 @@ public static class Builder {
protected ChunkDataSupplier inputStream;

@CanIgnoreReturnValue
public Builder setInput(byte[] data) {
checkState(inputStream == null);
size = data.length;
setInputSupplier(() -> new ByteArrayInputStream(data));
return this;
}

@CanIgnoreReturnValue
public Builder setInput(long size, InputStream in) {
public Builder setInput(long size, ChunkDataSupplier in) {
checkState(inputStream == null);
checkNotNull(in);
this.size = size;
inputStream = () -> in;
inputStream = in;
return this;
}

@CanIgnoreReturnValue
public Builder setInput(long size, Path file) {
@VisibleForTesting
public Builder setInput(byte[] data) {
checkState(inputStream == null);
this.size = size;
inputStream = file::getInputStream;
size = data.length;
setInputSupplier(() -> new ByteArrayInputStream(data));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.remote.zstd.ZstdDecompressingOutputStream;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Status;
Expand All @@ -68,6 +67,7 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -483,26 +483,14 @@ private void releaseOut() {
return future;
}

@Override
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path path) {
return uploadChunker(
context,
digest,
Chunker.builder()
.setInput(digest.getSizeBytes(), path)
.setCompressed(shouldCompress(digest))
.build());
}

@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
RemoteActionExecutionContext context, Digest digest, Supplier<InputStream> data) {
return uploadChunker(
context,
digest,
Chunker.builder()
.setInput(data.toByteArray())
.setInput(digest.getSizeBytes(), data::get)
.setCompressed(shouldCompress(digest))
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ private ListenableFuture<Void> uploadBlob(
case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) ->
// TODO: Avoid materializing the entire file in memory. This requires changing the
// upload to be driven by an OutputStream rather than consuming an InputStream.
remoteCacheClient.uploadBlob(context, digest, virtualActionInput.getBytes());
remoteCacheClient.uploadBlob(
context, digest, () -> virtualActionInput.getBytes().newInput());
case ContentSource.PathSource(Path path) -> {
try {
if (remotePathChecker.isRemote(context, path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
import java.util.function.Supplier;

/**
* An interface for a remote caching protocol.
Expand Down Expand Up @@ -112,6 +113,18 @@ ListenableFuture<Void> uploadActionResult(
ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out);

/**
* Uploads a blob to the CAS.
*
* @param context the context for the action.
* @param digest The digest of the blob.
* @param data A supplier for the data to upload. Will be called at most once and as close as as
* possible in time to the actual upload.
* @return A future representing pending completion of the upload.
*/
ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, Supplier<InputStream> data);

/**
* Uploads a {@code file} to the CAS.
*
Expand All @@ -120,7 +133,10 @@ ListenableFuture<Void> downloadBlob(
* @param file The file to upload.
* @return A future representing pending completion of the upload.
*/
ListenableFuture<Void> uploadFile(RemoteActionExecutionContext context, Digest digest, Path file);
default ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
return uploadBlob(context, digest, () -> new LazyFileInputStream(file));
}

/**
* Uploads a BLOB to the CAS.
Expand All @@ -130,8 +146,10 @@ ListenableFuture<Void> downloadBlob(
* @param data The BLOB to upload.
* @return A future representing pending completion of the upload.
*/
ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data);
default ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
return uploadBlob(context, digest, data::newInput);
}

/** Close resources associated with the remote cache. */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.google.devtools.build.lib.remote.util.DigestOutputStream;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -93,6 +92,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -718,25 +718,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture<V
});
}

@Override
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
return retrier.executeAsync(
() ->
uploadAsync(
digest.getHash(),
digest.getSizeBytes(),
new LazyFileInputStream(file),
/* casUpload= */ true));
}

@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
RemoteActionExecutionContext context, Digest digest, Supplier<InputStream> in) {
return retrier.executeAsync(
() ->
uploadAsync(
digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true));
uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true));
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/runtime/commands",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
"//src/main/java/com/google/devtools/build/lib/testing/vfs:spied_filesystem",
Expand Down Expand Up @@ -242,6 +243,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/dynamic",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/build/lib/util:os",
"//src/main/java/com/google/devtools/build/lib/vfs",
Expand All @@ -268,6 +270,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/remote:store",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,8 @@ public void earlyWriteResponseShouldCompleteUpload() throws Exception {
// provide only enough data to write a single chunk
InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE);

Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build();
Chunker chunker =
Chunker.builder().setInput(blob.length, () -> in).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);

serviceRegistry.addService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void close() throws IOException {
super.close();
}
};
Chunker chunker = Chunker.builder().setInput(0, inp).build();
Chunker chunker = Chunker.builder().setInput(0, () -> inp).build();

assertThat(chunker.hasNext()).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.google.devtools.common.options.Options;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
Expand All @@ -88,6 +89,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -397,7 +399,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), any());
.uploadBlob(any(), any(), (Supplier<InputStream>) any());
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
Expand Down Expand Up @@ -472,7 +474,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), any());
.uploadBlob(any(), any(), (Supplier<InputStream>) any());
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
Expand Down Expand Up @@ -553,7 +555,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), any());
.uploadBlob(any(), any(), (Supplier<InputStream>) any());
doAnswer(
invocationOnMock -> {
Path file = invocationOnMock.getArgument(2, Path.class);
Expand Down Expand Up @@ -652,7 +654,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed")))
.when(cacheProtocol)
.uploadBlob(any(), any(), any());
.uploadBlob(any(), any(), (Supplier<InputStream>) any());
doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed")))
.when(cacheProtocol)
.uploadFile(any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import com.google.testing.junit.testparameterinjector.TestParameter;
import com.google.testing.junit.testparameterinjector.TestParameterInjector;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
Expand All @@ -140,6 +141,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -2162,7 +2164,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep
return future;
})
.when(cache.remoteCacheClient)
.uploadBlob(any(), any(), any());
.uploadBlob(any(), any(), (Supplier<InputStream>) any());
ActionInput input = ActionInputHelper.fromPath("inputs/foo");
fakeFileCache.createScratchInput(input, "input-foo");
RemoteExecutionService service = newRemoteExecutionService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs/bazel",
"//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/downloader",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//src/test/java/com/google/devtools/build/lib/remote/util",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/http",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//src/test/java/com/google/devtools/build/lib:test_runner",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ java_test(
deps = [
"//src/main/java/com/google/devtools/build/lib/remote/logging",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/util/io",
"//src/main/protobuf:remote_execution_log_java_proto",
"//src/test/java/com/google/devtools/build/lib:test_runner",
Expand Down
Loading

0 comments on commit 14916d4

Please sign in to comment.