Disable sequential prefetching for non Parquet objects #225
base: main
Conversation
Looks good overall.
I'm not sure the block store test cases are quite right, or maybe I misunderstood them. Let's discuss.
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);

default:
  OpenFileInformation effectiveInfo = openFileInformation;
Can you move this to a private function in utils, call it something like `setInputPolicy()`, and then just call it:
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, setInputPolicy())
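A minimal sketch of what the suggested utils helper could look like. The `setInputPolicy()` name comes from the review; `OpenFileInformation`, `InputPolicy`, and their shapes here are simplified stand-ins for the project's real types, not the actual implementation.

```java
// Simplified stand-ins for the project's types, for illustration only.
enum InputPolicy { None, Sequential }

record OpenFileInformation(InputPolicy inputPolicy) {
    OpenFileInformation withInputPolicy(InputPolicy policy) {
        return new OpenFileInformation(policy);
    }
}

final class StreamUtils {
    private StreamUtils() {}

    // The helper the review suggests: resolve the effective input policy in
    // one place so the PhysicalIOImpl constructor call stays a one-liner.
    static OpenFileInformation setInputPolicy(OpenFileInformation info) {
        // Keep an explicitly requested policy; otherwise default it, so the
        // branch logic lives here rather than in the factory's switch.
        return info.inputPolicy() != null
            ? info
            : info.withInputPolicy(InputPolicy.None);
    }
}
```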
Why do we need to create a new OpenFileInformation object again? Can we not set the input policy on the existing object itself?
this.streamContext = streamContext;
this.streamContext = openFileInformation.getStreamContext();
this.isSequential =
    openFileInformation.getInputPolicy() != null
Move this to a function, maybe in utils. It keeps the constructor clean of any logic.
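A sketch of the extracted predicate the comment asks for, assuming the `getInputPolicy()` accessor shown in the diff; the class and method names here are hypothetical.

```java
// Simplified stand-ins for the project's types, for illustration only.
enum InputPolicy { Sequential, Random }

final class OpenFileInformation {
    private final InputPolicy inputPolicy;
    OpenFileInformation(InputPolicy inputPolicy) { this.inputPolicy = inputPolicy; }
    InputPolicy getInputPolicy() { return inputPolicy; }
}

final class StreamAttributes {
    private StreamAttributes() {}

    // Extracted predicate so the PhysicalIOImpl constructor stays free of
    // logic: a stream is sequential only when the policy says so explicitly.
    static boolean isSequential(OpenFileInformation info) {
        return info.getInputPolicy() != null
            && info.getInputPolicy() == InputPolicy.Sequential;
    }
}
```

The constructor then reduces to `this.isSequential = StreamAttributes.isSequential(openFileInformation);`.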
@@ -54,6 +54,19 @@ public class S3SeekableInputStreamFactoryTest {

private static final S3URI TEST_URI = S3URI.of("test-bucket", "test-key");

private static OpenFileInformation createMockOpenFileInfo() {
Why do you need a mock object here? You can just create an OpenFileInformation object.
@@ -99,7 +112,7 @@ void testCreateDefaultStream() throws IOException {

inputStream =
    s3SeekableInputStreamFactory.createStream(
        S3URI.of("bucket", "key"), mock(OpenFileInformation.class));
        S3URI.of("bucket", "key"), createMockOpenFileInfo());
Basically, here you can just pass in OpenFileInformation.DEFAULT instead of the mock object.
}

@Test
void testNonParquetBehavior() throws IOException {
This is a good start, but I'm not sure you're testing the desired behaviour here.
What I think you want to test for is that when you do

csvBlockManager.makeRangeAvailable(0L, 100L, ReadMode.SYNC);
csvBlockManager.makeRangeAvailable(101L, 500L, ReadMode.SYNC);
csvBlockManager.makeRangeAvailable(501L, 500L, ReadMode.SYNC);

that is, request sequential blocks, no prefetching requests are made. Only 3 GET requests should ever be made, with the exact ranges you asked for.
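One way to express that check without Mockito is a small recording client, shown here as a self-contained sketch. The `RecordingObjectClient`, `Range`, and `issueSequentialReads` names are hypothetical stand-ins; the point is that each range request maps to exactly one GET with exactly the requested range, and no prefetch GETs appear.

```java
import java.util.ArrayList;
import java.util.List;

// Records every requested byte range so the test can assert on the exact
// GETs issued, mirroring what an ArgumentCaptor-based Mockito test verifies.
record Range(long start, long length) {}

final class RecordingObjectClient {
    final List<Range> requests = new ArrayList<>();
    void getObject(long start, long length) { requests.add(new Range(start, length)); }
}

final class SequentialReadCheck {
    // With sequential prefetching disabled, the three sequential reads from
    // the review comment should produce exactly three GETs, range-for-range.
    static List<Range> issueSequentialReads(RecordingObjectClient client) {
        client.getObject(0L, 100L);
        client.getObject(101L, 500L);
        client.getObject(501L, 500L);
        return client.requests;
    }
}
```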
public Blob get(
    ObjectKey objectKey,
    ObjectMetadata metadata,
    StreamContext streamContext,
`streamContext` is no longer being passed to BlockManager, right? Should we remove it?
+1
// Given: Test setup for CSV file
S3URI csvUri = S3URI.of("bucket", "test.csv"); // Non-parquet extension
ObjectClient objectClient = mock(ObjectClient.class);
ObjectMetadata metadata =
    ObjectMetadata.builder().contentLength(16L * ONE_MB).etag(ETAG).build();

PhysicalIOConfiguration testConfig =
    PhysicalIOConfiguration.builder()
        .readAheadBytes(ONE_KB)
        .sequentialPrefetchBase(2.0)
        .build();

// For CSV file
OpenFileInformation csvInfo = OpenFileInformation.builder().objectMetadata(metadata).build();

// Create BlockManager for CSV
BlockManager csvBlockManager =
    new BlockManager(
        ObjectKey.builder().s3URI(csvUri).etag(ETAG).build(),
        objectClient,
        metadata,
        TestTelemetry.DEFAULT,
        testConfig,
        csvInfo);

// Setup mock response
when(objectClient.getObject(any(GetRequest.class), any()))
    .thenReturn(
        CompletableFuture.completedFuture(
            ObjectContent.builder().stream(new ByteArrayInputStream(new byte[1024])).build()));

// Make sequential requests
ArgumentCaptor<GetRequest> requestCaptor = ArgumentCaptor.forClass(GetRequest.class);
csvBlockManager.makeRangeAvailable(0L, 100L, ReadMode.SYNC);
csvBlockManager.makeRangeAvailable(512L, 100L, ReadMode.SYNC);

verify(objectClient, atLeast(1)).getObject(requestCaptor.capture(), any());

// Verify all requests are limited to readAheadBytes
requestCaptor
    .getAllValues()
    .forEach(
        request ->
            assertTrue(
                request.getRange().getLength() <= testConfig.getReadAheadBytes(),
                "Non-Parquet requests should not exceed readAheadBytes"));
}

@Test
void testParquetBehavior() throws IOException {
  // Given: Test setup for Parquet file
  S3URI parquetUri = S3URI.of("bucket", "test.parquet"); // Parquet extension
  ObjectClient objectClient = mock(ObjectClient.class);
  ObjectMetadata metadata =
      ObjectMetadata.builder().contentLength(16L * ONE_MB).etag(ETAG).build();

  PhysicalIOConfiguration testConfig =
      PhysicalIOConfiguration.builder()
          .readAheadBytes(ONE_KB)
          .sequentialPrefetchBase(2.0)
          .build();

  // For Parquet file - set Sequential policy
  OpenFileInformation parquetInfo =
      OpenFileInformation.builder()
          .objectMetadata(metadata)
          .inputPolicy(InputPolicy.Sequential) // This is the key change
          .build();

  // Create BlockManager for Parquet
  BlockManager parquetBlockManager =
      new BlockManager(
          ObjectKey.builder().s3URI(parquetUri).etag(ETAG).build(),
          objectClient,
          metadata,
          TestTelemetry.DEFAULT,
          testConfig,
          parquetInfo);

  // Setup mock response
  when(objectClient.getObject(any(GetRequest.class), any()))
      .thenReturn(
          CompletableFuture.completedFuture(
              ObjectContent.builder().stream(new ByteArrayInputStream(new byte[1024])).build()));

  // Make sequential requests
  ArgumentCaptor<GetRequest> requestCaptor = ArgumentCaptor.forClass(GetRequest.class);
  parquetBlockManager.makeRangeAvailable(0L, 100L, ReadMode.SYNC);
  parquetBlockManager.makeRangeAvailable(512L, 100L, ReadMode.SYNC);

  verify(objectClient, atLeast(1)).getObject(requestCaptor.capture(), any());

  // Verify requests use sequential prefetching
  requestCaptor
      .getAllValues()
      .forEach(
          request ->
              assertTrue(
                  request.getRange().getLength() <= testConfig.getReadAheadBytes(),
                  "Parquet requests should show sequential prefetching"));
I think we should move the common code between these tests into separate functions, so that the tests are easy to follow.
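As a sketch of that refactor, the shared setup could be factored into one fixture builder that the CSV and Parquet tests parameterize. Every name below (`TestFixture`, `fixtureFor`, the stand-in config record) is hypothetical; the real helper would build the actual `BlockManager`, mock client, and captor.

```java
// Hypothetical fixture types standing in for the project's test setup.
enum InputPolicy { Sequential }

record TestConfig(long readAheadBytes, double sequentialPrefetchBase) {}

record TestFixture(String key, InputPolicy inputPolicy, TestConfig config) {}

final class BlockManagerTestSupport {
    private BlockManagerTestSupport() {}

    // One place builds the common setup (16 MB object, 1 KB read-ahead,
    // prefetch base 2.0); tests only vary the key and the input policy.
    static TestFixture fixtureFor(String key, InputPolicy policy) {
        TestConfig config = new TestConfig(1024L, 2.0);
        return new TestFixture(key, policy, config);
    }
}
```

Each test then reads as `fixtureFor("test.csv", null)` versus `fixtureFor("test.parquet", InputPolicy.Sequential)` followed only by the assertions that differ.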
@@ -25,7 +25,7 @@
 * information and callbacks when opening the file.
 */
@Value
@Builder
@Builder(toBuilder = true)
What does this do, and why are we doing it?
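For context: Lombok's `@Builder(toBuilder = true)` additionally generates a `toBuilder()` instance method that returns a builder pre-filled with the object's current values, so an immutable `@Value` object can be copied with one field changed. A hand-written equivalent of what Lombok generates (with a simplified stand-in field):

```java
// Hand-written equivalent of Lombok's @Builder(toBuilder = true) output.
final class OpenFileInformation {
    private final String inputPolicy; // simplified stand-in field

    private OpenFileInformation(Builder b) { this.inputPolicy = b.inputPolicy; }

    String getInputPolicy() { return inputPolicy; }

    static Builder builder() { return new Builder(); }

    // The part toBuilder = true adds: a builder seeded with current values,
    // enabling copy-with-one-change on an immutable object.
    Builder toBuilder() { return new Builder().inputPolicy(this.inputPolicy); }

    static final class Builder {
        private String inputPolicy;
        Builder inputPolicy(String p) { this.inputPolicy = p; return this; }
        OpenFileInformation build() { return new OpenFileInformation(this); }
    }
}
```

So `info.toBuilder().inputPolicy("Sequential").build()` yields a modified copy without mutating `info`, which is presumably why the PR enables it.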
@@ -17,8 +17,7 @@

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
Don't use * imports; turn this off in IntelliJ:
https://stackoverflow.com/questions/3348816/intellij-never-use-wildcard-imports
 */
public PhysicalIOImpl(
    @NonNull S3URI s3URI,
    @NonNull MetadataStore metadataStore,
    @NonNull BlobStore blobStore,
    @NonNull Telemetry telemetry,
    StreamContext streamContext)
    OpenFileInformation openFileInformation)
Add the @NonNull check.
throws IOException {
  this.metadataStore = metadataStore;
  this.blobStore = blobStore;
  this.telemetry = telemetry;
  this.streamContext = streamContext;
  this.openFileInformation = openFileInformation;
  this.streamContext = openFileInformation.getStreamContext();
We can remove streamContext from here and only pass openFileInformation to blobStore, which will have the streamContext.
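A sketch of the suggested shape, with minimal stand-in classes: `PhysicalIOImpl` stops holding `streamContext` at all, and `BlobStore` derives it from the `OpenFileInformation` it is given. The class and method names are the ones in the diff; the bodies are illustrative only.

```java
// Minimal stand-ins illustrating the suggested ownership change.
final class StreamContext {}

final class OpenFileInformation {
    private final StreamContext streamContext;
    OpenFileInformation(StreamContext streamContext) { this.streamContext = streamContext; }
    StreamContext getStreamContext() { return streamContext; }
}

final class BlobStore {
    // BlobStore pulls the context from OpenFileInformation itself, so callers
    // no longer thread streamContext through every constructor.
    StreamContext contextFor(OpenFileInformation info) {
        return info.getStreamContext();
    }
}
```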
Description of change
Added support for selective sequential prefetching based on file type. This change enables sequential prefetching for Parquet files and disables it for non-Parquet objects.
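As a sketch of the selection step, file-type detection by key extension might look like the following; this is an illustrative guess at the logic, not the PR's actual code, and the class name and the set of recognized extensions are assumptions.

```java
import java.util.Locale;

final class PrefetchPolicySelector {
    private PrefetchPolicySelector() {}

    // Illustrative: enable sequential prefetching only for keys that look
    // like Parquet objects; everything else reads ranges as requested.
    static boolean shouldPrefetchSequentially(String key) {
        return key.toLowerCase(Locale.ROOT).endsWith(".parquet");
    }
}
```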
Does this contribution introduce any breaking changes to the existing APIs or behaviours?
Yes, sequential prefetching is now disabled for non-Parquet objects.
Does this contribution introduce any new public APIs or behaviours?
No
How was the contribution tested?
Updated unit tests for BlockManager, PhysicalIOImpl and S3SeekableInputStreamFactory
Verified behaviour with Parquet and non-Parquet files
Modified existing tests to handle OpenFileInformation
Does this contribution need a changelog entry?
No