From 56d389d790cc28875c67591f05b17f6d07da92b3 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 21 Feb 2025 15:45:34 +0000 Subject: [PATCH 1/5] rename OpenFileInformation to OpenStreamInformation --- .../common/telemetry/TelemetryDatapoint.java | 4 +++- .../TelemetryDatapointMeasurement.java | 4 +++- ...mation.java => OpenStreamInformation.java} | 6 ++--- .../S3SeekableInputStreamFactory.java | 22 ++++++++++-------- .../util/ObjectFormatSelector.java | 8 +++---- .../S3SeekableInputStreamFactoryTest.java | 23 +++++++++++-------- .../util/ObjectFormatSelectorTest.java | 11 +++++---- .../S3SdkObjectClient.java | 2 -- 8 files changed, 45 insertions(+), 35 deletions(-) rename common/src/main/java/software/amazon/s3/analyticsaccelerator/util/{OpenFileInformation.java => OpenStreamInformation.java} (85%) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java index 882265e0..5533fc7d 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java @@ -117,7 +117,9 @@ public T build() { return buildCore(); } - /** @return new instance of whatever this builder builds */ + /** + * @return new instance of whatever this builder builds + */ protected abstract T buildCore(); } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java index bb16bdb3..16d4905c 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java @@ -100,7 +100,9 @@ public T build() { return buildCore(); } - /** @return new instance of whatever this builder builds */ + /** + * @return new instance of whatever this builder builds + */ protected abstract T buildCore(); } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenFileInformation.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenStreamInformation.java similarity index 85% rename from common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenFileInformation.java rename to common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenStreamInformation.java index efa9be5d..adafcfe8 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenFileInformation.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/OpenStreamInformation.java @@ -26,11 +26,11 @@ */ @Value @Builder -public class OpenFileInformation { +public class OpenStreamInformation { StreamContext streamContext; ObjectMetadata objectMetadata; InputPolicy inputPolicy; - /** Default set of settings for {@link OpenFileInformation} */ - public static final OpenFileInformation DEFAULT = OpenFileInformation.builder().build(); + /** Default set of settings for {@link OpenStreamInformation} */ + public static final OpenStreamInformation DEFAULT = OpenStreamInformation.builder().build(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index 16368f7b..081c4215 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -29,7 +29,7 @@ import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.util.ObjectFormatSelector; -import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; /** @@ -103,23 +103,25 @@ public S3SeekableInputStream createStream(@NonNull S3URI s3URI, ObjectMetadata m * Creates an instance of SeekableStream with file information * * @param s3URI the object's S3 URI - * @param openFileInformation known file information this key + * @param openStreamInformation known file information this key * @return An instance of the input stream. * @throws IOException IoException */ public S3SeekableInputStream createStream( - @NonNull S3URI s3URI, @NonNull OpenFileInformation openFileInformation) throws IOException { - storeObjectMetadata(s3URI, openFileInformation.getObjectMetadata()); - return new S3SeekableInputStream(s3URI, createLogicalIO(s3URI, openFileInformation), telemetry); + @NonNull S3URI s3URI, @NonNull OpenStreamInformation openStreamInformation) + throws IOException { + storeObjectMetadata(s3URI, openStreamInformation.getObjectMetadata()); + return new S3SeekableInputStream( + s3URI, createLogicalIO(s3URI, openStreamInformation), telemetry); } LogicalIO createLogicalIO(S3URI s3URI) throws IOException { - return createLogicalIO(s3URI, OpenFileInformation.DEFAULT); + return createLogicalIO(s3URI, OpenStreamInformation.DEFAULT); } - LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation) + LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformation) throws IOException { - switch (objectFormatSelector.getObjectFormat(s3URI, openFileInformation)) { + switch (objectFormatSelector.getObjectFormat(s3URI, openStreamInformation)) { case PARQUET: return new ParquetLogicalIOImpl( s3URI, @@ -128,7 +130,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation) objectMetadataStore, objectBlobStore, telemetry, - openFileInformation.getStreamContext()), + openStreamInformation.getStreamContext()), telemetry, configuration.getLogicalIOConfiguration(), parquetColumnPrefetchStore); @@ -141,7 +143,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenFileInformation openFileInformation) objectMetadataStore, objectBlobStore, telemetry, - openFileInformation.getStreamContext()), + openStreamInformation.getStreamContext()), telemetry); } } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java index 45c7f327..43ffa6db 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java @@ -38,10 +38,10 @@ public ObjectFormatSelector(LogicalIOConfiguration configuration) { * Uses a regex matcher to select the file format based on the file extension of the key. * * @param s3URI the object's S3 URI - * @param openFileInformation known file information for the file + * @param openStreamInformation known file information for the file * @return the file format of the object */ - public ObjectFormat getObjectFormat(S3URI s3URI, OpenFileInformation openFileInformation) { + public ObjectFormat getObjectFormat(S3URI s3URI, OpenStreamInformation openStreamInformation) { // If the supplied policy in open file information is Sequential, then use the default input // stream, regardless of the file format (even if it's parquet!). This is important for @@ -49,8 +49,8 @@ public ObjectFormat getObjectFormat(S3URI s3URI, OpenFileInformation openFileInf // read parquet file sequentially (as they simply need to copy over the file), // instead of the regular parquet pattern of footer first, then specific columns etc., so our // parquet specific optimisations are of no use there :( - if (openFileInformation.getInputPolicy() != null - && openFileInformation.getInputPolicy().equals(InputPolicy.Sequential)) { + if (openStreamInformation.getInputPolicy() != null + && openStreamInformation.getInputPolicy().equals(InputPolicy.Sequential)) { return ObjectFormat.DEFAULT; } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java index 8d63ea6c..2e4dbfbd 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java @@ -40,7 +40,7 @@ import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetLogicalIOImpl; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; -import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -99,7 +99,7 @@ void testCreateDefaultStream() throws IOException { inputStream = s3SeekableInputStreamFactory.createStream( - S3URI.of("bucket", "key"), mock(OpenFileInformation.class)); + S3URI.of("bucket", "key"), mock(OpenStreamInformation.class)); assertNotNull(inputStream); } @@ -156,7 +156,8 @@ void testCreateIndependentStream() throws IOException { S3SeekableInputStream inputStream = s3SeekableInputStreamFactory.createStream(s3URI); assertNotNull(inputStream); - inputStream = s3SeekableInputStreamFactory.createStream(s3URI, mock(OpenFileInformation.class)); + inputStream = + s3SeekableInputStreamFactory.createStream(s3URI, mock(OpenStreamInformation.class)); assertNotNull(inputStream); } @@ -174,7 +175,7 @@ void testCreateStreamThrowsOnNullArgument() { assertThrows( NullPointerException.class, () -> { - s3SeekableInputStreamFactory.createStream(null, mock(OpenFileInformation.class)); + s3SeekableInputStreamFactory.createStream(null, mock(OpenStreamInformation.class)); }); } @@ -206,17 +207,18 @@ void testCreateLogicalIO() throws IOException { assertTrue( s3SeekableInputStreamFactory.createLogicalIO( - testURIParquet, mock(OpenFileInformation.class)) + testURIParquet, mock(OpenStreamInformation.class)) instanceof ParquetLogicalIOImpl); assertTrue( - s3SeekableInputStreamFactory.createLogicalIO(testURIKEYPAR, mock(OpenFileInformation.class)) + s3SeekableInputStreamFactory.createLogicalIO( + testURIKEYPAR, mock(OpenStreamInformation.class)) instanceof ParquetLogicalIOImpl); assertTrue( - s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mock(OpenFileInformation.class)) + s3SeekableInputStreamFactory.createLogicalIO(testURIJAVA, mock(OpenStreamInformation.class)) instanceof DefaultLogicalIOImpl); assertTrue( - s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mock(OpenFileInformation.class)) + s3SeekableInputStreamFactory.createLogicalIO(testURITXT, mock(OpenStreamInformation.class)) instanceof DefaultLogicalIOImpl); } @@ -267,7 +269,7 @@ private static void assertInputStreamReadExceptions( new S3SeekableInputStreamFactory( new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT); S3SeekableInputStream inputStream = - factory.createStream(TEST_URI, mock(OpenFileInformation.class)); + factory.createStream(TEST_URI, mock(OpenStreamInformation.class)); Exception thrownException = assertThrows(Exception.class, inputStream::read); assertInstanceOf(IOException.class, thrownException); Optional.ofNullable(thrownException.getCause()) @@ -283,7 +285,8 @@ private static void assertInputStreamHeadException( new S3SdkObjectClient(mockS3AsyncClient), S3SeekableInputStreamConfiguration.DEFAULT); Exception thrownException = assertThrows( - Exception.class, () -> factory.createStream(TEST_URI, mock(OpenFileInformation.class))); + Exception.class, + () -> factory.createStream(TEST_URI, mock(OpenStreamInformation.class))); assertInstanceOf(IOException.class, thrownException); Optional.ofNullable(thrownException.getCause()) .ifPresent( diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelectorTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelectorTest.java index 00a19bef..6092a1d0 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelectorTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelectorTest.java @@ -30,7 +30,8 @@ public void testDefaultConfigParquetLogicalIOSelection(String key) { new ObjectFormatSelector(LogicalIOConfiguration.DEFAULT); assertEquals( - objectFormatSelector.getObjectFormat(S3URI.of("bucket", key), OpenFileInformation.DEFAULT), + objectFormatSelector.getObjectFormat( + S3URI.of("bucket", key), OpenStreamInformation.DEFAULT), ObjectFormat.PARQUET); } @@ -43,7 +44,8 @@ public void testConfiguredExtensionParquetLogicalIOSelection(String key) { LogicalIOConfiguration.builder().parquetFormatSelectorRegex("^.*.(pr3|par3)$").build()); assertEquals( - objectFormatSelector.getObjectFormat(S3URI.of("bucket", key), OpenFileInformation.DEFAULT), + objectFormatSelector.getObjectFormat( + S3URI.of("bucket", key), OpenStreamInformation.DEFAULT), ObjectFormat.PARQUET); } @@ -54,7 +56,8 @@ public void testNonParquetLogicalIOSelection(String key) { new ObjectFormatSelector(LogicalIOConfiguration.DEFAULT); assertEquals( - objectFormatSelector.getObjectFormat(S3URI.of("bucket", key), OpenFileInformation.DEFAULT), + objectFormatSelector.getObjectFormat( + S3URI.of("bucket", key), OpenStreamInformation.DEFAULT), ObjectFormat.DEFAULT); } @@ -67,7 +70,7 @@ public void testDefaultLogicalIOSelectionWithSequentialInputPolicy(String key) { assertEquals( objectFormatSelector.getObjectFormat( S3URI.of("bucket", key), - OpenFileInformation.builder().inputPolicy(InputPolicy.Sequential).build()), + OpenStreamInformation.builder().inputPolicy(InputPolicy.Sequential).build()), ObjectFormat.DEFAULT); } } diff --git a/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java b/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java index e6ff7775..4e7d2d0d 100644 --- a/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java +++ b/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java @@ -185,8 +185,6 @@ public CompletableFuture getObject( referrerHeader = getRequest.getReferrer().toString(); } - LOG.info("auditHeaders {}", referrerHeader); - builder.overrideConfiguration( AwsRequestOverrideConfiguration.builder() .putHeader(HEADER_REFERER, referrerHeader) From c61e6e316c9e52929c2abea36eeb3dcf36131552 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 21 Feb 2025 15:50:58 +0000 Subject: [PATCH 2/5] spotlessApply with Java 8 --- .../common/telemetry/TelemetryDatapoint.java | 4 +--- .../common/telemetry/TelemetryDatapointMeasurement.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java index 5533fc7d..882265e0 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapoint.java @@ -117,9 +117,7 @@ public T build() { return buildCore(); } - /** - * @return new instance of whatever this builder builds - */ + /** @return new instance of whatever this builder builds */ protected abstract T buildCore(); } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java index 16d4905c..bb16bdb3 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/telemetry/TelemetryDatapointMeasurement.java @@ -100,9 +100,7 @@ public T build() { return buildCore(); } - /** - * @return new instance of whatever this builder builds - */ + /** @return new instance of whatever this builder builds */ protected abstract T buildCore(); } } From 0276857d7edd07f85d3c38eea04d33371cd7dfe6 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 21 Feb 2025 15:52:40 +0000 Subject: [PATCH 3/5] remove logger --- .../amazon/s3/analyticsaccelerator/S3SdkObjectClient.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java b/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java index 4e7d2d0d..59391260 100644 --- a/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java +++ b/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java @@ -23,8 +23,6 @@ import java.util.function.Function; import lombok.Getter; import lombok.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -41,7 +39,6 @@ public class S3SdkObjectClient implements ObjectClient { private static final String HEADER_USER_AGENT = "User-Agent"; private static final String HEADER_REFERER = "Referer"; - private static final Logger LOG = LoggerFactory.getLogger(S3SdkObjectClient.class); @Getter @NonNull private final S3AsyncClient s3AsyncClient; @NonNull private final Telemetry telemetry; From 569ff08af83d44cd3131d7bd1378e1bda31f4356 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 21 Feb 2025 15:56:18 +0000 Subject: [PATCH 4/5] fix typo --- .../s3/analyticsaccelerator/util/ObjectFormatSelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java index 43ffa6db..5a3db2f8 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java @@ -43,7 +43,7 @@ public ObjectFormatSelector(LogicalIOConfiguration configuration) { */ public ObjectFormat getObjectFormat(S3URI s3URI, OpenStreamInformation openStreamInformation) { - // If the supplied policy in open file information is Sequential, then use the default input + // If the supplied policy in open stream information is Sequential, then use the default input // stream, regardless of the file format (even if it's parquet!). This is important for // applications like DISTCP, which use a "whole_file" read policy with S3A, where they will // read parquet file sequentially (as they simply need to copy over the file), From dce97df1bf0d646ad64117147f5d7089931fd37c Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 21 Feb 2025 16:03:31 +0000 Subject: [PATCH 5/5] more typos --- .../s3/analyticsaccelerator/S3SeekableInputStreamFactory.java | 2 +- .../s3/analyticsaccelerator/util/ObjectFormatSelector.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index 081c4215..a4f19b13 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -103,7 +103,7 @@ public S3SeekableInputStream createStream(@NonNull S3URI s3URI, ObjectMetadata m * Creates an instance of SeekableStream with file information * * @param s3URI the object's S3 URI - * @param openStreamInformation known file information this key + * @param openStreamInformation known information for this key * @return An instance of the input stream. * @throws IOException IoException */ diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java index 5a3db2f8..21d34304 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/ObjectFormatSelector.java @@ -38,7 +38,7 @@ public ObjectFormatSelector(LogicalIOConfiguration configuration) { * Uses a regex matcher to select the file format based on the file extension of the key. * * @param s3URI the object's S3 URI - * @param openStreamInformation known file information for the file + * @param openStreamInformation known information for this key * @return the file format of the object */ public ObjectFormat getObjectFormat(S3URI s3URI, OpenStreamInformation openStreamInformation) {