
Disable sequential prefetching for non-Parquet objects #225

Open
wants to merge 1 commit into main
Conversation

vaibhav5140

Description of change

Added support for selective sequential prefetching based on file type. This change enables sequential prefetching for Parquet files and disables it for non-Parquet objects.

Does this contribution introduce any breaking changes to the existing APIs or behaviours?

Yes. Sequential prefetching is now disabled for non-Parquet objects.

Does this contribution introduce any new public APIs or behaviours?

No

How was the contribution tested?

- Updated unit tests for BlockManager, PhysicalIOImpl and S3SeekableInputStreamFactory
- Verified behaviour with Parquet and non-Parquet files
- Modified existing tests to handle OpenFileInformation

Does this contribution need a changelog entry?

No

Collaborator

@ahmarsuhail left a comment

Looks good overall.

I'm not sure if the block store test cases are quite right, or maybe I misunderstood them. Let's discuss.

telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);

default:
OpenFileInformation effectiveInfo = openFileInformation;
Collaborator

can you move this to a private function in utils, call it something like setInputPolicy(), and then just call it

         new PhysicalIOImpl(
                s3URI, objectMetadataStore, objectBlobStore, telemetry, setInputPolicy())
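A minimal sketch of what such a helper could look like. The class name, the simplified InputPolicy enum, and the extension check below are illustrative stand-ins, not the library's real API:

```java
// Hypothetical utils helper, per the suggestion above: derive the input
// policy from the object key so the PhysicalIOImpl constructor carries no
// logic. PrefetchPolicyUtil, InputPolicy, and the extension check are
// simplified stand-ins for the library's real types and detection.
public final class PrefetchPolicyUtil {
    public enum InputPolicy { Sequential, None }

    private PrefetchPolicyUtil() {}

    // Sequential prefetching is enabled only for Parquet objects.
    public static InputPolicy setInputPolicy(String objectKey) {
        String key = objectKey.toLowerCase();
        return (key.endsWith(".parquet") || key.endsWith(".par"))
                ? InputPolicy.Sequential
                : InputPolicy.None;
    }
}
```

With something like this, the constructor call site stays a one-liner, as shown in the suggestion.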

Contributor

why do we need to create a new OpenFileInformation object again, can we not set the input policy in the existing object itself?

this.streamContext = streamContext;
this.streamContext = openFileInformation.getStreamContext();
this.isSequential =
openFileInformation.getInputPolicy() != null
Collaborator

move to a function, maybe in utils. keeps the constructor clean of any logic code
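A hedged sketch of pulling that check out of the constructor; the utils class name and the simplified InputPolicy enum are assumptions for illustration:

```java
// Hypothetical utils method so the constructor only assigns fields.
// StreamUtils and this InputPolicy enum are simplified stand-ins for
// the library's real types.
public final class StreamUtils {
    public enum InputPolicy { Sequential, Random }

    private StreamUtils() {}

    // Mirrors the inline check: a stream is sequential only when an
    // input policy is present and set to Sequential.
    public static boolean isSequential(InputPolicy inputPolicy) {
        return inputPolicy != null && inputPolicy == InputPolicy.Sequential;
    }
}
```

The constructor would then reduce to something like `this.isSequential = StreamUtils.isSequential(openFileInformation.getInputPolicy());`.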

@@ -54,6 +54,19 @@ public class S3SeekableInputStreamFactoryTest {

private static final S3URI TEST_URI = S3URI.of("test-bucket", "test-key");

private static OpenFileInformation createMockOpenFileInfo() {
Collaborator

why do you need a mock object here? you can just create OpenFileInformation object

@@ -99,7 +112,7 @@ void testCreateDefaultStream() throws IOException {

inputStream =
s3SeekableInputStreamFactory.createStream(
S3URI.of("bucket", "key"), mock(OpenFileInformation.class));
S3URI.of("bucket", "key"), createMockOpenFileInfo());
Collaborator

basically here you can just pass in OpenFileInformation.DEFAULT, instead of the mock object

}

@Test
void testNonParquetBehavior() throws IOException {
Collaborator

this is a good start, but I'm not sure if you're testing your desired behaviour here.

What I think you want to test for is when you do

csvBlockManager.makeRangeAvailable(0L, 100L, ReadMode.SYNC);
csvBlockManager.makeRangeAvailable(101L, 500L, ReadMode.SYNC);
csvBlockManager.makeRangeAvailable(501L, 500L, ReadMode.SYNC);

that is, when you request sequential blocks, no prefetching requests are made. So only 3 GET requests should ever be made, with the exact ranges you asked for
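The shape of that test can be sketched without the real BlockManager. Every class below (RecordingClient, NoPrefetchBlockManager, Range) is a simplified stand-in that only records GET ranges, not the library's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested assertion: with prefetching disabled, each
// sequential makeRangeAvailable call must translate into exactly one GET
// with the exact range asked for.
public class SequentialNoPrefetchSketch {
    static final class Range {
        final long start, length;
        Range(long start, long length) { this.start = start; this.length = length; }
    }

    // Fake object client that records every GET it receives.
    static final class RecordingClient {
        final List<Range> gets = new ArrayList<>();
        void getObject(long start, long length) { gets.add(new Range(start, length)); }
    }

    // Stand-in for a BlockManager with sequential prefetching disabled:
    // it fetches exactly the requested range and nothing more.
    static final class NoPrefetchBlockManager {
        private final RecordingClient client;
        NoPrefetchBlockManager(RecordingClient client) { this.client = client; }
        void makeRangeAvailable(long pos, long length) { client.getObject(pos, length); }
    }

    public static void main(String[] args) {
        RecordingClient client = new RecordingClient();
        NoPrefetchBlockManager csvBlockManager = new NoPrefetchBlockManager(client);

        csvBlockManager.makeRangeAvailable(0L, 100L);
        csvBlockManager.makeRangeAvailable(101L, 500L);
        csvBlockManager.makeRangeAvailable(501L, 500L);

        // Only 3 GETs should ever be made, with the exact ranges requested.
        if (client.gets.size() != 3) throw new AssertionError("expected 3 GETs");
        if (client.gets.get(0).start != 0L || client.gets.get(0).length != 100L)
            throw new AssertionError("unexpected first range");
    }
}
```

In the real test this would be the fake-client half; the verification against the actual BlockManager would still go through Mockito's ArgumentCaptor, as in the existing tests.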

public Blob get(
ObjectKey objectKey,
ObjectMetadata metadata,
StreamContext streamContext,
Contributor

streamContext is no longer being passed to BlockManager, right? Should we remove it?

Contributor

+1

Comment on lines +306 to +406
// Given: Test setup for CSV file
S3URI csvUri = S3URI.of("bucket", "test.csv"); // Non-parquet extension
ObjectClient objectClient = mock(ObjectClient.class);
ObjectMetadata metadata =
ObjectMetadata.builder().contentLength(16L * ONE_MB).etag(ETAG).build();

PhysicalIOConfiguration testConfig =
PhysicalIOConfiguration.builder()
.readAheadBytes(ONE_KB)
.sequentialPrefetchBase(2.0)
.build();

// For CSV file
OpenFileInformation csvInfo = OpenFileInformation.builder().objectMetadata(metadata).build();

// Create BlockManager for CSV
BlockManager csvBlockManager =
new BlockManager(
ObjectKey.builder().s3URI(csvUri).etag(ETAG).build(),
objectClient,
metadata,
TestTelemetry.DEFAULT,
testConfig,
csvInfo);

// Setup mock response
when(objectClient.getObject(any(GetRequest.class), any()))
.thenReturn(
CompletableFuture.completedFuture(
ObjectContent.builder().stream(new ByteArrayInputStream(new byte[1024])).build()));

// Make sequential requests
ArgumentCaptor<GetRequest> requestCaptor = ArgumentCaptor.forClass(GetRequest.class);
csvBlockManager.makeRangeAvailable(0L, 100L, ReadMode.SYNC);
csvBlockManager.makeRangeAvailable(512L, 100L, ReadMode.SYNC);

verify(objectClient, atLeast(1)).getObject(requestCaptor.capture(), any());

// Verify all requests are limited to readAheadBytes
requestCaptor
.getAllValues()
.forEach(
request ->
assertTrue(
request.getRange().getLength() <= testConfig.getReadAheadBytes(),
"Non-Parquet requests should not exceed readAheadBytes"));
}

@Test
void testParquetBehavior() throws IOException {
// Given: Test setup for Parquet file
S3URI parquetUri = S3URI.of("bucket", "test.parquet"); // Parquet extension
ObjectClient objectClient = mock(ObjectClient.class);
ObjectMetadata metadata =
ObjectMetadata.builder().contentLength(16L * ONE_MB).etag(ETAG).build();

PhysicalIOConfiguration testConfig =
PhysicalIOConfiguration.builder()
.readAheadBytes(ONE_KB)
.sequentialPrefetchBase(2.0)
.build();

// For Parquet file - set Sequential policy
OpenFileInformation parquetInfo =
OpenFileInformation.builder()
.objectMetadata(metadata)
.inputPolicy(InputPolicy.Sequential) // This is the key change
.build();

// Create BlockManager for Parquet
BlockManager parquetBlockManager =
new BlockManager(
ObjectKey.builder().s3URI(parquetUri).etag(ETAG).build(),
objectClient,
metadata,
TestTelemetry.DEFAULT,
testConfig,
parquetInfo);

// Setup mock response
when(objectClient.getObject(any(GetRequest.class), any()))
.thenReturn(
CompletableFuture.completedFuture(
ObjectContent.builder().stream(new ByteArrayInputStream(new byte[1024])).build()));

// Make sequential requests
ArgumentCaptor<GetRequest> requestCaptor = ArgumentCaptor.forClass(GetRequest.class);
parquetBlockManager.makeRangeAvailable(0L, 100L, ReadMode.SYNC);
parquetBlockManager.makeRangeAvailable(512L, 100L, ReadMode.SYNC);

verify(objectClient, atLeast(1)).getObject(requestCaptor.capture(), any());

// Verify requests use sequential prefetching
requestCaptor
.getAllValues()
.forEach(
request ->
assertTrue(
request.getRange().getLength() <= testConfig.getReadAheadBytes(),
"Parquet requests should show sequential prefetching"));
Contributor

I think we should move the common code between these tests into separate functions, so that the tests are easier to follow.

@@ -25,7 +25,7 @@
* information and callbacks when opening the file.
*/
@Value
@Builder
@Builder(toBuilder = true)
Contributor

what does this do? and why are we doing it?
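For reference, here is a hand-written sketch of what Lombok's `@Builder(toBuilder = true)` generates, on a simplified stand-in class (the fields are illustrative, not the library's real ones). `toBuilder()` returns a builder pre-populated with the instance's current values, letting callers copy an existing object and change just one field instead of rebuilding it field by field:

```java
// Hand-written equivalent of Lombok's @Builder(toBuilder = true) on a
// simplified stand-in for OpenFileInformation. Field names are illustrative.
public final class OpenFileInfoSketch {
    private final String inputPolicy;
    private final long contentLength;

    private OpenFileInfoSketch(String inputPolicy, long contentLength) {
        this.inputPolicy = inputPolicy;
        this.contentLength = contentLength;
    }

    public String getInputPolicy() { return inputPolicy; }
    public long getContentLength() { return contentLength; }

    public static Builder builder() { return new Builder(); }

    // toBuilder = true adds this method: a builder pre-filled with the
    // current instance's values, for copy-and-modify use.
    public Builder toBuilder() {
        return new Builder().inputPolicy(inputPolicy).contentLength(contentLength);
    }

    public static final class Builder {
        private String inputPolicy;
        private long contentLength;
        public Builder inputPolicy(String p) { this.inputPolicy = p; return this; }
        public Builder contentLength(long l) { this.contentLength = l; return this; }
        public OpenFileInfoSketch build() { return new OpenFileInfoSketch(inputPolicy, contentLength); }
    }
}
```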

@@ -17,8 +17,7 @@

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
Contributor

*/
public PhysicalIOImpl(
@NonNull S3URI s3URI,
@NonNull MetadataStore metadataStore,
@NonNull BlobStore blobStore,
@NonNull Telemetry telemetry,
StreamContext streamContext)
OpenFileInformation openFileInformation)
Contributor

@rajdchak Feb 14, 2025

add the @NonNull check

throws IOException {
this.metadataStore = metadataStore;
this.blobStore = blobStore;
this.telemetry = telemetry;
this.streamContext = streamContext;
this.openFileInformation = openFileInformation;
this.streamContext = openFileInformation.getStreamContext();
Contributor

we can remove streamContext from here and only pass openFileInformation to blobStore, which will have the streamContext.

6 participants