Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename OpenFileInformation to OpenStreamInformation #227

Merged
merged 5 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: These doc should be updated. It still says Open file information.

@Value
@Builder
public class OpenFileInformation {
public class OpenStreamInformation {
StreamContext streamContext;
ObjectMetadata objectMetadata;
InputPolicy inputPolicy;

/** Default set of settings for {@link OpenFileInformation} */
public static final OpenFileInformation DEFAULT = OpenFileInformation.builder().build();
/** Default set of settings for {@link OpenStreamInformation} */
public static final OpenStreamInformation DEFAULT = OpenStreamInformation.builder().build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.ObjectFormatSelector;
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/**
Expand Down Expand Up @@ -103,23 +103,25 @@ public S3SeekableInputStream createStream(@NonNull S3URI s3URI, ObjectMetadata m
* Creates an instance of SeekableStream with file information
*
* @param s3URI the object's S3 URI
* @param openFileInformation known file information this key
* @param openStreamInformation known file information this key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • @param openStreamInformation known file information for the file
    should be stream right?

* @return An instance of the input stream.
* @throws IOException IoException
*/
public S3SeekableInputStream createStream(
@NonNull S3URI s3URI, @NonNull OpenFileInformation openFileInformation) throws IOException {
storeObjectMetadata(s3URI, openFileInformation.getObjectMetadata());
return new S3SeekableInputStream(s3URI, createLogicalIO(s3URI, openFileInformation), telemetry);
@NonNull S3URI s3URI, @NonNull OpenStreamInformation openStreamInformation)
throws IOException {
storeObjectMetadata(s3URI, openStreamInformation.getObjectMetadata());
return new S3SeekableInputStream(
s3URI, createLogicalIO(s3URI, openStreamInformation), telemetry);
}

LogicalIO createLogicalIO(S3URI s3URI) throws IOException {
return createLogicalIO(s3URI, OpenFileInformation.DEFAULT);
return createLogicalIO(s3URI, OpenStreamInformation.DEFAULT);
}

LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation)
LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformation)
throws IOException {
switch (objectFormatSelector.getObjectFormat(s3URI, openFileInformation)) {
switch (objectFormatSelector.getObjectFormat(s3URI, openStreamInformation)) {
case PARQUET:
return new ParquetLogicalIOImpl(
s3URI,
Expand All @@ -128,7 +130,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation)
objectMetadataStore,
objectBlobStore,
telemetry,
openFileInformation.getStreamContext()),
openStreamInformation.getStreamContext()),
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);
Expand All @@ -141,7 +143,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation)
objectMetadataStore,
objectBlobStore,
telemetry,
openFileInformation.getStreamContext()),
openStreamInformation.getStreamContext()),
telemetry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ public ObjectFormatSelector(LogicalIOConfiguration configuration) {
* Uses a regex matcher to select the file format based on the file extension of the key.
*
* @param s3URI the object's S3 URI
* @param openFileInformation known file information for the file
* @param openStreamInformation known file information for the file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • @param openStreamInformation known file information for the file

should be known stream info?

* @return the file format of the object
*/
public ObjectFormat getObjectFormat(S3URI s3URI, OpenFileInformation openFileInformation) {
public ObjectFormat getObjectFormat(S3URI s3URI, OpenStreamInformation openStreamInformation) {

// If the supplied policy in open file information is Sequential, then use the default input
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still says Open file information

// stream, regardless of the file format (even if it's parquet!). This is important for
// applications like DISTCP, which use a "whole_file" read policy with S3A, where they will
// read parquet file sequentially (as they simply need to copy over the file),
// instead of the regular parquet pattern of footer first, then specific columns etc., so our
// parquet specific optimisations are of no use there :(
if (openFileInformation.getInputPolicy() != null
&& openFileInformation.getInputPolicy().equals(InputPolicy.Sequential)) {
if (openStreamInformation.getInputPolicy() != null
&& openStreamInformation.getInputPolicy().equals(InputPolicy.Sequential)) {
return ObjectFormat.DEFAULT;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetLogicalIOImpl;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

@SuppressFBWarnings(
Expand Down Expand Up @@ -99,7 +99,7 @@ void testCreateDefaultStream() throws IOException {

inputStream =
s3SeekableInputStreamFactory.createStream(
S3URI.of("bucket", "key"), mock(OpenFileInformation.class));
S3URI.of("bucket", "key"), mock(OpenStreamInformation.class));
assertNotNull(inputStream);
}

Expand Down Expand Up @@ -156,7 +156,8 @@ void testCreateIndependentStream() throws IOException {
S3SeekableInputStream inputStream = s3SeekableInputStreamFactory.createStream(s3URI);
assertNotNull(inputStream);

inputStream = s3SeekableInputStreamFactory.createStream(s3URI, mock(OpenFileInformation.class));
inputStream =
s3SeekableInputStreamFactory.createStream(s3URI, mock(OpenStreamInformation.class));
assertNotNull(inputStream);
}

Expand All @@ -174,7 +175,7 @@ void testCreateStreamThrowsOnNullArgument() {
assertThrows(
NullPointerException.class,
() -> {
s3SeekableInputStreamFactory.createStream(null, mock(OpenFileInformation.class));
s3SeekableInputStreamFactory.createStream(null, mock(OpenStreamInformation.class));
});
}

Expand Down Expand Up @@ -206,17 +207,18 @@ void testCreateLogicalIO() throws IOException {

assertTrue(
s3SeekableInputStreamFactory.createLogicalIO(
testURIParquet, mock(OpenFileInformation.class))
testURIParquet, mock(OpenStreamInformation.class))
instanceof ParquetLogicalIOImpl);
assertTrue(
s3SeekableInputStreamFactory.createLogicalIO(testURIKEYPAR, mock(OpenFileInformation.class))
s3SeekableInputStreamFactory.createLogicalIO(
testURIKEYPAR, mock(OpenStreamInformation.class))
instanceof ParquetLogicalIOImpl);

assertTrue(
s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mock(OpenFileInformation.class))
s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mock(OpenStreamInformation.class))
instanceof DefaultLogicalIOImpl);
assertTrue(
s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mock(OpenFileInformation.class))
s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mock(OpenStreamInformation.class))
instanceof DefaultLogicalIOImpl);
}

Expand Down Expand Up @@ -267,7 +269,7 @@ private static void assertInputStreamReadExceptions(
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT);
S3SeekableInputStream inputStream =
factory.createStream(TEST_URI, mock(OpenFileInformation.class));
factory.createStream(TEST_URI, mock(OpenStreamInformation.class));
Exception thrownException = assertThrows(Exception.class, inputStream::read);
assertInstanceOf(IOException.class, thrownException);
Optional.ofNullable(thrownException.getCause())
Expand All @@ -283,7 +285,8 @@ private static void assertInputStreamHeadException(
new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT);
Exception thrownException =
assertThrows(
Exception.class, () -> factory.createStream(TEST_URI, mock(OpenFileInformation.class)));
Exception.class,
() -> factory.createStream(TEST_URI, mock(OpenStreamInformation.class)));
assertInstanceOf(IOException.class, thrownException);
Optional.ofNullable(thrownException.getCause())
.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public void testDefaultConfigParquetLogicalIOSelection(String key) {
new ObjectFormatSelector(LogicalIOConfiguration.DEFAULT);

assertEquals(
objectFormatSelector.getObjectFormat(S3URI.of("bucket", key), OpenFileInformation.DEFAULT),
objectFormatSelector.getObjectFormat(
S3URI.of("bucket", key), OpenStreamInformation.DEFAULT),
ObjectFormat.PARQUET);
}

Expand All @@ -43,7 +44,8 @@ public void testConfiguredExtensionParquetLogicalIOSelection(String key) {
LogicalIOConfiguration.builder().parquetFormatSelectorRegex("^.*.(pr3|par3)$").build());

assertEquals(
objectFormatSelector.getObjectFormat(S3URI.of("bucket", key), OpenFileInformation.DEFAULT),
objectFormatSelector.getObjectFormat(
S3URI.of("bucket", key), OpenStreamInformation.DEFAULT),
ObjectFormat.PARQUET);
}

Expand All @@ -54,7 +56,8 @@ public void testNonParquetLogicalIOSelection(String key) {
new ObjectFormatSelector(LogicalIOConfiguration.DEFAULT);

assertEquals(
objectFormatSelector.getObjectFormat(S3URI.of("bucket", key), OpenFileInformation.DEFAULT),
objectFormatSelector.getObjectFormat(
S3URI.of("bucket", key), OpenStreamInformation.DEFAULT),
ObjectFormat.DEFAULT);
}

Expand All @@ -67,7 +70,7 @@ public void testDefaultLogicalIOSelectionWithSequentialInputPolicy(String key) {
assertEquals(
objectFormatSelector.getObjectFormat(
S3URI.of("bucket", key),
OpenFileInformation.builder().inputPolicy(InputPolicy.Sequential).build()),
OpenStreamInformation.builder().inputPolicy(InputPolicy.Sequential).build()),
ObjectFormat.DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ public CompletableFuture<ObjectContent> getObject(
referrerHeader = getRequest.getReferrer().toString();
}

LOG.info("auditHeaders {}", referrerHeader);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this logger still needed?


builder.overrideConfiguration(
AwsRequestOverrideConfiguration.builder()
.putHeader(HEADER_REFERER, referrerHeader)
Expand Down