From dfee1eefdd37dd235ead33dc3bd91b9e0559425d Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Sat, 24 Aug 2024 09:20:30 +0100 Subject: [PATCH] adds some tests --- .../s3/S3SeekableInputStreamFactory.java | 2 +- .../io/physical/PhysicalIOConfiguration.java | 16 ----- .../s3/io/physical/data/BlockStore.java | 44 +++++++++---- .../s3/io/physical/data/MemoryTracker.java | 27 -------- .../s3/S3SeekableInputStreamTestBase.java | 2 +- .../impl/ParquetLogicalIOImplTest.java | 6 +- .../s3/io/physical/data/BlockManagerTest.java | 2 +- .../s3/io/physical/data/BlockStoreTest.java | 61 +++++++++++++++++-- .../io/physical/data/MemoryTrackerTest.java | 12 +--- 9 files changed, 95 insertions(+), 77 deletions(-) diff --git a/input-stream/src/main/java/com/amazon/connector/s3/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/com/amazon/connector/s3/S3SeekableInputStreamFactory.java index b93355fa..a423d823 100644 --- a/input-stream/src/main/java/com/amazon/connector/s3/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/com/amazon/connector/s3/S3SeekableInputStreamFactory.java @@ -48,7 +48,7 @@ public S3SeekableInputStreamFactory( this.parquetMetadataStore = new ParquetMetadataStore(configuration.getLogicalIOConfiguration()); this.objectMetadataStore = new MetadataStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration()); - this.memoryTracker = new MemoryTracker(configuration.getPhysicalIOConfiguration()); + this.memoryTracker = new MemoryTracker(); this.objectBlobStore = new BlobStore( objectMetadataStore, diff --git a/input-stream/src/main/java/com/amazon/connector/s3/io/physical/PhysicalIOConfiguration.java b/input-stream/src/main/java/com/amazon/connector/s3/io/physical/PhysicalIOConfiguration.java index a2d89748..357a84f8 100644 --- a/input-stream/src/main/java/com/amazon/connector/s3/io/physical/PhysicalIOConfiguration.java +++ b/input-stream/src/main/java/com/amazon/connector/s3/io/physical/PhysicalIOConfiguration.java @@ -21,8 +21,6 @@ public class PhysicalIOConfiguration { private static final long DEFAULT_READ_AHEAD_BYTES = 64 * ONE_KB; private static final long DEFAULT_MAX_RANGE_SIZE = 8 * ONE_MB; private static final long DEFAULT_PART_SIZE = 8 * ONE_MB; - // TODO: For some reason when I set this to 4 * ONE_GB, things start failing :( - private static final long DEFAULT_MAX_MEMORY_LIMIT_BYTES = 4 * ONE_MB; public static final int DEFAULT_CACHE_EVICTION_TIME_MILLIS = 15 * 1000; /** Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_CAPACITY_BLOB_STORE} by default. */ @@ -58,14 +56,6 @@ public class PhysicalIOConfiguration { /** Part size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_PART_SIZE} by default. */ @Builder.Default private long partSizeBytes = DEFAULT_PART_SIZE; - /** - * Max memory a single instance of the S3SeekableInputStreamFactory can use, in bytes. {@link - * PhysicalIOConfiguration#DEFAULT_PART_SIZE} by default. - */ - @Builder.Default private long maxMemoryLimitBytes = DEFAULT_MAX_MEMORY_LIMIT_BYTES; - - private static final String MAX_MEMORY_BYTES_KEY = "max.memory.bytes"; - /** * Cache eviction time in milliseconds. {@link * PhysicalIOConfiguration#DEFAULT_CACHE_EVICTION_TIME_MILLIS} by default. @@ -95,8 +85,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c .readAheadBytes(configuration.getLong(READ_AHEAD_BYTES_KEY, DEFAULT_READ_AHEAD_BYTES)) .maxRangeSizeBytes(configuration.getLong(MAX_RANGE_SIZE_BYTES_KEY, DEFAULT_MAX_RANGE_SIZE)) .partSizeBytes(configuration.getLong(PART_SIZE_BYTES_KEY, DEFAULT_PART_SIZE)) - .maxMemoryLimitBytes( - configuration.getLong(MAX_MEMORY_BYTES_KEY, DEFAULT_MAX_MEMORY_LIMIT_BYTES)) .cacheEvictionTimeMillis( configuration.getInt(CACHE_EVICTION_TIME_KEY, DEFAULT_CACHE_EVICTION_TIME_MILLIS)) .build(); @@ -111,7 +99,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c * @param readAheadBytes Read ahead, in bytes * @param maxRangeSizeBytes Maximum physical read issued against the object store * @param partSizeBytes What part size to use when splitting up logical reads - * @param maxMemoryLimitBytes The maximum memory in bytes a single input stream factory may use * @param cacheEvictionTimeMillis Cache eviction time in milliseconds */ @Builder @@ -122,7 +109,6 @@ private PhysicalIOConfiguration( long readAheadBytes, long maxRangeSizeBytes, long partSizeBytes, - long maxMemoryLimitBytes, int cacheEvictionTimeMillis) { Preconditions.checkArgument(blobStoreCapacity > 0, "`blobStoreCapacity` must be positive"); Preconditions.checkArgument( @@ -131,7 +117,6 @@ private PhysicalIOConfiguration( Preconditions.checkArgument(readAheadBytes > 0, "`readAheadLengthBytes` must be positive"); Preconditions.checkArgument(maxRangeSizeBytes > 0, "`maxRangeSize` must be positive"); Preconditions.checkArgument(partSizeBytes > 0, "`partSize` must be positive"); - Preconditions.checkArgument(maxMemoryLimitBytes > 0, "`maxMemoryLimitBytes` must be positive"); Preconditions.checkArgument( cacheEvictionTimeMillis > 0, "`cacheEvictionTime` must be positive"); @@ -141,7 +126,6 @@ private PhysicalIOConfiguration( this.readAheadBytes = readAheadBytes; this.maxRangeSizeBytes = maxRangeSizeBytes; this.partSizeBytes = partSizeBytes; - this.maxMemoryLimitBytes = maxMemoryLimitBytes; this.cacheEvictionTimeMillis = cacheEvictionTimeMillis; } } diff --git a/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/BlockStore.java b/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/BlockStore.java index 97aa6cc2..b646651e 100644 --- a/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/BlockStore.java +++ b/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/BlockStore.java @@ -38,24 +38,44 @@ public BlockStore( MetadataStore metadataStore, PhysicalIOConfiguration configuration, MemoryTracker memoryTracker) { - Preconditions.checkNotNull(s3URI, "`s3URI` must not be null"); - Preconditions.checkNotNull(metadataStore, "`metadataStore` must not be null"); - this.s3URI = s3URI; - this.metadataStore = metadataStore; - this.configuration = configuration; - this.blockCount = 0; - this.blocks = + this( + s3URI, + metadataStore, + configuration, + memoryTracker, Caffeine.newBuilder() .maximumSize(configuration.getBlobStoreCapacity()) .expireAfterWrite(Duration.ofMillis(configuration.getCacheEvictionTimeMillis())) - .removalListener(this::removalListener) - .build(); - this.memoryTracker = memoryTracker; + .removalListener( + (Integer key, Block block, RemovalCause cause) -> + memoryTracker.freeMemory(block.getLength())) + .build()); } - private void removalListener(Integer key, Block block, RemovalCause cause) { - memoryTracker.freeMemory(block.getLength()); + /** + * Constructs a new instance of a BlockStore. This version helps with dependency injection. + * + * @param s3URI s3URI the object's S3 URI + * @param metadataStore the metadata cache + * @param configuration physicalIO configuration + * @param memoryTracker the memory tracker + * @param blockCache block cache to use + */ + protected BlockStore( + S3URI s3URI, + MetadataStore metadataStore, + PhysicalIOConfiguration configuration, + MemoryTracker memoryTracker, + Cache blockCache) { + + Preconditions.checkNotNull(s3URI, "`s3URI` must not be null"); + Preconditions.checkNotNull(metadataStore, "`metadataStore` must not be null"); + this.s3URI = s3URI; + this.metadataStore = metadataStore; + this.configuration = configuration; + this.memoryTracker = memoryTracker; + this.blocks = blockCache; } /** diff --git a/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/MemoryTracker.java b/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/MemoryTracker.java index 35eb9458..1d7e9b73 100644 --- a/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/MemoryTracker.java +++ b/input-stream/src/main/java/com/amazon/connector/s3/io/physical/data/MemoryTracker.java @@ -1,10 +1,6 @@ package com.amazon.connector.s3.io.physical.data; -import com.amazon.connector.s3.io.physical.PhysicalIOConfiguration; import lombok.Getter; -import lombok.NonNull; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * Responsible for tracking total memory used by an instance of {@link @@ -12,19 +8,7 @@ */ public class MemoryTracker { - private static final Logger LOG = LogManager.getLogger(MemoryTracker.class); - @Getter long memoryUsed; - long maxMemoryAllowed; - - /** - * Constructs an instance of MemoryTracker. - * - * @param configuration physicalIO configuration - */ - public MemoryTracker(@NonNull PhysicalIOConfiguration configuration) { - this.maxMemoryAllowed = configuration.getMaxMemoryLimitBytes(); - } /** * Increment memory used. @@ -33,7 +17,6 @@ public MemoryTracker(@NonNull PhysicalIOConfiguration configuration) { * @return total memory in use */ public long incrementMemoryUsed(long bytesAllocated) { - LOG.info("MEMORY ALLOCATED {}", bytesAllocated); this.memoryUsed += bytesAllocated; return memoryUsed; } @@ -45,17 +28,7 @@ public long incrementMemoryUsed(long bytesAllocated) { * @return total memory in ues */ public long freeMemory(long bytesFreed) { - LOG.info("MEMORY FREED {}", bytesFreed); this.memoryUsed -= bytesFreed; return memoryUsed; } - - /** - * Gets available memory. - * - * @return available memory - */ - public long getAvailableMemory() { - return maxMemoryAllowed - memoryUsed; - } } diff --git a/input-stream/src/test/java/com/amazon/connector/s3/S3SeekableInputStreamTestBase.java b/input-stream/src/test/java/com/amazon/connector/s3/S3SeekableInputStreamTestBase.java index 333cf580..e8289e4a 100644 --- a/input-stream/src/test/java/com/amazon/connector/s3/S3SeekableInputStreamTestBase.java +++ b/input-stream/src/test/java/com/amazon/connector/s3/S3SeekableInputStreamTestBase.java @@ -19,7 +19,7 @@ public class S3SeekableInputStreamTestBase { protected final PhysicalIOConfiguration physicalIOConfiguration = PhysicalIOConfiguration.DEFAULT; protected final FakeObjectClient fakeObjectClient = new FakeObjectClient(TEST_DATA); - protected final MemoryTracker memoryTracker = new MemoryTracker(physicalIOConfiguration); + protected final MemoryTracker memoryTracker = new MemoryTracker(); protected final MetadataStore metadataStore = new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); protected final BlobStore blobStore = diff --git a/input-stream/src/test/java/com/amazon/connector/s3/io/logical/impl/ParquetLogicalIOImplTest.java b/input-stream/src/test/java/com/amazon/connector/s3/io/logical/impl/ParquetLogicalIOImplTest.java index 78e60a46..4227ce0d 100644 --- a/input-stream/src/test/java/com/amazon/connector/s3/io/logical/impl/ParquetLogicalIOImplTest.java +++ b/input-stream/src/test/java/com/amazon/connector/s3/io/logical/impl/ParquetLogicalIOImplTest.java @@ -107,7 +107,6 @@ void testCloseDependencies() throws IOException { @Test void testMetadaWithZeroContentLength() { ObjectClient mockClient = mock(ObjectClient.class); - MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT); when(mockClient.headObject(any(HeadRequest.class))) .thenReturn( CompletableFuture.completedFuture(ObjectMetadata.builder().contentLength(0).build())); @@ -120,7 +119,7 @@ void testMetadaWithZeroContentLength() { mockClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, - memoryTracker); + mock(MemoryTracker.class)); PhysicalIOImpl physicalIO = new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT); assertDoesNotThrow( @@ -136,7 +135,6 @@ void testMetadaWithZeroContentLength() { @Test void testMetadataWithNegativeContentLength() { ObjectClient mockClient = mock(ObjectClient.class); - MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT); when(mockClient.headObject(any(HeadRequest.class))) .thenReturn( CompletableFuture.completedFuture(ObjectMetadata.builder().contentLength(-1).build())); @@ -149,7 +147,7 @@ void testMetadataWithNegativeContentLength() { mockClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, - memoryTracker); + mock(MemoryTracker.class)); PhysicalIOImpl physicalIO = new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT); assertDoesNotThrow( diff --git a/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockManagerTest.java b/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockManagerTest.java index d1f499ee..ca834f39 100644 --- a/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockManagerTest.java +++ b/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockManagerTest.java @@ -195,6 +195,6 @@ private BlockManager getTestBlockManager(ObjectClient objectClient, int size) { metadataStore, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, - new MemoryTracker(PhysicalIOConfiguration.DEFAULT)); + mock(MemoryTracker.class)); } } diff --git a/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockStoreTest.java b/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockStoreTest.java index 3612a9fd..ad47e5c5 100644 --- a/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockStoreTest.java +++ b/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/BlockStoreTest.java @@ -12,6 +12,11 @@ import com.amazon.connector.s3.request.ReadMode; import com.amazon.connector.s3.util.FakeObjectClient; import com.amazon.connector.s3.util.S3URI; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Optional; import java.util.OptionalLong; import org.junit.jupiter.api.Test; @@ -24,11 +29,11 @@ public class BlockStoreTest { public void test__blockStore__getBlockAfterAddBlock() { // Given: empty BlockStore FakeObjectClient fakeObjectClient = new FakeObjectClient("test-data"); - MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT); MetadataStore metadataStore = new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); BlockStore blockStore = - new BlockStore(TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, memoryTracker); + new BlockStore( + TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, mock(MemoryTracker.class)); // When: a new block is added blockStore.add( @@ -48,11 +53,11 @@ public void test__blockStore__findNextMissingByteCorrect() { // Given: BlockStore with blocks (2,3), (5,10), (12,15) final String X_TIMES_16 = "xxxxxxxxxxxxxxxx"; FakeObjectClient fakeObjectClient = new FakeObjectClient(X_TIMES_16); - MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT); MetadataStore metadataStore = new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); BlockStore blockStore = - new BlockStore(TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, memoryTracker); + new BlockStore( + TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, mock(MemoryTracker.class)); blockStore.add( new Block(TEST_URI, fakeObjectClient, TestTelemetry.DEFAULT, 2, 3, 0, ReadMode.SYNC)); @@ -79,9 +84,9 @@ public void test__blockStore__findNextAvailableByteCorrect() { FakeObjectClient fakeObjectClient = new FakeObjectClient(X_TIMES_16); MetadataStore metadataStore = new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); - MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT); BlockStore blockStore = - new BlockStore(TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, memoryTracker); + new BlockStore( + TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, mock(MemoryTracker.class)); blockStore.add( new Block(TEST_URI, fakeObjectClient, TestTelemetry.DEFAULT, 2, 3, 0, ReadMode.SYNC)); @@ -141,4 +146,48 @@ public void test__blockStore__closeWorksWithExceptions() { // Then: 1\ blockStore.close did not throw, 2\ b2 was closed verify(b2, times(1)).close(); } + + @Test + public void testMemoryTrackedOnEviction() { + // Given: BlockStore with a block + MemoryTracker memoryTracker = mock(MemoryTracker.class); + PhysicalIOConfiguration configuration = + PhysicalIOConfiguration.builder().blobStoreCapacity(3).build(); + + // Cache which will do evictions synchronously to aid testing. + Cache blockCache = + Caffeine.newBuilder() + .maximumSize(configuration.getBlobStoreCapacity()) + .expireAfterWrite(Duration.ofMillis(configuration.getCacheEvictionTimeMillis())) + .executor(Runnable::run) + .removalListener( + (Integer key, Block block, RemovalCause cause) -> + memoryTracker.freeMemory(block.getLength())) + .build(); + + BlockStore blockStore = + new BlockStore( + TEST_URI, mock(MetadataStore.class), configuration, memoryTracker, blockCache); + + byte[] content = new byte[101]; + FakeObjectClient fakeObjectClient = + new FakeObjectClient(new String(content, StandardCharsets.UTF_8)); + Block block = + new Block(TEST_URI, fakeObjectClient, TestTelemetry.DEFAULT, 100, 200, 0, ReadMode.SYNC); + + blockStore.add(block); + blockStore.add(block); + blockStore.add(block); + + verify(memoryTracker, times(3)).incrementMemoryUsed(101); + + // Cache is of size 3, check that an element is evicted when a fourth element is added. + blockStore.add(block); + verify(memoryTracker, times(1)).freeMemory(101); + + // When: blockStore is closed + blockStore.close(); + // All allocated + verify(memoryTracker, times(4)).freeMemory(101); + } } diff --git a/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/MemoryTrackerTest.java b/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/MemoryTrackerTest.java index e9c535cf..19724ec1 100644 --- a/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/MemoryTrackerTest.java +++ b/input-stream/src/test/java/com/amazon/connector/s3/io/physical/data/MemoryTrackerTest.java @@ -2,9 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import com.amazon.connector.s3.io.physical.PhysicalIOConfiguration; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.junit.jupiter.api.Test; @@ -13,21 +11,17 @@ justification = "We mean to pass nulls to checks") public class MemoryTrackerTest { - @Test - void testCreateBoundaries() { - assertThrows(NullPointerException.class, () -> new MemoryTracker(null)); - } - @Test void testDefaultConstructor() { - assertNotNull(new MemoryTracker(PhysicalIOConfiguration.DEFAULT)); + assertNotNull(new MemoryTracker()); } @Test void testMemoryUsed() { - MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT); + MemoryTracker memoryTracker = new MemoryTracker(); assertEquals(0, memoryTracker.getMemoryUsed()); assertEquals(500, memoryTracker.incrementMemoryUsed(500)); assertEquals(200, memoryTracker.freeMemory(300)); + assertEquals(200, memoryTracker.getMemoryUsed()); } }