-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Disable sequential prefetching for non Parquet objects #225
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; | ||
import software.amazon.s3.analyticsaccelerator.request.ObjectClient; | ||
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; | ||
import software.amazon.s3.analyticsaccelerator.util.InputPolicy; | ||
import software.amazon.s3.analyticsaccelerator.util.ObjectFormatSelector; | ||
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; | ||
import software.amazon.s3.analyticsaccelerator.util.S3URI; | ||
|
@@ -117,31 +118,27 @@ LogicalIO createLogicalIO(S3URI s3URI) throws IOException { | |
return createLogicalIO(s3URI, OpenFileInformation.DEFAULT); | ||
} | ||
|
||
LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation) | ||
LogicalIO createLogicalIO(S3URI s3URI, @NonNull OpenFileInformation openFileInformation) | ||
throws IOException { | ||
switch (objectFormatSelector.getObjectFormat(s3URI, openFileInformation)) { | ||
case PARQUET: | ||
return new ParquetLogicalIOImpl( | ||
s3URI, | ||
new PhysicalIOImpl( | ||
s3URI, | ||
objectMetadataStore, | ||
objectBlobStore, | ||
telemetry, | ||
openFileInformation.getStreamContext()), | ||
s3URI, objectMetadataStore, objectBlobStore, telemetry, openFileInformation), | ||
telemetry, | ||
configuration.getLogicalIOConfiguration(), | ||
parquetColumnPrefetchStore); | ||
|
||
default: | ||
OpenFileInformation effectiveInfo = openFileInformation; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you move this to a private function in utils, call it something like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to create a new OpenFileInformation object again, can we not set the input policy in the existing object itself? |
||
if (effectiveInfo.getInputPolicy() == null) { | ||
effectiveInfo = effectiveInfo.toBuilder().inputPolicy(InputPolicy.Sequential).build(); | ||
} | ||
return new DefaultLogicalIOImpl( | ||
s3URI, | ||
new PhysicalIOImpl( | ||
s3URI, | ||
objectMetadataStore, | ||
objectBlobStore, | ||
telemetry, | ||
openFileInformation.getStreamContext()), | ||
s3URI, objectMetadataStore, objectBlobStore, telemetry, effectiveInfo), | ||
telemetry); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; | ||
import software.amazon.s3.analyticsaccelerator.request.StreamContext; | ||
import software.amazon.s3.analyticsaccelerator.util.ObjectKey; | ||
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; | ||
|
||
/** A BlobStore is a container for Blobs and functions as a data cache. */ | ||
@SuppressFBWarnings( | ||
|
@@ -69,17 +70,22 @@ protected boolean removeEldestEntry(final Map.Entry<ObjectKey, Blob> eldest) { | |
* @param objectKey the etag and S3 URI of the object | ||
* @param metadata the metadata for the object we are computing | ||
* @param streamContext contains audit headers to be attached in the request header | ||
* @param openFileInformation known file information including input policy | ||
* @return the blob representing the object from the BlobStore | ||
*/ | ||
public Blob get(ObjectKey objectKey, ObjectMetadata metadata, StreamContext streamContext) { | ||
public Blob get( | ||
ObjectKey objectKey, | ||
ObjectMetadata metadata, | ||
StreamContext streamContext, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
OpenFileInformation openFileInformation) { | ||
return blobMap.computeIfAbsent( | ||
objectKey, | ||
uri -> | ||
new Blob( | ||
uri, | ||
metadata, | ||
new BlockManager( | ||
uri, objectClient, metadata, telemetry, configuration, streamContext), | ||
uri, objectClient, metadata, telemetry, configuration, openFileInformation), | ||
telemetry)); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,9 @@ | |
import software.amazon.s3.analyticsaccelerator.request.Range; | ||
import software.amazon.s3.analyticsaccelerator.request.ReadMode; | ||
import software.amazon.s3.analyticsaccelerator.request.StreamContext; | ||
import software.amazon.s3.analyticsaccelerator.util.InputPolicy; | ||
import software.amazon.s3.analyticsaccelerator.util.ObjectKey; | ||
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; | ||
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; | ||
|
||
/** Implements a Block Manager responsible for planning and scheduling reads on a key. */ | ||
|
@@ -48,7 +50,7 @@ public class BlockManager implements Closeable { | |
private final PhysicalIOConfiguration configuration; | ||
private final RangeOptimiser rangeOptimiser; | ||
private StreamContext streamContext; | ||
|
||
private final boolean isSequential; | ||
private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available"; | ||
|
||
/** | ||
|
@@ -60,14 +62,6 @@ public class BlockManager implements Closeable { | |
* @param metadata the metadata for the object we are reading | ||
* @param configuration the physicalIO configuration | ||
*/ | ||
public BlockManager( | ||
@NonNull ObjectKey objectKey, | ||
@NonNull ObjectClient objectClient, | ||
@NonNull ObjectMetadata metadata, | ||
@NonNull Telemetry telemetry, | ||
@NonNull PhysicalIOConfiguration configuration) { | ||
this(objectKey, objectClient, metadata, telemetry, configuration, null); | ||
} | ||
|
||
/** | ||
* Constructs a new BlockManager. | ||
|
@@ -77,15 +71,15 @@ public BlockManager( | |
* @param telemetry an instance of {@link Telemetry} to use | ||
* @param metadata the metadata for the object | ||
* @param configuration the physicalIO configuration | ||
* @param streamContext contains audit headers to be attached in the request header | ||
* @param openFileInformation known file information including input policy | ||
*/ | ||
public BlockManager( | ||
@NonNull ObjectKey objectKey, | ||
@NonNull ObjectClient objectClient, | ||
@NonNull ObjectMetadata metadata, | ||
@NonNull Telemetry telemetry, | ||
@NonNull PhysicalIOConfiguration configuration, | ||
StreamContext streamContext) { | ||
OpenFileInformation openFileInformation) { | ||
this.objectKey = objectKey; | ||
this.objectClient = objectClient; | ||
this.metadata = metadata; | ||
|
@@ -96,7 +90,10 @@ public BlockManager( | |
this.sequentialReadProgression = new SequentialReadProgression(configuration); | ||
this.ioPlanner = new IOPlanner(blockStore); | ||
this.rangeOptimiser = new RangeOptimiser(configuration); | ||
this.streamContext = streamContext; | ||
this.streamContext = openFileInformation.getStreamContext(); | ||
this.isSequential = | ||
openFileInformation.getInputPolicy() != null | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to a function, maybe in utils. keeps the constructor clean of any logic code |
||
&& openFileInformation.getInputPolicy() == InputPolicy.Sequential; | ||
} | ||
|
||
/** | ||
|
@@ -170,7 +167,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod | |
// TODO: Improve readModes, as tracked in | ||
// https://github.com/awslabs/analytics-accelerator-s3/issues/195 | ||
final long generation; | ||
if (readMode != ReadMode.ASYNC && patternDetector.isSequentialRead(pos)) { | ||
if (isSequential && readMode != ReadMode.ASYNC && patternDetector.isSequentialRead(pos)) { | ||
generation = patternDetector.getGeneration(pos); | ||
effectiveEnd = | ||
Math.max( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; | ||
import software.amazon.s3.analyticsaccelerator.request.StreamContext; | ||
import software.amazon.s3.analyticsaccelerator.util.ObjectKey; | ||
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; | ||
import software.amazon.s3.analyticsaccelerator.util.S3URI; | ||
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; | ||
|
||
|
@@ -39,7 +40,7 @@ public class PhysicalIOImpl implements PhysicalIO { | |
private final StreamContext streamContext; | ||
private ObjectKey objectKey; | ||
private final ObjectMetadata metadata; | ||
|
||
private final OpenFileInformation openFileInformation; | ||
private final long physicalIOBirth = System.nanoTime(); | ||
|
||
private static final String OPERATION_READ = "physical.io.read"; | ||
|
@@ -61,7 +62,7 @@ public PhysicalIOImpl( | |
@NonNull BlobStore blobStore, | ||
@NonNull Telemetry telemetry) | ||
throws IOException { | ||
this(s3URI, metadataStore, blobStore, telemetry, null); | ||
this(s3URI, metadataStore, blobStore, telemetry, OpenFileInformation.DEFAULT); | ||
} | ||
|
||
/** | ||
|
@@ -71,19 +72,20 @@ public PhysicalIOImpl( | |
* @param metadataStore a metadata cache | ||
* @param blobStore a data cache | ||
* @param telemetry The {@link Telemetry} to use to report measurements. | ||
* @param streamContext contains audit headers to be attached in the request header | ||
* @param openFileInformation known file information including input policy | ||
*/ | ||
public PhysicalIOImpl( | ||
@NonNull S3URI s3URI, | ||
@NonNull MetadataStore metadataStore, | ||
@NonNull BlobStore blobStore, | ||
@NonNull Telemetry telemetry, | ||
StreamContext streamContext) | ||
OpenFileInformation openFileInformation) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add the @nonnull check |
||
throws IOException { | ||
this.metadataStore = metadataStore; | ||
this.blobStore = blobStore; | ||
this.telemetry = telemetry; | ||
this.streamContext = streamContext; | ||
this.openFileInformation = openFileInformation; | ||
this.streamContext = openFileInformation.getStreamContext(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can remove streamContext from here, only pass openFileInformation to blobStore which will have the streamContext. |
||
this.metadata = this.metadataStore.get(s3URI); | ||
this.objectKey = ObjectKey.builder().s3URI(s3URI).etag(metadata.getEtag()).build(); | ||
} | ||
|
@@ -122,7 +124,10 @@ public int read(long pos) throws IOException { | |
StreamAttributes.physicalIORelativeTimestamp( | ||
System.nanoTime() - physicalIOBirth)) | ||
.build(), | ||
() -> blobStore.get(this.objectKey, this.metadata, streamContext).read(pos)); | ||
() -> | ||
blobStore | ||
.get(this.objectKey, this.metadata, streamContext, openFileInformation) | ||
.read(pos)); | ||
} catch (Exception e) { | ||
handleOperationExceptions(e); | ||
throw e; | ||
|
@@ -159,7 +164,10 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException { | |
StreamAttributes.physicalIORelativeTimestamp( | ||
System.nanoTime() - physicalIOBirth)) | ||
.build(), | ||
() -> blobStore.get(objectKey, this.metadata, streamContext).read(buf, off, len, pos)); | ||
() -> | ||
blobStore | ||
.get(objectKey, this.metadata, streamContext, openFileInformation) | ||
.read(buf, off, len, pos)); | ||
} catch (Exception e) { | ||
handleOperationExceptions(e); | ||
throw e; | ||
|
@@ -195,7 +203,7 @@ public int readTail(byte[] buf, int off, int len) throws IOException { | |
.build(), | ||
() -> | ||
blobStore | ||
.get(objectKey, this.metadata, streamContext) | ||
.get(objectKey, this.metadata, streamContext, openFileInformation) | ||
.read(buf, off, len, contentLength - len)); | ||
} catch (Exception e) { | ||
handleOperationExceptions(e); | ||
|
@@ -222,7 +230,10 @@ public IOPlanExecution execute(IOPlan ioPlan) { | |
StreamAttributes.physicalIORelativeTimestamp( | ||
System.nanoTime() - physicalIOBirth)) | ||
.build(), | ||
() -> blobStore.get(objectKey, this.metadata, streamContext).execute(ioPlan)); | ||
() -> | ||
blobStore | ||
.get(objectKey, this.metadata, streamContext, openFileInformation) | ||
.execute(ioPlan)); | ||
} | ||
|
||
private void handleOperationExceptions(Exception e) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,7 @@ | |
|
||
import static org.junit.jupiter.api.Assertions.*; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
import static org.mockito.Mockito.*; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't use * imports turn this off in intellij |
||
|
||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | ||
import java.io.IOException; | ||
|
@@ -40,6 +39,7 @@ | |
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetLogicalIOImpl; | ||
import software.amazon.s3.analyticsaccelerator.request.ObjectClient; | ||
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; | ||
import software.amazon.s3.analyticsaccelerator.util.InputPolicy; | ||
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; | ||
import software.amazon.s3.analyticsaccelerator.util.S3URI; | ||
|
||
|
@@ -54,6 +54,19 @@ public class S3SeekableInputStreamFactoryTest { | |
|
||
private static final S3URI TEST_URI = S3URI.of("test-bucket", "test-key"); | ||
|
||
private static OpenFileInformation createMockOpenFileInfo() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you need a mock object here? you can just create OpenFileInformation object |
||
OpenFileInformation mockObj = mock(OpenFileInformation.class); | ||
OpenFileInformation.OpenFileInformationBuilder mockBuilder = | ||
mock(OpenFileInformation.OpenFileInformationBuilder.class); | ||
|
||
when(mockObj.getInputPolicy()).thenReturn(null); | ||
when(mockBuilder.inputPolicy(any(InputPolicy.class))).thenReturn(mockBuilder); | ||
when(mockBuilder.build()).thenReturn(mockObj); | ||
when(mockObj.toBuilder()).thenReturn(mockBuilder); | ||
|
||
return mockObj; | ||
} | ||
|
||
@Test | ||
void testConstructor() { | ||
ObjectClient objectClient = mock(ObjectClient.class); | ||
|
@@ -99,7 +112,7 @@ void testCreateDefaultStream() throws IOException { | |
|
||
inputStream = | ||
s3SeekableInputStreamFactory.createStream( | ||
S3URI.of("bucket", "key"), mock(OpenFileInformation.class)); | ||
S3URI.of("bucket", "key"), createMockOpenFileInfo()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. basically here you can just pass in OpenFileInformation.DEFAULT, instead of the mock object |
||
assertNotNull(inputStream); | ||
} | ||
|
||
|
@@ -155,8 +168,7 @@ void testCreateIndependentStream() throws IOException { | |
|
||
S3SeekableInputStream inputStream = s3SeekableInputStreamFactory.createStream(s3URI); | ||
assertNotNull(inputStream); | ||
|
||
inputStream = s3SeekableInputStreamFactory.createStream(s3URI, mock(OpenFileInformation.class)); | ||
inputStream = s3SeekableInputStreamFactory.createStream(s3URI, createMockOpenFileInfo()); | ||
assertNotNull(inputStream); | ||
} | ||
|
||
|
@@ -203,20 +215,19 @@ void testCreateLogicalIO() throws IOException { | |
s3SeekableInputStreamFactory | ||
.getObjectMetadataStore() | ||
.storeObjectMetadata(testURITXT, objectMetadata); | ||
|
||
OpenFileInformation mockFileInfo = createMockOpenFileInfo(); | ||
assertTrue( | ||
s3SeekableInputStreamFactory.createLogicalIO( | ||
testURIParquet, mock(OpenFileInformation.class)) | ||
s3SeekableInputStreamFactory.createLogicalIO(testURIParquet, mockFileInfo) | ||
instanceof ParquetLogicalIOImpl); | ||
assertTrue( | ||
s3SeekableInputStreamFactory.createLogicalIO(testURIKEYPAR, mock(OpenFileInformation.class)) | ||
s3SeekableInputStreamFactory.createLogicalIO(testURIKEYPAR, mockFileInfo) | ||
instanceof ParquetLogicalIOImpl); | ||
|
||
assertTrue( | ||
s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mock(OpenFileInformation.class)) | ||
s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mockFileInfo) | ||
instanceof DefaultLogicalIOImpl); | ||
assertTrue( | ||
s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mock(OpenFileInformation.class)) | ||
s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mockFileInfo) | ||
instanceof DefaultLogicalIOImpl); | ||
} | ||
|
||
|
@@ -266,8 +277,7 @@ private static void assertInputStreamReadExceptions( | |
S3SeekableInputStreamFactory factory = | ||
new S3SeekableInputStreamFactory( | ||
new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT); | ||
S3SeekableInputStream inputStream = | ||
factory.createStream(TEST_URI, mock(OpenFileInformation.class)); | ||
S3SeekableInputStream inputStream = factory.createStream(TEST_URI, createMockOpenFileInfo()); | ||
Exception thrownException = assertThrows(Exception.class, inputStream::read); | ||
assertInstanceOf(IOException.class, thrownException); | ||
Optional.ofNullable(thrownException.getCause()) | ||
|
@@ -283,7 +293,7 @@ private static void assertInputStreamHeadException( | |
new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT); | ||
Exception thrownException = | ||
assertThrows( | ||
Exception.class, () -> factory.createStream(TEST_URI, mock(OpenFileInformation.class))); | ||
Exception.class, () -> factory.createStream(TEST_URI, createMockOpenFileInfo())); | ||
assertInstanceOf(IOException.class, thrownException); | ||
Optional.ofNullable(thrownException.getCause()) | ||
.ifPresent( | ||
|
@@ -294,4 +304,25 @@ private static void assertInputStreamHeadException( | |
private static Exception[] exceptions() { | ||
return ExceptionHandler.getSampleExceptions(); | ||
} | ||
|
||
@Test | ||
void testCreateLogicalIO_NullOpenFileInformationThrowsException() { | ||
// Setup | ||
ObjectClient objectClient = mock(ObjectClient.class); | ||
S3SeekableInputStreamFactory factory = | ||
new S3SeekableInputStreamFactory( | ||
objectClient, | ||
S3SeekableInputStreamConfiguration.builder() | ||
.logicalIOConfiguration( | ||
LogicalIOConfiguration.builder().prefetchFooterEnabled(false).build()) | ||
.build()); | ||
|
||
S3URI uri = S3URI.of("bucket", "test.txt"); | ||
|
||
// Test that null OpenFileInformation throws NullPointerException | ||
assertThrows( | ||
NullPointerException.class, | ||
() -> factory.createLogicalIO(uri, null), | ||
"createLogicalIO should throw NullPointerException when OpenFileInformation is null"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this do? and why are we doing it?