Skip to content

Commit

Permalink
Implement readTail() (#20)
Browse files Browse the repository at this point in the history
This change works towards the S3FileIO - S3SeekableStream integration. This
should be the last change required to support S3FileIO.

readTail(buf, off, n) is a blocking method that reads all `n` bytes
requested before returning and does not alter the position in the seekable
stream. We had no equivalent for this so far, because if we wanted to
implement this with the existing S3SeekableStream APIs, then we'd need to
know the object size upfront -- an assumption we don't want to make
(probably shouldn't).

I see this method become useful later to implement tail pre-read: however,
a more efficient implementation would be similar to what S3FileIO is doing
today (pushing down the `bytes=-N` range into the S3 GET request). This
implementation is quick and dirty in that it relies on the content length
from the initial HeadObject we are making. We can spare this HeadObject in
the future once our integrations are running (and we have data that this is
worth doing).
  • Loading branch information
CsengerG authored May 8, 2024
1 parent 1351ee1 commit 3fc8e7d
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 10 deletions.
2 changes: 2 additions & 0 deletions input-stream/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ tasks.test {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(17)
}

environment("AWS_REGION", "eu-west-1")
}

jmh {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public int read(byte[] buffer, int offset, int len) {
public void seek(long pos) throws IOException {
// TODO: https://app.asana.com/0/1206885953994785/1207207312934251/f
// S3A throws an EOFException here, S3FileIO does IllegalArgumentException
Preconditions.checkState(pos >= 0, "position must be non-negative");
Preconditions.checkArgument(pos >= 0, "position must be non-negative");

if (pos >= contentLength()) {
throw new EOFException("zero-indexed seek position must be less than the object size");
Expand All @@ -85,6 +85,11 @@ public long getPos() {
return this.position;
}

@Override
public int readTail(byte[] buf, int off, int n) {
return blockManager.readTail(buf, off, n);
}

@Override
public void close() throws IOException {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,15 @@ public abstract class SeekableInputStream extends InputStream {
* @return the position in the stream
*/
public abstract long getPos();

/**
* Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
* reached. Leaves the position of the stream unaltered.
*
* @param buf buffer to read data into
* @param off start position in buffer at which data is written
* @param n the number of bytes to read; the n-th byte should be the last byte of the stream.
* @return the total number of bytes read into the buffer
*/
public abstract int readTail(byte[] buf, int off, int n);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public AutoClosingCircularBuffer(int maxCapacity) {

/**
* Adds an element to the buffer, potentially replacing another element if the maximum capacity
* has been reached. Calls 'close' on elements before eviciting them.
* has been reached. Calls 'close' on elements before evicting them.
*
* @param element The new element to add to the buffer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazon.connector.s3.request.HeadRequest;
import com.amazon.connector.s3.request.Range;
import com.amazon.connector.s3.util.S3URI;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
Expand Down Expand Up @@ -104,6 +105,16 @@ public int readIntoBuffer(byte[] buffer, int offset, int len, long pos) {
return numBytesRead;
}

/** Reads the last n bytes from the object. */
public int readTail(byte[] buf, int off, int n) {
Preconditions.checkArgument(0 <= n, "must request a non-negative number of bytes from tail");
Preconditions.checkArgument(
n <= contentLength(), "cannot request more bytes from tail than total number of bytes");

long start = contentLength() - n;
return readIntoBuffer(buf, off, n, start);
}

private IOBlock getBlockForPosition(long pos) {
return lookupBlockForPosition(pos)
.orElseGet(
Expand Down Expand Up @@ -136,8 +147,12 @@ private IOBlock createBlockStartingAt(long start) throws IOException {
return ioBlock;
}

private long contentLength() {
return this.metadata.join().getContentLength();
}

private long getLastObjectByte() {
return this.metadata.join().getContentLength() - 1;
return contentLength() - 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.NonNull;

class IOBlock implements Closeable {
private final long start;
private final long end;
private long positionInCurrentBuffer;
private CompletableFuture<ObjectContent> content;
private final ByteBuffer blockContent;

@Getter private final ByteBuffer blockContent;

private final int bufferSize;
private static final int ONE_MB = 1024 * 1024;
private static final int READ_BUFFER_SIZE = ONE_MB;
Expand All @@ -28,7 +30,6 @@ public IOBlock(long start, long end, @NonNull CompletableFuture<ObjectContent> o

this.start = start;
this.end = end;
this.positionInCurrentBuffer = start;
this.content = objectContent;
this.bufferSize = (int) size();
this.blockContent = ByteBuffer.allocate(this.bufferSize);
Expand All @@ -41,10 +42,6 @@ public int getByte(long pos) {
return Byte.toUnsignedInt(blockContent.get());
}

public ByteBuffer getBlockContent() {
return blockContent;
}

public void setPositionInBuffer(long pos) {
blockContent.position(positionToOffset(pos));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.utils.IoUtils;
Expand Down Expand Up @@ -227,4 +228,58 @@ void testReadWithBufferOutOfBounds() throws IOException {
assertEquals(2, stream.read(new byte[20], 0, TEST_DATA.length() + 20));
assertEquals(20, stream.getPos());
}

@Test
void testReadTailWithInvalidArgument() {
// Given: seekable stream
S3SeekableInputStream stream = new S3SeekableInputStream(fakeBlockManager);

// When & Then: reading tail with invalid arguments, exception is thrown
// -1 is invalid length
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[3], 0, -1));
// 100K is bigger than test data size
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[103], 0, 100));
// Requesting more data than byte buffer size
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[10], 0, 100));
}

@Test
void testReadTailHappyCase() throws IOException {
// Given: seekable stream
S3SeekableInputStream stream = new S3SeekableInputStream(fakeBlockManager);

// When: tail of length 10 is requested
byte[] buf = new byte[11];
int numBytesRead = stream.readTail(buf, 0, buf.length);

// Then: 10 bytes are read, 10 is returned, 10 bytes in the buffer are the same as last 10 bytes
// of test data
assertEquals(11, numBytesRead);
assertEquals("12345678910", new String(buf, StandardCharsets.UTF_8));
}

@Test
void testReadTailDoesNotAlterPosition() {
// Given: seekable stream
S3SeekableInputStream stream = new S3SeekableInputStream(fakeBlockManager);

// When: 1) we are reading from the stream, 2) reading the tail of the stream, 3) reading more
// from the stream
byte[] one = new byte[5];
byte[] two = new byte[11];
byte[] three = new byte[5];

int numBytesRead1 = stream.read(one, 0, one.length);
int numBytesRead2 = stream.readTail(two, 0, two.length);
int numBytesRead3 = stream.read(three, 0, three.length);

// Then: read #2 did not alter the position and reads #1 and #3 return subsequent bytes
assertEquals(5, numBytesRead1);
assertEquals(11, numBytesRead2);
assertEquals(5, numBytesRead3);

assertEquals("test-", new String(one, StandardCharsets.UTF_8));
assertEquals("data1", new String(three, StandardCharsets.UTF_8));
assertEquals("12345678910", new String(two, StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.amazon.connector.s3.util.S3URI;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
Expand Down Expand Up @@ -165,6 +166,47 @@ public void testFullRead() throws IOException {
assertEquals(seekableFullRead, inMemoryFullRead);
}

@Test
public void testTailReads() throws IOException {
Random r = new Random();
for (int i = 0, nextPos = 0; i < 10; nextPos = nextRandomPosition(r), ++i) {
s3SeekableInputStream.seek(nextPos);
inMemorySeekableStream.seek(nextPos);

assertEquals(
s3SeekableInputStream.getPos(),
inMemorySeekableStream.getPos(),
String.format("positions do not match after seeking to %s", nextPos));

int n = nextRandomSize(r);
byte[] b1 = new byte[n];
byte[] b2 = new byte[n];

assertEquals(
s3SeekableInputStream.readTail(b1, 0, n),
inMemorySeekableStream.readTail(b2, 0, n),
"number of bytes read from tail should be the same");

assertEquals(
byteBufToString(b1),
byteBufToString(b2),
String.format("returned data does not match after requesting %s bytes from tail", n));

assertEquals(
s3SeekableInputStream.read(),
inMemorySeekableStream.read(),
String.format("read() calls followed by a readTail do not return the same data"));
}
}

private String byteBufToString(byte[] b) {
return new String(b, StandardCharsets.UTF_8);
}

private int nextRandomSize(Random r) {
return Math.abs(r.nextInt()) % OBJECT_SIZE;
}

private int nextRandomPosition(Random r) {
return Math.abs(r.nextInt()) % OBJECT_SIZE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ public long getPos() {
return this.position;
}

@Override
public int readTail(byte[] buf, int off, int n) {
// Save position of stream
long prevPosition = this.position;

long tailStart = contentLength - n;
data.position((int) tailStart);
data.get(buf, off, n);

// Reset position
this.position = prevPosition;
data.position((int) this.position);

return n;
}

@Override
public int read() {
if (this.position >= this.contentLength) {
Expand Down
4 changes: 4 additions & 0 deletions object-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ dependencies {
testImplementation(libs.mockito.junit.jupiter)
testRuntimeOnly(libs.junit.jupiter.launcher)
}

tasks.test {
environment("AWS_REGION", "eu-west-1")
}

0 comments on commit 3fc8e7d

Please sign in to comment.