Disable sequential prefetching for non-Parquet objects
vaahujaw committed Feb 14, 2025
1 parent 5368e3f commit c9dcfdb
Showing 10 changed files with 282 additions and 78 deletions.
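
In outline, the change threads OpenFileInformation (rather than just a StreamContext) from the factory through PhysicalIOImpl and BlobStore into BlockManager, which now applies its sequential read-ahead only when the stream's InputPolicy is Sequential, instead of unconditionally. Parquet opens pass their OpenFileInformation through unchanged, so they no longer trigger the sequential prefetcher by default, while the factory's default branch pins non-Parquet opens without an explicit policy to InputPolicy.Sequential. A caller-side sketch of the public surface exercised in the tests below; the factory variable, bucket, and key are illustrative:

    // Sketch: opening a stream with an explicit input policy.
    // `factory` is assumed to be an initialized S3SeekableInputStreamFactory.
    OpenFileInformation info =
        OpenFileInformation.builder().inputPolicy(InputPolicy.Sequential).build();
    S3SeekableInputStream stream =
        factory.createStream(S3URI.of("example-bucket", "logs/app.log"), info);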
software/amazon/s3/analyticsaccelerator/util/OpenFileInformation.java

@@ -25,7 +25,7 @@
* information and callbacks when opening the file.
*/
@Value
-@Builder
+@Builder(toBuilder = true)
public class OpenFileInformation {
StreamContext streamContext;
ObjectMetadata objectMetadata;
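
The only change in this file is @Builder(toBuilder = true), which makes Lombok generate an instance-level toBuilder() alongside the static builder(). The factory uses it below to copy an immutable OpenFileInformation while overriding a single field. Roughly the generated shape (a sketch; the inputPolicy field is inferred from the getters used elsewhere in this diff):

    // Approximate shape of the Lombok-generated method:
    public OpenFileInformationBuilder toBuilder() {
      return new OpenFileInformationBuilder()
          .streamContext(this.streamContext)
          .objectMetadata(this.objectMetadata)
          .inputPolicy(this.inputPolicy); // ...plus any remaining fields
    }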
software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java

@@ -28,6 +28,7 @@
import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.ObjectFormatSelector;
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -117,31 +118,27 @@ LogicalIO createLogicalIO(S3URI s3URI) throws IOException {
return createLogicalIO(s3URI, OpenFileInformation.DEFAULT);
}

-LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation)
+LogicalIO createLogicalIO(S3URI s3URI, @NonNull OpenFileInformation openFileInformation)
throws IOException {
switch (objectFormatSelector.getObjectFormat(s3URI, openFileInformation)) {
case PARQUET:
return new ParquetLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
-s3URI,
-objectMetadataStore,
-objectBlobStore,
-telemetry,
-openFileInformation.getStreamContext()),
+s3URI, objectMetadataStore, objectBlobStore, telemetry, openFileInformation),
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);

default:
+OpenFileInformation effectiveInfo = openFileInformation;
+if (effectiveInfo.getInputPolicy() == null) {
+effectiveInfo = effectiveInfo.toBuilder().inputPolicy(InputPolicy.Sequential).build();
+}
return new DefaultLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
-s3URI,
-objectMetadataStore,
-objectBlobStore,
-telemetry,
-openFileInformation.getStreamContext()),
+s3URI, objectMetadataStore, objectBlobStore, telemetry, effectiveInfo),
telemetry);
}
}
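
Two things to note in this hunk. First, @NonNull on the new parameter makes Lombok insert an eager null check, which the new test at the end of this diff asserts; a sketch of the expansion (not repository code):

    LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation)
        throws IOException {
      if (openFileInformation == null) {
        throw new NullPointerException("openFileInformation is marked non-null but is null");
      }
      // ...method body as in the diff above...
    }

Second, the fallback runs only in the default branch, so only non-Parquet objects are rebuilt with InputPolicy.Sequential when no policy was supplied.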
software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java

@@ -27,6 +27,7 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
+import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;

/** A BlobStore is a container for Blobs and functions as a data cache. */
@SuppressFBWarnings(
@@ -69,17 +70,22 @@ protected boolean removeEldestEntry(final Map.Entry<ObjectKey, Blob> eldest) {
* @param objectKey the etag and S3 URI of the object
* @param metadata the metadata for the object we are computing
* @param streamContext contains audit headers to be attached in the request header
+* @param openFileInformation known file information including input policy
* @return the blob representing the object from the BlobStore
*/
-public Blob get(ObjectKey objectKey, ObjectMetadata metadata, StreamContext streamContext) {
+public Blob get(
+ObjectKey objectKey,
+ObjectMetadata metadata,
+StreamContext streamContext,
+OpenFileInformation openFileInformation) {
return blobMap.computeIfAbsent(
objectKey,
uri ->
new Blob(
uri,
metadata,
new BlockManager(
-uri, objectClient, metadata, telemetry, configuration, streamContext),
+uri, objectClient, metadata, telemetry, configuration, openFileInformation),
telemetry));
}

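BlobStore.get gains a fourth parameter so that the BlockManager built on a cache miss can see the input policy. One caching subtlety worth noting: computeIfAbsent consults openFileInformation only when the Blob is first created, so the policy supplied by the first opener of a key stays in effect for as long as that Blob remains cached. A usage sketch, with objectKey, metadata, and streamContext assumed in scope:

    // Sketch: the OpenFileInformation argument only matters on the first
    // call for a given key; later calls return the cached Blob.
    Blob blob = blobStore.get(objectKey, metadata, streamContext, OpenFileInformation.DEFAULT);
    int firstByte = blob.read(0);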
software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java

@@ -32,7 +32,9 @@
import software.amazon.s3.analyticsaccelerator.request.Range;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
+import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

/** Implements a Block Manager responsible for planning and scheduling reads on a key. */
@@ -48,7 +50,7 @@ public class BlockManager implements Closeable {
private final PhysicalIOConfiguration configuration;
private final RangeOptimiser rangeOptimiser;
private StreamContext streamContext;
-
+private final boolean isSequential;
private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available";

/**
@@ -60,14 +62,6 @@ public class BlockManager implements Closeable {
* @param metadata the metadata for the object we are reading
* @param configuration the physicalIO configuration
*/
-public BlockManager(
-@NonNull ObjectKey objectKey,
-@NonNull ObjectClient objectClient,
-@NonNull ObjectMetadata metadata,
-@NonNull Telemetry telemetry,
-@NonNull PhysicalIOConfiguration configuration) {
-this(objectKey, objectClient, metadata, telemetry, configuration, null);
-}

/**
* Constructs a new BlockManager.
@@ -77,15 +71,15 @@ public BlockManager(
* @param telemetry an instance of {@link Telemetry} to use
* @param metadata the metadata for the object
* @param configuration the physicalIO configuration
-* @param streamContext contains audit headers to be attached in the request header
+* @param openFileInformation known file information including input policy
*/
public BlockManager(
@NonNull ObjectKey objectKey,
@NonNull ObjectClient objectClient,
@NonNull ObjectMetadata metadata,
@NonNull Telemetry telemetry,
@NonNull PhysicalIOConfiguration configuration,
-StreamContext streamContext) {
+OpenFileInformation openFileInformation) {
this.objectKey = objectKey;
this.objectClient = objectClient;
this.metadata = metadata;
@@ -96,7 +90,10 @@ public BlockManager(
this.sequentialReadProgression = new SequentialReadProgression(configuration);
this.ioPlanner = new IOPlanner(blockStore);
this.rangeOptimiser = new RangeOptimiser(configuration);
-this.streamContext = streamContext;
+this.streamContext = openFileInformation.getStreamContext();
+this.isSequential =
+openFileInformation.getInputPolicy() != null
+&& openFileInformation.getInputPolicy() == InputPolicy.Sequential;
}

/**
@@ -170,7 +167,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMode) {
// TODO: Improve readModes, as tracked in
// https://github.com/awslabs/analytics-accelerator-s3/issues/195
final long generation;
-if (readMode != ReadMode.ASYNC && patternDetector.isSequentialRead(pos)) {
+if (isSequential && readMode != ReadMode.ASYNC && patternDetector.isSequentialRead(pos)) {
generation = patternDetector.getGeneration(pos);
effectiveEnd =
Math.max(
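
The net effect of the new field is that the pattern-detector read-ahead becomes opt-in by policy. Restated as a standalone predicate (illustrative only; the names come from the diff):

    // The prefetch gate after this change, as a pure function.
    static boolean prefetchesSequentially(
        InputPolicy policy, ReadMode readMode, boolean isSequentialRead) {
      boolean isSequential = policy == InputPolicy.Sequential; // null policy -> false
      return isSequential && readMode != ReadMode.ASYNC && isSequentialRead;
    }
    // Parquet path: policy passed through unchanged (often null) -> gate stays closed.
    // Default path: a null policy was pinned to Sequential by the factory -> gate can open.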
software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java

@@ -28,6 +28,7 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
+import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

@@ -39,7 +40,7 @@ public class PhysicalIOImpl implements PhysicalIO {
private final StreamContext streamContext;
private ObjectKey objectKey;
private final ObjectMetadata metadata;
-
+private final OpenFileInformation openFileInformation;
private final long physicalIOBirth = System.nanoTime();

private static final String OPERATION_READ = "physical.io.read";
@@ -61,7 +62,7 @@ public PhysicalIOImpl(
@NonNull BlobStore blobStore,
@NonNull Telemetry telemetry)
throws IOException {
-this(s3URI, metadataStore, blobStore, telemetry, null);
+this(s3URI, metadataStore, blobStore, telemetry, OpenFileInformation.DEFAULT);
}

/**
@@ -71,19 +72,20 @@ public PhysicalIOImpl(
* @param metadataStore a metadata cache
* @param blobStore a data cache
* @param telemetry The {@link Telemetry} to use to report measurements.
-* @param streamContext contains audit headers to be attached in the request header
+* @param openFileInformation known file information including input policy
*/
public PhysicalIOImpl(
@NonNull S3URI s3URI,
@NonNull MetadataStore metadataStore,
@NonNull BlobStore blobStore,
@NonNull Telemetry telemetry,
-StreamContext streamContext)
+OpenFileInformation openFileInformation)
throws IOException {
this.metadataStore = metadataStore;
this.blobStore = blobStore;
this.telemetry = telemetry;
-this.streamContext = streamContext;
+this.openFileInformation = openFileInformation;
+this.streamContext = openFileInformation.getStreamContext();
this.metadata = this.metadataStore.get(s3URI);
this.objectKey = ObjectKey.builder().s3URI(s3URI).etag(metadata.getEtag()).build();
}
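
Both constructors stay source-compatible: the short form now delegates with OpenFileInformation.DEFAULT instead of null, and the long form derives streamContext from the supplied OpenFileInformation. Assuming DEFAULT is an empty instance whose fields are null, the previous null-streamContext behavior is preserved:

    // Sketch (assumes OpenFileInformation.DEFAULT carries no stream context):
    PhysicalIOImpl pio = new PhysicalIOImpl(s3URI, metadataStore, blobStore, telemetry);
    // delegates to this(..., OpenFileInformation.DEFAULT), whose
    // getStreamContext() returns null, matching the old behavior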
@@ -122,7 +124,10 @@ public int read(long pos) throws IOException {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
-() -> blobStore.get(this.objectKey, this.metadata, streamContext).read(pos));
+() ->
+blobStore
+.get(this.objectKey, this.metadata, streamContext, openFileInformation)
+.read(pos));
} catch (Exception e) {
handleOperationExceptions(e);
throw e;
@@ -159,7 +164,10 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
-() -> blobStore.get(objectKey, this.metadata, streamContext).read(buf, off, len, pos));
+() ->
+blobStore
+.get(objectKey, this.metadata, streamContext, openFileInformation)
+.read(buf, off, len, pos));
} catch (Exception e) {
handleOperationExceptions(e);
throw e;
@@ -195,7 +203,7 @@ public int readTail(byte[] buf, int off, int len) throws IOException {
.build(),
() ->
blobStore
-.get(objectKey, this.metadata, streamContext)
+.get(objectKey, this.metadata, streamContext, openFileInformation)
.read(buf, off, len, contentLength - len));
} catch (Exception e) {
handleOperationExceptions(e);
@@ -222,7 +230,10 @@ public IOPlanExecution execute(IOPlan ioPlan) {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
-() -> blobStore.get(objectKey, this.metadata, streamContext).execute(ioPlan));
+() ->
+blobStore
+.get(objectKey, this.metadata, streamContext, openFileInformation)
+.execute(ioPlan));
}

private void handleOperationExceptions(Exception e) {
software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java

@@ -17,8 +17,7 @@

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
@@ -40,6 +39,7 @@
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetLogicalIOImpl;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

@@ -54,6 +54,19 @@ public class S3SeekableInputStreamFactoryTest {

private static final S3URI TEST_URI = S3URI.of("test-bucket", "test-key");

+private static OpenFileInformation createMockOpenFileInfo() {
+OpenFileInformation mockObj = mock(OpenFileInformation.class);
+OpenFileInformation.OpenFileInformationBuilder mockBuilder =
+mock(OpenFileInformation.OpenFileInformationBuilder.class);
+
+when(mockObj.getInputPolicy()).thenReturn(null);
+when(mockBuilder.inputPolicy(any(InputPolicy.class))).thenReturn(mockBuilder);
+when(mockBuilder.build()).thenReturn(mockObj);
+when(mockObj.toBuilder()).thenReturn(mockBuilder);
+
+return mockObj;
+}
+
@Test
void testConstructor() {
ObjectClient objectClient = mock(ObjectClient.class);
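
The createMockOpenFileInfo helper above stubs toBuilder(), inputPolicy(), and build() because the factory's default branch now round-trips the OpenFileInformation through its builder; a bare mock(OpenFileInformation.class) would return null from toBuilder() and the fallback would fail. A hypothetical lighter-weight alternative, assuming a real builder instance is acceptable in these tests:

    // Hypothetical alternative: a real instance instead of a stubbed mock.
    // A freshly built OpenFileInformation has a null input policy, so it
    // exercises the same fallback path in the factory's default branch.
    private static OpenFileInformation createOpenFileInfo() {
      return OpenFileInformation.builder().build();
    }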
@@ -99,7 +112,7 @@ void testCreateDefaultStream() throws IOException {

inputStream =
s3SeekableInputStreamFactory.createStream(
-S3URI.of("bucket", "key"), mock(OpenFileInformation.class));
+S3URI.of("bucket", "key"), createMockOpenFileInfo());
assertNotNull(inputStream);
}

@@ -155,8 +168,7 @@ void testCreateIndependentStream() throws IOException {

S3SeekableInputStream inputStream = s3SeekableInputStreamFactory.createStream(s3URI);
assertNotNull(inputStream);
-
-inputStream = s3SeekableInputStreamFactory.createStream(s3URI, mock(OpenFileInformation.class));
+inputStream = s3SeekableInputStreamFactory.createStream(s3URI, createMockOpenFileInfo());
assertNotNull(inputStream);
}

@@ -203,20 +215,19 @@ void testCreateLogicalIO() throws IOException {
s3SeekableInputStreamFactory
.getObjectMetadataStore()
.storeObjectMetadata(testURITXT, objectMetadata);
-
+OpenFileInformation mockFileInfo = createMockOpenFileInfo();
assertTrue(
-s3SeekableInputStreamFactory.createLogicalIO(
-testURIParquet, mock(OpenFileInformation.class))
+s3SeekableInputStreamFactory.createLogicalIO(testURIParquet, mockFileInfo)
instanceof ParquetLogicalIOImpl);
assertTrue(
-s3SeekableInputStreamFactory.createLogicalIO(testURIKEYPAR, mock(OpenFileInformation.class))
+s3SeekableInputStreamFactory.createLogicalIO(testURIKEYPAR, mockFileInfo)
instanceof ParquetLogicalIOImpl);

assertTrue(
-s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mock(OpenFileInformation.class))
+s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mockFileInfo)
instanceof DefaultLogicalIOImpl);
assertTrue(
-s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mock(OpenFileInformation.class))
+s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mockFileInfo)
instanceof DefaultLogicalIOImpl);
}
@@ -266,8 +277,7 @@ private static void assertInputStreamReadExceptions(
S3SeekableInputStreamFactory factory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT);
-S3SeekableInputStream inputStream =
-factory.createStream(TEST_URI, mock(OpenFileInformation.class));
+S3SeekableInputStream inputStream = factory.createStream(TEST_URI, createMockOpenFileInfo());
Exception thrownException = assertThrows(Exception.class, inputStream::read);
assertInstanceOf(IOException.class, thrownException);
Optional.ofNullable(thrownException.getCause())
@@ -283,7 +293,7 @@ private static void assertInputStreamHeadException(
new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT);
Exception thrownException =
assertThrows(
-Exception.class, () -> factory.createStream(TEST_URI, mock(OpenFileInformation.class)));
+Exception.class, () -> factory.createStream(TEST_URI, createMockOpenFileInfo()));
assertInstanceOf(IOException.class, thrownException);
Optional.ofNullable(thrownException.getCause())
.ifPresent(
@@ -294,4 +304,25 @@
private static Exception[] exceptions() {
return ExceptionHandler.getSampleExceptions();
}
+
+@Test
+void testCreateLogicalIO_NullOpenFileInformationThrowsException() {
+// Setup
+ObjectClient objectClient = mock(ObjectClient.class);
+S3SeekableInputStreamFactory factory =
+new S3SeekableInputStreamFactory(
+objectClient,
+S3SeekableInputStreamConfiguration.builder()
+.logicalIOConfiguration(
+LogicalIOConfiguration.builder().prefetchFooterEnabled(false).build())
+.build());
+
+S3URI uri = S3URI.of("bucket", "test.txt");
+
+// Test that null OpenFileInformation throws NullPointerException
+assertThrows(
+NullPointerException.class,
+() -> factory.createLogicalIO(uri, null),
+"createLogicalIO should throw NullPointerException when OpenFileInformation is null");
+}
}