Skip to content

Commit

Permalink
adds some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmarsuhail committed Aug 24, 2024
1 parent e6707ef commit dfee1ee
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public S3SeekableInputStreamFactory(
this.parquetMetadataStore = new ParquetMetadataStore(configuration.getLogicalIOConfiguration());
this.objectMetadataStore =
new MetadataStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration());
this.memoryTracker = new MemoryTracker(configuration.getPhysicalIOConfiguration());
this.memoryTracker = new MemoryTracker();
this.objectBlobStore =
new BlobStore(
objectMetadataStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public class PhysicalIOConfiguration {
private static final long DEFAULT_READ_AHEAD_BYTES = 64 * ONE_KB;
private static final long DEFAULT_MAX_RANGE_SIZE = 8 * ONE_MB;
private static final long DEFAULT_PART_SIZE = 8 * ONE_MB;
// TODO: For some reason when I set this to 4 * ONE_GB, things start failing :(
private static final long DEFAULT_MAX_MEMORY_LIMIT_BYTES = 4 * ONE_MB;
public static final int DEFAULT_CACHE_EVICTION_TIME_MILLIS = 15 * 1000;

/** Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_CAPACITY_BLOB_STORE} by default. */
Expand Down Expand Up @@ -58,14 +56,6 @@ public class PhysicalIOConfiguration {
/** Part size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_PART_SIZE} by default. */
@Builder.Default private long partSizeBytes = DEFAULT_PART_SIZE;

/**
* Max memory a single instance of the S3SeekableInputStreamFactory can use, in bytes. {@link
* PhysicalIOConfiguration#DEFAULT_PART_SIZE} by default.
*/
@Builder.Default private long maxMemoryLimitBytes = DEFAULT_MAX_MEMORY_LIMIT_BYTES;

private static final String MAX_MEMORY_BYTES_KEY = "max.memory.bytes";

/**
* Cache eviction time in milliseconds. {@link
* PhysicalIOConfiguration#DEFAULT_CACHE_EVICTION_TIME_MILLIS} by default.
Expand Down Expand Up @@ -95,8 +85,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
.readAheadBytes(configuration.getLong(READ_AHEAD_BYTES_KEY, DEFAULT_READ_AHEAD_BYTES))
.maxRangeSizeBytes(configuration.getLong(MAX_RANGE_SIZE_BYTES_KEY, DEFAULT_MAX_RANGE_SIZE))
.partSizeBytes(configuration.getLong(PART_SIZE_BYTES_KEY, DEFAULT_PART_SIZE))
.maxMemoryLimitBytes(
configuration.getLong(MAX_MEMORY_BYTES_KEY, DEFAULT_MAX_MEMORY_LIMIT_BYTES))
.cacheEvictionTimeMillis(
configuration.getInt(CACHE_EVICTION_TIME_KEY, DEFAULT_CACHE_EVICTION_TIME_MILLIS))
.build();
Expand All @@ -111,7 +99,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param readAheadBytes Read ahead, in bytes
* @param maxRangeSizeBytes Maximum physical read issued against the object store
* @param partSizeBytes What part size to use when splitting up logical reads
* @param maxMemoryLimitBytes The maximum memory in bytes a single input stream factory may use
* @param cacheEvictionTimeMillis Cache eviction time in milliseconds
*/
@Builder
Expand All @@ -122,7 +109,6 @@ private PhysicalIOConfiguration(
long readAheadBytes,
long maxRangeSizeBytes,
long partSizeBytes,
long maxMemoryLimitBytes,
int cacheEvictionTimeMillis) {
Preconditions.checkArgument(blobStoreCapacity > 0, "`blobStoreCapacity` must be positive");
Preconditions.checkArgument(
Expand All @@ -131,7 +117,6 @@ private PhysicalIOConfiguration(
Preconditions.checkArgument(readAheadBytes > 0, "`readAheadLengthBytes` must be positive");
Preconditions.checkArgument(maxRangeSizeBytes > 0, "`maxRangeSize` must be positive");
Preconditions.checkArgument(partSizeBytes > 0, "`partSize` must be positive");
Preconditions.checkArgument(maxMemoryLimitBytes > 0, "`maxMemoryLimitBytes` must be positive");
Preconditions.checkArgument(
cacheEvictionTimeMillis > 0, "`cacheEvictionTime` must be positive");

Expand All @@ -141,7 +126,6 @@ private PhysicalIOConfiguration(
this.readAheadBytes = readAheadBytes;
this.maxRangeSizeBytes = maxRangeSizeBytes;
this.partSizeBytes = partSizeBytes;
this.maxMemoryLimitBytes = maxMemoryLimitBytes;
this.cacheEvictionTimeMillis = cacheEvictionTimeMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,44 @@ public BlockStore(
MetadataStore metadataStore,
PhysicalIOConfiguration configuration,
MemoryTracker memoryTracker) {
Preconditions.checkNotNull(s3URI, "`s3URI` must not be null");
Preconditions.checkNotNull(metadataStore, "`metadataStore` must not be null");

this.s3URI = s3URI;
this.metadataStore = metadataStore;
this.configuration = configuration;
this.blockCount = 0;
this.blocks =
this(
s3URI,
metadataStore,
configuration,
memoryTracker,
Caffeine.newBuilder()
.maximumSize(configuration.getBlobStoreCapacity())
.expireAfterWrite(Duration.ofMillis(configuration.getCacheEvictionTimeMillis()))
.removalListener(this::removalListener)
.build();
this.memoryTracker = memoryTracker;
.removalListener(
(Integer key, Block block, RemovalCause cause) ->
memoryTracker.freeMemory(block.getLength()))
.build());
}

private void removalListener(Integer key, Block block, RemovalCause cause) {
memoryTracker.freeMemory(block.getLength());
/**
* Constructs a new instance of a BlockStore. This version helps with dependency injection.
*
* @param s3URI s3URI the object's S3 URI
* @param metadataStore the metadata cache
* @param configuration physicalIO configuration
* @param memoryTracker the memory tracker
* @param blockCache block cache to use
*/
protected BlockStore(
S3URI s3URI,
MetadataStore metadataStore,
PhysicalIOConfiguration configuration,
MemoryTracker memoryTracker,
Cache<Integer, Block> blockCache) {

Preconditions.checkNotNull(s3URI, "`s3URI` must not be null");
Preconditions.checkNotNull(metadataStore, "`metadataStore` must not be null");
this.s3URI = s3URI;
this.metadataStore = metadataStore;
this.configuration = configuration;
this.memoryTracker = memoryTracker;
this.blocks = blockCache;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,14 @@
package com.amazon.connector.s3.io.physical.data;

import com.amazon.connector.s3.io.physical.PhysicalIOConfiguration;
import lombok.Getter;
import lombok.NonNull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Responsible for tracking total memory used by an instance of {@link
* com.amazon.connector.s3.S3SeekableInputStreamFactory}.
*/
public class MemoryTracker {

private static final Logger LOG = LogManager.getLogger(MemoryTracker.class);

@Getter long memoryUsed;
long maxMemoryAllowed;

/**
* Constructs an instance of MemoryTracker.
*
* @param configuration physicalIO configuration
*/
public MemoryTracker(@NonNull PhysicalIOConfiguration configuration) {
this.maxMemoryAllowed = configuration.getMaxMemoryLimitBytes();
}

/**
* Increment memory used.
Expand All @@ -33,7 +17,6 @@ public MemoryTracker(@NonNull PhysicalIOConfiguration configuration) {
* @return total memory in use
*/
public long incrementMemoryUsed(long bytesAllocated) {
LOG.info("MEMORY ALLOCATED {}", bytesAllocated);
this.memoryUsed += bytesAllocated;
return memoryUsed;
}
Expand All @@ -45,17 +28,7 @@ public long incrementMemoryUsed(long bytesAllocated) {
* @return total memory in ues
*/
public long freeMemory(long bytesFreed) {
LOG.info("MEMORY FREED {}", bytesFreed);
this.memoryUsed -= bytesFreed;
return memoryUsed;
}

/**
* Gets available memory.
*
* @return available memory
*/
public long getAvailableMemory() {
return maxMemoryAllowed - memoryUsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class S3SeekableInputStreamTestBase {

protected final PhysicalIOConfiguration physicalIOConfiguration = PhysicalIOConfiguration.DEFAULT;
protected final FakeObjectClient fakeObjectClient = new FakeObjectClient(TEST_DATA);
protected final MemoryTracker memoryTracker = new MemoryTracker(physicalIOConfiguration);
protected final MemoryTracker memoryTracker = new MemoryTracker();
protected final MetadataStore metadataStore =
new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT);
protected final BlobStore blobStore =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ void testCloseDependencies() throws IOException {
@Test
void testMetadaWithZeroContentLength() {
ObjectClient mockClient = mock(ObjectClient.class);
MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT);
when(mockClient.headObject(any(HeadRequest.class)))
.thenReturn(
CompletableFuture.completedFuture(ObjectMetadata.builder().contentLength(0).build()));
Expand All @@ -120,7 +119,7 @@ void testMetadaWithZeroContentLength() {
mockClient,
TestTelemetry.DEFAULT,
PhysicalIOConfiguration.DEFAULT,
memoryTracker);
mock(MemoryTracker.class));
PhysicalIOImpl physicalIO =
new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT);
assertDoesNotThrow(
Expand All @@ -136,7 +135,6 @@ void testMetadaWithZeroContentLength() {
@Test
void testMetadataWithNegativeContentLength() {
ObjectClient mockClient = mock(ObjectClient.class);
MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT);
when(mockClient.headObject(any(HeadRequest.class)))
.thenReturn(
CompletableFuture.completedFuture(ObjectMetadata.builder().contentLength(-1).build()));
Expand All @@ -149,7 +147,7 @@ void testMetadataWithNegativeContentLength() {
mockClient,
TestTelemetry.DEFAULT,
PhysicalIOConfiguration.DEFAULT,
memoryTracker);
mock(MemoryTracker.class));
PhysicalIOImpl physicalIO =
new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT);
assertDoesNotThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,6 @@ private BlockManager getTestBlockManager(ObjectClient objectClient, int size) {
metadataStore,
TestTelemetry.DEFAULT,
PhysicalIOConfiguration.DEFAULT,
new MemoryTracker(PhysicalIOConfiguration.DEFAULT));
mock(MemoryTracker.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import com.amazon.connector.s3.request.ReadMode;
import com.amazon.connector.s3.util.FakeObjectClient;
import com.amazon.connector.s3.util.S3URI;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalLong;
import org.junit.jupiter.api.Test;
Expand All @@ -24,11 +29,11 @@ public class BlockStoreTest {
public void test__blockStore__getBlockAfterAddBlock() {
// Given: empty BlockStore
FakeObjectClient fakeObjectClient = new FakeObjectClient("test-data");
MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT);
MetadataStore metadataStore =
new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT);
BlockStore blockStore =
new BlockStore(TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, memoryTracker);
new BlockStore(
TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, mock(MemoryTracker.class));

// When: a new block is added
blockStore.add(
Expand All @@ -48,11 +53,11 @@ public void test__blockStore__findNextMissingByteCorrect() {
// Given: BlockStore with blocks (2,3), (5,10), (12,15)
final String X_TIMES_16 = "xxxxxxxxxxxxxxxx";
FakeObjectClient fakeObjectClient = new FakeObjectClient(X_TIMES_16);
MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT);
MetadataStore metadataStore =
new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT);
BlockStore blockStore =
new BlockStore(TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, memoryTracker);
new BlockStore(
TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, mock(MemoryTracker.class));

blockStore.add(
new Block(TEST_URI, fakeObjectClient, TestTelemetry.DEFAULT, 2, 3, 0, ReadMode.SYNC));
Expand All @@ -79,9 +84,9 @@ public void test__blockStore__findNextAvailableByteCorrect() {
FakeObjectClient fakeObjectClient = new FakeObjectClient(X_TIMES_16);
MetadataStore metadataStore =
new MetadataStore(fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT);
MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT);
BlockStore blockStore =
new BlockStore(TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, memoryTracker);
new BlockStore(
TEST_URI, metadataStore, PhysicalIOConfiguration.DEFAULT, mock(MemoryTracker.class));

blockStore.add(
new Block(TEST_URI, fakeObjectClient, TestTelemetry.DEFAULT, 2, 3, 0, ReadMode.SYNC));
Expand Down Expand Up @@ -141,4 +146,48 @@ public void test__blockStore__closeWorksWithExceptions() {
// Then: 1\ blockStore.close did not throw, 2\ b2 was closed
verify(b2, times(1)).close();
}

@Test
public void testMemoryTrackedOnEviction() {
// Given: BlockStore with a block
MemoryTracker memoryTracker = mock(MemoryTracker.class);
PhysicalIOConfiguration configuration =
PhysicalIOConfiguration.builder().blobStoreCapacity(3).build();

// Cache which will do evictions synchronously to aid testing.
Cache<Integer, Block> blockCache =
Caffeine.newBuilder()
.maximumSize(configuration.getBlobStoreCapacity())
.expireAfterWrite(Duration.ofMillis(configuration.getCacheEvictionTimeMillis()))
.executor(Runnable::run)
.removalListener(
(Integer key, Block block, RemovalCause cause) ->
memoryTracker.freeMemory(block.getLength()))
.build();

BlockStore blockStore =
new BlockStore(
TEST_URI, mock(MetadataStore.class), configuration, memoryTracker, blockCache);

byte[] content = new byte[101];
FakeObjectClient fakeObjectClient =
new FakeObjectClient(new String(content, StandardCharsets.UTF_8));
Block block =
new Block(TEST_URI, fakeObjectClient, TestTelemetry.DEFAULT, 100, 200, 0, ReadMode.SYNC);

blockStore.add(block);
blockStore.add(block);
blockStore.add(block);

verify(memoryTracker, times(3)).incrementMemoryUsed(101);

// Cache is of size 3, check that an element is evicted when a fourth element is added.
blockStore.add(block);
verify(memoryTracker, times(1)).freeMemory(101);

// When: blockStore is closed
blockStore.close();
// All allocated
verify(memoryTracker, times(4)).freeMemory(101);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.amazon.connector.s3.io.physical.PhysicalIOConfiguration;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.junit.jupiter.api.Test;

Expand All @@ -13,21 +11,17 @@
justification = "We mean to pass nulls to checks")
public class MemoryTrackerTest {

@Test
void testCreateBoundaries() {
assertThrows(NullPointerException.class, () -> new MemoryTracker(null));
}

@Test
void testDefaultConstructor() {
assertNotNull(new MemoryTracker(PhysicalIOConfiguration.DEFAULT));
assertNotNull(new MemoryTracker());
}

@Test
void testMemoryUsed() {
MemoryTracker memoryTracker = new MemoryTracker(PhysicalIOConfiguration.DEFAULT);
MemoryTracker memoryTracker = new MemoryTracker();
assertEquals(0, memoryTracker.getMemoryUsed());
assertEquals(500, memoryTracker.incrementMemoryUsed(500));
assertEquals(200, memoryTracker.freeMemory(300));
assertEquals(200, memoryTracker.getMemoryUsed());
}
}

0 comments on commit dfee1ee

Please sign in to comment.