diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index edc23b8bce1342..5bbbac882a7cb7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; -import com.google.devtools.build.lib.vfs.Path; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; @@ -294,27 +293,20 @@ public static class Builder { protected ChunkDataSupplier inputStream; @CanIgnoreReturnValue - public Builder setInput(byte[] data) { - checkState(inputStream == null); - size = data.length; - setInputSupplier(() -> new ByteArrayInputStream(data)); - return this; - } - - @CanIgnoreReturnValue - public Builder setInput(long size, InputStream in) { + public Builder setInput(long size, ChunkDataSupplier in) { checkState(inputStream == null); checkNotNull(in); this.size = size; - inputStream = () -> in; + inputStream = in; return this; } @CanIgnoreReturnValue - public Builder setInput(long size, Path file) { + @VisibleForTesting + public Builder setInput(byte[] data) { checkState(inputStream == null); - this.size = size; - inputStream = file::getInputStream; + size = data.length; + setInputSupplier(() -> new ByteArrayInputStream(data)); return this; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index b8f8c111ed453e..362fa23e7c60e0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -59,7 +59,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.remote.zstd.ZstdDecompressingOutputStream; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.grpc.Channel; import io.grpc.Status; @@ -68,6 +67,7 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -483,26 +483,14 @@ private void releaseOut() { return future; } - @Override - public ListenableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path path) { - return uploadChunker( - context, - digest, - Chunker.builder() - .setInput(digest.getSizeBytes(), path) - .setCompressed(shouldCompress(digest)) - .build()); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { + RemoteActionExecutionContext context, Digest digest, Supplier data) { return uploadChunker( context, digest, Chunker.builder() - .setInput(data.toByteArray()) + .setInput(digest.getSizeBytes(), data::get) .setCompressed(shouldCompress(digest)) .build()); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index 9dfb151afb1aa3..e246febbedf3b2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -190,7 +190,8 @@ private ListenableFuture uploadBlob( case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> // TODO: Avoid materializing the entire file in memory. This requires changing the // upload to be driven by an OutputStream rather than consuming an InputStream. - remoteCacheClient.uploadBlob(context, digest, virtualActionInput.getBytes()); + remoteCacheClient.uploadBlob( + context, digest, () -> virtualActionInput.getBytes().newInput()); case ContentSource.PathSource(Path path) -> { try { if (remotePathChecker.isRemote(context, path)) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 0e23bd1267d5b0..5b5d4ce0893a7c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -20,12 +20,13 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Set; +import java.util.function.Supplier; /** * An interface for a remote caching protocol. @@ -112,6 +113,18 @@ ListenableFuture uploadActionResult( ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out); + /** + * Uploads a blob to the CAS. + * + * @param context the context for the action. + * @param digest The digest of the blob. + * @param data A supplier for the data to upload. Will be called at most once and as close as as + * possible in time to the actual upload. + * @return A future representing pending completion of the upload. + */ + ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, Supplier data); + /** * Uploads a {@code file} to the CAS. * @@ -120,7 +133,10 @@ ListenableFuture downloadBlob( * @param file The file to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadFile(RemoteActionExecutionContext context, Digest digest, Path file); + default ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { + return uploadBlob(context, digest, () -> new LazyFileInputStream(file)); + } /** * Uploads a BLOB to the CAS. @@ -130,8 +146,10 @@ ListenableFuture downloadBlob( * @param data The BLOB to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data); + default ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { + return uploadBlob(context, digest, data::newInput); + } /** Close resources associated with the remote cache. */ void close(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index e6a661ddbe357a..cb7b872aede4d3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -37,7 +37,6 @@ import com.google.devtools.build.lib.remote.util.DigestOutputStream; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.Utils; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -93,6 +92,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -718,25 +718,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path file) { - return retrier.executeAsync( - () -> - uploadAsync( - digest.getHash(), - digest.getSizeBytes(), - new LazyFileInputStream(file), - /* casUpload= */ true)); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { + RemoteActionExecutionContext context, Digest digest, Supplier in) { return retrier.executeAsync( () -> - uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); + uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true)); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index 9e80fb69e0a5a2..cb3ae8cf37ef37 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -121,6 +121,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/runtime/commands", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/lib/testing/vfs:spied_filesystem", @@ -242,6 +243,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/dynamic", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/util:os", "//src/main/java/com/google/devtools/build/lib/vfs", @@ -268,6 +270,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote:store", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 9b0d421c1f7442..9b039d3d90a489 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -693,7 +693,8 @@ public void earlyWriteResponseShouldCompleteUpload() throws Exception { // provide only enough data to write a single chunk InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE); - Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build(); + Chunker chunker = + Chunker.builder().setInput(blob.length, () -> in).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); serviceRegistry.addService( diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 7c693480df1482..1964904acc0c9d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -94,7 +94,7 @@ public void close() throws IOException { super.close(); } }; - Chunker chunker = Chunker.builder().setInput(0, inp).build(); + Chunker chunker = Chunker.builder().setInput(0, () -> inp).build(); assertThat(chunker.hasNext()).isTrue(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java index 251d04a8ec5176..24bf0944c28bf7 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java @@ -73,6 +73,7 @@ import com.google.devtools.common.options.Options; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Deque; @@ -88,6 +89,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -397,7 +399,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -472,7 +474,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -553,7 +555,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer( invocationOnMock -> { Path file = invocationOnMock.getArgument(2, Path.class); @@ -652,7 +654,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) .uploadFile(any(), any(), any()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 100e069b6ab4d1..135092e33a7d51 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -132,6 +132,7 @@ import com.google.testing.junit.testparameterinjector.TestParameter; import com.google.testing.junit.testparameterinjector.TestParameterInjector; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.Map; import java.util.Random; @@ -140,6 +141,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; @@ -2162,7 +2164,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep return future; }) .when(cache.remoteCacheClient) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); ActionInput input = ActionInputHelper.fromPath("inputs/foo"); fakeFileCache.createScratchInput(input, "input-foo"); RemoteExecutionService service = newRemoteExecutionService(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD b/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD index 7f09d8d86a2202..fa5cf2b4008345 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD @@ -23,6 +23,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs/bazel", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD index 2d450d41cf3423..3dee6d7f751131 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD @@ -31,6 +31,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//src/test/java/com/google/devtools/build/lib/remote/util", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index 16914a57fa8091..91b6c87d52e66e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -28,6 +28,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//src/test/java/com/google/devtools/build/lib:test_runner", diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD index 3720b4bb0a5777..86596fd00085d2 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD @@ -20,6 +20,7 @@ java_test( deps = [ "//src/main/java/com/google/devtools/build/lib/remote/logging", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/protobuf:remote_execution_log_java_proto", "//src/test/java/com/google/devtools/build/lib:test_runner", diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index f9de62f38d6bb8..182e3eb1780d87 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -20,7 +20,6 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy; import com.google.common.collect.ImmutableSet; -import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -28,7 +27,6 @@ import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; import java.io.InputStream; @@ -40,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; /** A {@link RemoteCacheClient} that stores its contents in memory. */ @@ -142,23 +141,11 @@ public ListenableFuture uploadActionResult( ac.put(actionKey, actionResult); return Futures.immediateFuture(null); } - - @Override - public ListenableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path file) { - try (InputStream in = file.getInputStream()) { - cas.put(digest, ByteStreams.toByteArray(in)); - } catch (IOException e) { - return Futures.immediateFailedFuture(e); - } - return Futures.immediateFuture(null); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { - try (InputStream in = data.newInput()) { - cas.put(digest, data.toByteArray()); + RemoteActionExecutionContext context, Digest digest, Supplier data) { + try { + cas.put(digest, data.get().readAllBytes()); } catch (IOException e) { return Futures.immediateFailedFuture(e); } diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD index e40a00034cd743..606ff0b7b89c38 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD @@ -40,6 +40,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/sandbox:linux_sandbox_command_line_builder", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/util:os", diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 0fecbb33ec7091..15080fa9280d3f 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -34,6 +34,7 @@ import io.grpc.Status; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; import java.util.UUID; @@ -83,8 +84,9 @@ public void read(ReadRequest request, StreamObserver responseObser try { // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. + byte[] bytes = getFromFuture(cache.downloadBlob(context, digest)); Chunker c = - Chunker.builder().setInput(getFromFuture(cache.downloadBlob(context, digest))).build(); + Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build(); while (c.hasNext()) { responseObserver.onNext( ReadResponse.newBuilder().setData(c.next().getData()).build());