Skip to content

Commit

Permalink
Eliminate open-range GET requests (#16)
Browse files Browse the repository at this point in the history
This commit adds support for breaking up open-range GET requests and
fetches data chunk-by-chunk. A very naive implementation of a block
manager is added.

Furthermore, we changed the interface of Object Client
to async to allow for prefetching in the future. The Block Manager is
also written with prefetching in mind. For now, we keep a limited number
of IOBlocks in memory (which is of no use today) which should be very
similar to how prefetching might happen in the future (except rather
than keeping bytes around AFTER they've been read we will prefetch them
to their respective IOBlocks BEFORE they've been read).

There are improvements around testability: we abstract away some SDK
constructs like GetObjectRequest, HeadObjectRequest to limit the
parameter space. In the future this might allow us to introduce
property-based tests or fuzz testing, and it also makes it easier to
prototype 'fake clients' or in-memory models.

On top of unit tests we add a new reference test on a full sequential
read and compare with our in-memory model. It seems to pass!

Next steps are to eliminate the byte-by-byte reading and refactor
read() calls into the more efficient read(b, off, len) style.
  • Loading branch information
CsengerG authored Apr 30, 2024
1 parent 0a277c4 commit a63908c
Show file tree
Hide file tree
Showing 19 changed files with 807 additions and 153 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.amazon.connector.s3;

import com.amazon.connector.s3.blockmanager.BlockManager;
import com.amazon.connector.s3.util.S3URI;
import com.google.common.base.Preconditions;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import lombok.NonNull;

/**
* High throughput seekable stream used to read data from Amazon S3.
Expand All @@ -16,49 +16,53 @@
* undefined.
*/
public class S3SeekableInputStream extends SeekableInputStream {
private final ObjectClient objectClient;
private final S3URI uri;

private final BlockManager blockManager;
private long position;
private InputStream stream;

/**
* Creates a new instance of {@link S3SeekableInputStream}.
* Creates a new instance of {@link S3SeekableInputStream}. This version of the constructor
* initialises the stream with sensible defaults.
*
* @param objectClient an instance of {@link ObjectClient}.
* @param uri location of the S3 object this stream is fetching data from
* @param s3URI the object's S3 URI
*/
public S3SeekableInputStream(ObjectClient objectClient, S3URI uri) throws IOException {
Preconditions.checkNotNull(objectClient, "objectClient must not be null");
Preconditions.checkNotNull(uri, "S3 URI must not be null");

this.objectClient = objectClient;
this.uri = uri;
public S3SeekableInputStream(@NonNull S3URI s3URI) {
this(new BlockManager(new S3SdkObjectClient(null), s3URI));
}

/**
* Given a Block Manager, creates a new instance of {@link S3SeekableInputStream}. This version of
* the constructor is useful for testing as it allows dependency injection.
*
* @param blockManager already initialised Block Manager
*/
public S3SeekableInputStream(@NonNull BlockManager blockManager) {
this.blockManager = blockManager;
this.position = 0;
requestBytes(position);
}

@Override
public int read() throws IOException {
int byteRead = stream.read();

if (byteRead < 0) {
if (this.position >= contentLength()) {
return -1;
}

int byteRead = this.blockManager.readByte(this.position);
this.position++;
return byteRead;
}

@Override
public void seek(long pos) throws IOException {
try {
requestBytes(pos);
this.position = pos;
} catch (Exception e) {
throw new IOException(String.format("Unable to seek to position %s", pos));
// TODO: https://app.asana.com/0/1206885953994785/1207207312934251/f
// S3A throws an EOFException here, S3FileIO does IllegalArgumentException
Preconditions.checkState(pos >= 0, "position must be non-negative");

if (pos >= contentLength()) {
throw new EOFException("zero-indexed seek position must be less than the object size");
}

this.position = pos;
}

@Override
Expand All @@ -69,20 +73,10 @@ public long getPos() {
@Override
public void close() throws IOException {
super.close();
this.stream.close();
this.blockManager.close();
}

private void requestBytes(long pos) throws IOException {
if (Objects.nonNull(this.stream)) {
this.stream.close();
}

this.stream =
this.objectClient.getObject(
GetObjectRequest.builder()
.bucket(uri.getBucket())
.key(uri.getKey())
.range(String.format("bytes=%s-", pos))
.build());
private long contentLength() {
return this.blockManager.getMetadata().join().getContentLength();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.amazon.connector.s3.blockmanager;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.stream.Stream;

/**
 * A circular buffer of fixed capacity. Once the buffer is full, adding a new element evicts the
 * oldest element and closes it. Not thread-safe.
 *
 * @param <T> the type of the closeable elements held by the buffer
 */
public class AutoClosingCircularBuffer<T extends Closeable> implements Closeable {

  private final ArrayList<T> buffer;
  private final int capacity;
  // Index of the slot that will be overwritten by the next eviction.
  private int oldestIndex;

  /**
   * Creates an instance of AutoClosingCircularBuffer.
   *
   * @param maxCapacity The maximum capacity of the buffer.
   * @throws IllegalStateException if {@code maxCapacity} is not positive
   */
  public AutoClosingCircularBuffer(int maxCapacity) {
    Preconditions.checkState(0 < maxCapacity, "maxCapacity should be positive");

    this.oldestIndex = 0;
    this.capacity = maxCapacity;
    this.buffer = new ArrayList<>(maxCapacity);
  }

  /**
   * Adds an element to the buffer, potentially replacing another element if the maximum capacity
   * has been reached. Calls 'close' on the evicted element.
   *
   * <p>The new element is stored before the evicted one is closed, so that a failure while
   * closing does not leave the new element untracked (and therefore never closed).
   *
   * @param element The new element to add to the buffer.
   * @throws RuntimeException if closing the evicted element fails
   */
  public void add(T element) {
    if (buffer.size() < capacity) {
      buffer.add(element);
      return;
    }

    T evicted = buffer.set(oldestIndex, element);
    oldestIndex = (oldestIndex + 1) % capacity;
    tryClose(evicted);
  }

  /** Closes a single element, converting the checked IOException into a RuntimeException. */
  private void tryClose(T t) {
    try {
      t.close();
    } catch (IOException e) {
      throw new RuntimeException("Unable to close element of circular buffer", e);
    }
  }

  /**
   * Returns a conventional Java stream of the underlying objects. Elements are streamed in the
   * order of their slots, which is not necessarily insertion order once eviction has started.
   */
  public Stream<T> stream() {
    return buffer.stream();
  }

  /**
   * Closes the buffer, freeing up all underlying resources. Every element is closed even if
   * closing some of them fails; the first failure is rethrown once all elements have been
   * attempted, with any later failures attached as suppressed exceptions.
   *
   * @throws RuntimeException if closing at least one element fails
   */
  @Override
  public void close() {
    RuntimeException firstFailure = null;

    for (T element : buffer) {
      try {
        tryClose(element);
      } catch (RuntimeException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }

    if (firstFailure != null) {
      throw firstFailure;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.amazon.connector.s3.blockmanager;

import com.amazon.connector.s3.ObjectClient;
import com.amazon.connector.s3.object.ObjectContent;
import com.amazon.connector.s3.object.ObjectMetadata;
import com.amazon.connector.s3.request.GetRequest;
import com.amazon.connector.s3.request.HeadRequest;
import com.amazon.connector.s3.request.Range;
import com.amazon.connector.s3.util.S3URI;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.NonNull;

/**
 * A block manager in charge of fetching bytes from an object store. Currently:
 *
 * <ul>
 *   <li>Block Manager fetches bytes in chunks of at most 8MB.
 *   <li>IO blocks are fixed in size (at most 8MB) and do not grow beyond their original size.
 *   <li>Block Manager keeps the last 10 blocks alive in memory -- technically speaking this is
 *       caching, but we should be able to naturally extend this logic into prefetching.
 *   <li>If an 11th chunk is requested, then the oldest chunk is released along with all the
 *       resources it is holding.
 * </ul>
 */
public class BlockManager implements AutoCloseable {
  private static final int MAX_BLOCK_COUNT = 10;
  private static final long EIGHT_MB_IN_BYTES = 8L * 1024 * 1024;
  private static final long DEFAULT_BLOCK_SIZE = EIGHT_MB_IN_BYTES;

  @Getter private final CompletableFuture<ObjectMetadata> metadata;
  private final AutoClosingCircularBuffer<IOBlock> ioBlocks;

  private final ObjectClient objectClient;
  private final S3URI s3URI;

  /**
   * Creates an instance of block manager. The HEAD request for the object's metadata is issued
   * here; its result is only awaited when it is first needed.
   *
   * @param objectClient the Object Client to use to fetch the data
   * @param s3URI the location of the object
   */
  public BlockManager(@NonNull ObjectClient objectClient, @NonNull S3URI s3URI) {
    this.objectClient = objectClient;
    this.s3URI = s3URI;
    this.metadata =
        objectClient.headObject(
            HeadRequest.builder().bucket(s3URI.getBucket()).key(s3URI.getKey()).build());

    this.ioBlocks = new AutoClosingCircularBuffer<>(MAX_BLOCK_COUNT);
  }

  /**
   * Reads a byte from the underlying object
   *
   * @param pos The position to read
   * @return an unsigned int representing the byte that was read
   * @throws IOException if the byte cannot be read from the block holding it
   */
  public int readByte(long pos) throws IOException {
    return getBlockForPosition(pos).getByte(pos);
  }

  /** Returns a block already covering the position, or requests a new one starting at it. */
  private IOBlock getBlockForPosition(long pos) {
    return lookupBlockForPosition(pos).orElseGet(() -> createBlockStartingAt(pos));
  }

  /** Looks for a block in the buffer whose range contains the position. */
  private Optional<IOBlock> lookupBlockForPosition(long pos) {
    return ioBlocks.stream().filter(ioBlock -> ioBlock.contains(pos)).findFirst();
  }

  /** Issues a ranged GET starting at the given position and registers the resulting block. */
  private IOBlock createBlockStartingAt(long start) {
    // Block ranges are inclusive on both ends, so a block holding DEFAULT_BLOCK_SIZE bytes ends
    // at start + DEFAULT_BLOCK_SIZE - 1. The last block is truncated at the end of the object.
    long end = Math.min(start + DEFAULT_BLOCK_SIZE - 1, getLastObjectByte());

    CompletableFuture<ObjectContent> objectContent =
        this.objectClient.getObject(
            GetRequest.builder()
                .bucket(s3URI.getBucket())
                .key(s3URI.getKey())
                .range(Range.builder().start(start).end(end).build())
                .build());

    IOBlock ioBlock = new IOBlock(start, end, objectContent);
    ioBlocks.add(ioBlock);
    return ioBlock;
  }

  /** Returns the zero-indexed position of the last byte of the object; waits for the metadata. */
  private long getLastObjectByte() {
    return this.metadata.join().getContentLength() - 1;
  }

  /**
   * Closes all blocks held by this manager and then the object client. The object client is
   * closed even if closing the blocks fails.
   *
   * @throws IOException if closing the object client fails
   */
  @Override
  public void close() throws IOException {
    try {
      this.ioBlocks.close();
    } finally {
      this.objectClient.close();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.amazon.connector.s3.blockmanager;

import com.amazon.connector.s3.object.ObjectContent;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import lombok.NonNull;

/**
 * A block of object bytes covering the inclusive range {@code [start, end]}. Bytes are pulled
 * from the content stream one at a time, on demand, and kept in an in-memory buffer so that
 * positions which were already read can be served again without touching the stream.
 */
class IOBlock implements Closeable {
  private final long start;
  private final long end;
  // Object position of the next byte that has not yet been copied from the stream to the buffer.
  private long positionInCurrentBuffer;
  private final CompletableFuture<ObjectContent> content;
  private final byte[] blockContent;

  /**
   * Creates a block for the inclusive byte range {@code [start, end]}.
   *
   * @param start object position of the first byte of the block
   * @param end object position of the last byte of the block
   * @param objectContent future of the content the block's bytes are read from
   * @throws IllegalStateException if the range is negative, inverted, or too large to be held in a
   *     single byte array
   */
  public IOBlock(long start, long end, @NonNull CompletableFuture<ObjectContent> objectContent) {
    Preconditions.checkState(start >= 0, "start must be non-negative");
    Preconditions.checkState(end >= 0, "end must be non-negative");
    Preconditions.checkState(start <= end, "start must not be bigger than end");
    // Guards the narrowing cast below: the block is backed by a single byte array.
    Preconditions.checkState(
        end - start < Integer.MAX_VALUE, "block size must fit into a byte array");

    this.start = start;
    this.end = end;
    this.positionInCurrentBuffer = start;
    this.content = objectContent;

    this.blockContent = new byte[(int) size()];
  }

  /**
   * Returns the byte at the given object position, reading ahead in the stream if the position
   * has not been buffered yet.
   *
   * @param pos object position of the byte
   * @return the byte as an unsigned int
   * @throws IOException if the stream ends before the position is reached or reading fails
   * @throws IllegalStateException if the position lies outside of the block
   */
  public int getByte(long pos) throws IOException {
    Preconditions.checkState(start <= pos, "pos must not be smaller than start");

    if (pos < positionInCurrentBuffer) {
      return Byte.toUnsignedInt(this.blockContent[positionToOffset(pos)]);
    }

    return readByte(pos);
  }

  /** Reads from the stream up to and including the given position, buffering every byte. */
  private int readByte(long pos) throws IOException {
    Preconditions.checkState(
        positionInCurrentBuffer <= pos,
        String.format(
            "byte at position %s was fetched already and should have been served via 'getByte'",
            pos));
    Preconditions.checkState(pos <= end, "pos must be less than end");

    // Wait for the content once rather than on every iteration of the loop.
    ObjectContent objectContent = this.content.join();

    for (; positionInCurrentBuffer <= pos; ++positionInCurrentBuffer) {
      int byteRead = objectContent.getStream().read();

      if (byteRead < 0) {
        throw new IOException(
            String.format(
                "Premature end of file. Did not expect to read -1 at position %s",
                positionInCurrentBuffer));
      }

      this.blockContent[positionToOffset(positionInCurrentBuffer)] = (byte) byteRead;
    }

    return Byte.toUnsignedInt(this.blockContent[positionToOffset(pos)]);
  }

  /** Returns true if the object position falls into the inclusive range of this block. */
  public boolean contains(long pos) {
    return start <= pos && pos <= end;
  }

  /** Returns the number of bytes covered by this block. */
  public long size() {
    return end - start + 1;
  }

  /** A mapping between object byte locations and byte buffer byte locations */
  private int positionToOffset(long pos) {
    return (int) (pos - start);
  }

  /**
   * Closes the content stream backing this block. Waits for the content future to complete.
   *
   * @throws IOException if closing the stream fails
   */
  @Override
  public void close() throws IOException {
    this.content.join().getStream().close();
  }
}
Loading

0 comments on commit a63908c

Please sign in to comment.