Skip to content

Commit

Permalink
Added extra logging to InputStream reader (#230)
Browse files Browse the repository at this point in the history
## Description of change
This PR adds extra logging to StreamUtils to be able to debug the
timeout issues.

#### Relevant issues
- #229
- #219

#### Does this contribution introduce any breaking changes to the
existing APIs or behaviors?
No

#### Does this contribution introduce any new public APIs or behaviors?
No

#### How was the contribution tested?
- Unit tests

#### Does this contribution need a changelog entry?
No

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Co-authored-by: Erdogan Ozkoca <[email protected]>
  • Loading branch information
ozkoca and Erdogan Ozkoca authored Feb 25, 2025
1 parent 3d522c2 commit 964ab0c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ private void generateSourceAndData() throws IOException {
this.source.thenApply(
objectContent -> {
try {
return StreamUtils.toByteArray(objectContent, TIMEOUT_MILLIS);
return StreamUtils.toByteArray(
objectContent, this.objectKey, this.range, TIMEOUT_MILLIS);
} catch (IOException | TimeoutException e) {
throw new RuntimeException(
"Error while converting InputStream to byte array", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.Range;

/** Utility class for stream operations. */
public class StreamUtils {
Expand All @@ -35,10 +36,13 @@ public class StreamUtils {
* Convert an InputStream from the underlying object to a byte array.
*
* @param objectContent the part of the object
* @param objectKey container for S3 object to read
* @param range range of the S3 object to read
* @param timeoutMs read timeout in milliseconds
* @return a byte array
*/
public static byte[] toByteArray(ObjectContent objectContent, long timeoutMs)
public static byte[] toByteArray(
ObjectContent objectContent, ObjectKey objectKey, Range range, long timeoutMs)
throws IOException, TimeoutException {
InputStream inStream = objectContent.getStream();
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
Expand All @@ -50,11 +54,22 @@ public static byte[] toByteArray(ObjectContent objectContent, long timeoutMs)
() -> {
try {
int numBytesRead;
LOG.info("Starting to read from InputStream");
LOG.info(
"Starting to read from InputStream for Block s3URI={}, etag={}, start={}, end={}",
objectKey.s3URI,
objectKey.etag,
range.getStart(),
range.getEnd());
while ((numBytesRead = inStream.read(buffer, 0, buffer.length)) != -1) {
outStream.write(buffer, 0, numBytesRead);
}
LOG.info("Successfully read from InputStream");
LOG.info(
"Successfully read from InputStream for Block numBytesRead={}, s3URI={}, etag={}, start={}, end={}",
numBytesRead,
objectKey.s3URI,
objectKey.etag,
range.getStart(),
range.getEnd());
return null;
} finally {
inStream.close();
Expand All @@ -66,7 +81,12 @@ public static byte[] toByteArray(ObjectContent objectContent, long timeoutMs)

} catch (TimeoutException e) {
future.cancel(true);
LOG.warn("Reading from InputStream has timed out.");
LOG.warn(
"Reading from InputStream has timed out for Block s3URI={}, etag={}, start={}, end={}",
objectKey.s3URI,
objectKey.etag,
range.getStart(),
range.getEnd());
throw new TimeoutException("Read operation timed out");
} catch (Exception e) {
throw new IOException("Error reading stream", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.Range;

public class StreamUtilsTest {

private static final long TIMEOUT_MILLIS = 1_000;
private static final S3URI TEST_S3URI = S3URI.of("test-bucket", "test-key");
private static final String TEST_ETAG = "test-etag";
private static final Range TEST_RANGE = new Range(0, 20);
private static final ObjectKey TEST_OBJECT_KEY =
ObjectKey.builder().s3URI(TEST_S3URI).etag(TEST_ETAG).build();

@SneakyThrows
@Test
Expand All @@ -41,7 +47,8 @@ public void testToByteArrayWorksWithEmptyStream() {
ObjectContent.builder().stream(new ByteArrayInputStream(new byte[0])).build();

// When: toByteArray is called
byte[] buf = StreamUtils.toByteArray(objectContent, TIMEOUT_MILLIS);
byte[] buf =
StreamUtils.toByteArray(objectContent, TEST_OBJECT_KEY, TEST_RANGE, TIMEOUT_MILLIS);

// Then: returned byte array is empty
String content = new String(buf, StandardCharsets.UTF_8);
Expand All @@ -58,7 +65,8 @@ public void testToByteArrayConvertsCorrectly() {
ObjectContent objectContent = ObjectContent.builder().stream(inputStream).build();

// When: toByteArray is called
byte[] buf = StreamUtils.toByteArray(objectContent, TIMEOUT_MILLIS);
byte[] buf =
StreamUtils.toByteArray(objectContent, TEST_OBJECT_KEY, TEST_RANGE, TIMEOUT_MILLIS);

// Then: 'Hello World' is returned
assertEquals("Hello World", new String(buf, StandardCharsets.UTF_8));
Expand All @@ -82,7 +90,8 @@ void toByteArrayShouldThrowTimeoutExceptionWhenStreamReadTakesTooLong() throws E

// Test the timeout behavior
assertThrows(
TimeoutException.class, () -> StreamUtils.toByteArray(mockContent, TIMEOUT_MILLIS));
TimeoutException.class,
() -> StreamUtils.toByteArray(mockContent, TEST_OBJECT_KEY, TEST_RANGE, TIMEOUT_MILLIS));

// Verify the stream was accessed
verify(mockContent).getStream();
Expand Down

0 comments on commit 964ab0c

Please sign in to comment.