-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eliminate open-range GET requests #16
Conversation
This commit adds support for breaking up open-range GET requests and fetches data chunk-by-chunk. A very naive implementation of a block manager is added. Furthermore, we changed the interface of Object Client to async to allow for prefetching in the future. The Block Manager is also written with prefetching in mind. For now, we keep a limited number of IOBlocks in memory (which is of no use today) which should be very similar to how prefetching might happen in the future (except rather than keeping bytes around AFTER they've been read we will prefetch them to their respective IOBlocks BEFORE they've been read). There are improvements around testability: we abstract away some SDK constructs like GetObjectRequest, HeadObjectRequest to limit the parameter space. This might allow in the future to introduce property-based tests or fuzz testing and also makes it easier to prototype 'fake clients' or in-memory models easier. On top of unit tests we add a new reference test on a full sequential read and compare with our in-memory model. It seems to pass! Next steps are to eliminate the byte-by-byte reading and refactor read() calls into the more efficient read(b, off, len) style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@CsengerG Thank you for setting up the building blocks, this is really great to see. I have done a first pass review, not looked at tests at all.
I'll be spending some more time reviewing this code (and look at tests), but wanted to provide feedback I had so far. Nothing major, directionally things look good.
|
||
this.objectClient = objectClient; | ||
this.uri = uri; | ||
public S3SeekableInputStream(BlockManager blockManager) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This S3SeekableInputStream
should only take a URI as an argument IMO. A calling application has no concept of a block manager and should be able to do inputStream = new S3SeekableInputStream(new S3URI(bucket, key)).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I totally agree with that -- this was mainly motivated by testability. I know some people use @VisibleForTesting
, will see if that has a use here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll want to remove the BlockManager from here ultimately, so can we not do that now? instead of having @VisibleForTesting . I think VisibleForTesting is used more for private methods that you need to change scope for testing ..
int byteRead = stream.read(); | ||
|
||
if (byteRead < 0) { | ||
if (this.position >= contentLength()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
S3A does (this.contentLength == 0 || (nextReadPos >= contentLength))
, see here. I think this.position >= contentLength()
is sufficient and will cover the this.contentLength == 0
, but wanted to point this out .. in case anyone can think of an edge case we may be missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh! Very interesting! They look logically identical to me, I can introduce an explicit test case for this in the next revision.
throw new IOException(String.format("Unable to seek to position %s", pos)); | ||
} | ||
public void seek(long pos) { | ||
Preconditions.checkState(pos >= 0, "position must be non-negative"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to think about exceptions we throw, you probably want to throw EOFs here for both cases. We should also revisit the exception translation discussion we had. We need some mechanism that will allow applications to translate our exceptions into what they expect (for S3A, these will be Hadoop exceptions)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the exception translation can probably wait for now, but I will change this into an EOF in the next revision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why EOF when position
is negative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is what S3A does https://github.com/apache/hadoop/blob/branch-3.3.5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java#L309
Which is probably motivated by some HDFS standard , though I'm not 100% sure.
*/ | ||
public class BlockManager implements AutoCloseable { | ||
private static final int MAX_BLOCKS_TO_KEEP_AROUND = 10; | ||
private static final long EIGHT_MB_CHUNK_IN_BYTES = 8 * 1024 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Prefer:
8_MB = 8 * 1024 * 1024
DEFAULT_BLOCK_SIZE = 8_MB
You may want to move this in a common constants file somewhere, I suspect these numbers will get used a lot across implementation and tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, will fix.
* it is holding. | ||
*/ | ||
public class BlockManager implements AutoCloseable { | ||
private static final int MAX_BLOCKS_TO_KEEP_AROUND = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Something like MAX_BLOCKS_CAPACITY/COUNT
is easier to understand
/** | ||
* A circular buffer of fixed capacity. Closes its elements before removing them. Not thread-safe. | ||
*/ | ||
public class AutoClosingCircularBuffer<T extends Closeable> implements Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, we will need something like this for sure, but we won't be doing eviction based on oldestIndex, and eviction won't be circular.
When you do prefetching, blocks will come in async, so you won't want to evict by oldest index. You'll probably want to maintain some state of the block. Something like: "FULLY_READ, PREFETCHING, READY". And eviction should be based on this state.
I am mentioning this because I feel the functionality of this class will soon change and so maybe the name isn't quite right, and I don't think we should spend too much time here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, we will need something like this for sure, but we won't be doing eviction based on oldestIndex, and eviction won't be circular.
That's fine. The main problem in Java-land is that I couldn't find an abstract data type that actually closes things (or calls a function on what it evicts) when they are released. So oldestIndex
can change, circularity can change, the container probably won't and we will need something that does this.
When you do prefetching, blocks will come in async, so you won't want to evict by oldest index. You'll probably want to maintain some state of the block. Something like: "FULLY_READ, PREFETCHING, READY". And eviction should be based on this state.
Ack and agree. Let's change this when that asynchrony is added.
I am mentioning this because I feel the functionality of this class will soon change and so maybe the name isn't quite right, and I don't think we should spend too much time here.
This is a good comment. This probably shouldn't be a public
class and then we don't need to care about the name either. I think my response here is just: everything here is a 2-way door and when you (or someone else) get(s) here, let's not be afraid of walking through it whichever direction and changing things.
} | ||
|
||
private Optional<IOBlock> lookupBlockForPosition(long pos) { | ||
return ioBlocks.stream().filter(ioBlock -> ioBlock.contains(pos)).findFirst(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about using block numbers instead? Basically you have your position, and you have your size of block. So the blockNumber = currentPos / block_size.
Store the block number in the IoBlock. Then just search for that block number. It's essentially the same thing, but easier to reason about imo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about using block numbers instead? Basically you have your position, and you have your size of block. So the blockNumber = currentPos / block_size.
But the block size might be variable, right? Simple case is when it goes until EOF, but longer term the metadata-aware prefetching might also use variably sized blocks.
It's essentially the same thing, but easier to reason about imo.
Interesting, I am the other way around! :) I think contains
abstracts away some details here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah good point, I hadn't thought about the variable block sizes that may come later.
} | ||
|
||
public int getByte(long pos) throws IOException { | ||
if (pos < position) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
took me a while to work out why we would hit this condition. A comment with an example would be super helpful. Would also recommend changing name of position
to something like positionInCurrentBuffer
. Having both pos
and position
is confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, actually, I should have noticed this when writing the code. It's even harder here because there is no purple highlight for class-variables. Will fix in next rev.
return readByte(pos); | ||
} | ||
|
||
private int readByte(long pos) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't we just read the block in the buffer in one go in the constructor instead?
content.join().getStream.read(blockContent, 0, numBytesToRead)
Especially with the CRT, that entire block of 8MB will be available at once. So what is the benefit of reading byte by byte form the S3 input stream like this?
This will prepare us for implementing read(b, off, len)
also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this. I think this still won't overcomplicate things so will try to do this in the next rev.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some complication here: I tried to do this in a naive way and confirmed with microbenchmarks that it's a great improvement (improved sequential read 80x), HOWEVER, I think we can do even better in a non-blocking async-style way (like: I think there is a low-lift way to make this constructor not blocking that will make our prefetching story basically trivial).
I would rather try to not add this complexity here but introduce it later (for example: tactically it makes sense for me to do: 1\ microbenchmarks, 2\ evaluate if I want to apply your improvement or the async one). What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's fine, thanks
|
||
private int readByte(long pos) throws IOException { | ||
Preconditions.checkState( | ||
position <= pos, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not immediately clear to me when this condition can happen, could you please provide an example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically, if you do a backwards seek on already fetched data then this condition just checks that we should not issue a read but serve the byte from memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this some more. Didn't review tests yet, as I'd like to align on implementation first, let's discuss once you're back.
|
||
/** | ||
* Creates a new instance of {@link S3SeekableInputStream}. | ||
* | ||
* @param objectClient an instance of {@link ObjectClient}. | ||
* @param uri location of the S3 object this stream is fetching data from | ||
* @param blockManager an instance of {@link BlockManager}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I am mostly aligned with these changes. But I suggest simplifying IOBlock.
I don't think it should implement any read methods. It should only be responsible for:
- Storing the actual buffer of data that it represents.
- Keeping track of position in the buffer it holds, and some functionality to change position in this buffer. (similar to what you have already,
pos - bufferStartPos
.)
Then, when you have a read()
- The blockManager should talk in blocks, and not return a byte. That is, it should return an IOBlock. The block managers only responsibility should be to ensure it can return the right block of data.
do something like blockManger.getBlock(blockNumber) , where blockNumber is defined by blockNumber = currentPos / block_size.
This is basically getBlockForPosition()
that you have already.
then here, you have direct access to the current buffer that holds data for the currentPosition.
When you implement read(buffer, off, len), that should happen here too. Keep asking for blocks of data from the block manager till you read the requested length. Something like:
while (dataRead < len) {
if (pos in currentBuffer) {
// read into buffer from currentBuffer till end of current buffer
} else {
currentBuffer = getBlock(blockNumber).getBuffer()
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points, I have a few questions.
- If the blocks are not fixed in size (sounds like we need to align around this), then that blockNumber-style indexing will not work. Is this right? Or do we not care about variable-sized blocks?
- What happens when we abandon this primitive
read()
in a more mature implementation and start doingread(byte[], off, len)
-style calls? The range requested could span multiple buffers. To me it feels as though the abstraction ofgetBlockForPosition()
and latergetBlocksForRange()
is appropriate and we should not expose the index calculation. Then, another question is: once we've got (let's say) 3 blocks representing a range, do we expect someone to assemble these IOBlocks outside of the BlockManager? How do bytes leave the block manager and who owns them?
When bytes leave the block manager, do they get copied in the caller's buffer? If not, I am a little bit afraid that we can end up in a situation where the Block Manager closes an IOBlock still in use when it gets evicted.
- You looked into VectoredIO. Do you have any foresight into what we might need there? Does it work better with fixed vs variably-sized blocks? Is it easier to merge ranges if we expose or hide the IOBlocks? What layer would VectoredIO live at (above or below block manager)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1/ Yup, I hadn't thought about variable-sized blocks. I don't think this blockNumber style indexing will work in that case.
2/ Let's discuss offline quickly and align.
3/ Good q, I don't know enough about VectoredIO request patterns to say anything with confidence. From what I understand, if you're doing VectoredIO you probably don't need or benefit from prefetching. So I think we would treat it separately, and it would be below the block manager, but don't quote me on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of minor comments, all good otherwise.
@@ -41,10 +53,12 @@ public int read() throws IOException { | |||
} | |||
|
|||
@Override | |||
public void seek(long pos) { | |||
public void seek(long pos) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a TODO here to look at if we need to throw and EOF for negative pos?
return readByte(pos); | ||
} | ||
|
||
private int readByte(long pos) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's fine, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, all good
This commit adds support for breaking up open-range GET requests and fetches data chunk-by-chunk. A very naive implementation of a block manager is added.
Furthermore, we changed the interface of Object Client to async to allow for prefetching in the future. The Block Manager is also written with prefetching in mind. For now, we keep a limited number of IOBlocks in memory (which is of no use today) which should be very similar to how prefetching might happen in the future (except rather than keeping bytes around AFTER they've been read we will prefetch them to their respective IOBlocks BEFORE they've been read).
There are improvements around testability: we abstract away some SDK constructs like GetObjectRequest, HeadObjectRequest to limit the parameter space. This might allow in the future to introduce property-based tests or fuzz testing and also makes it easier to prototype 'fake clients' or in-memory models easier.
On top of unit tests we add a new reference test on a full sequential read and compare with our in-memory model. It seems to pass!
Next steps are to eliminate the byte-by-byte reading and refactor read() calls into the more efficient read(b, off, len) style.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.