Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory manager #221

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void testSequentialReads(
StreamReadPatternKind streamReadPattern,
AALInputStreamConfigurationKind configuration)
throws IOException, InterruptedException, ExecutionException {

testAALReadConcurrency(
s3ClientKind,
s3Object,
Expand All @@ -47,6 +48,17 @@ void testSequentialReads(
CONCURRENCY_ITERATIONS);
}

/**
 * Runs each (client kind, read pattern, configuration) combination supplied by
 * {@code evictionTests()} through {@code testAALReadConcurrencyWithEviction}, which reads many
 * objects concurrently through a shared AAL reader and verifies checksums.
 *
 * @param s3ClientKind the S3 client flavor to read with
 * @param streamReadPattern the read pattern to execute against each object
 * @param configuration the AAL input-stream configuration under test
 * @throws IOException on read failure
 * @throws InterruptedException if interrupted while awaiting worker threads
 * @throws ExecutionException if a worker thread fails (including assertion failures)
 */
@ParameterizedTest
@MethodSource("evictionTests")
void testReadsWithEviction(
    S3ClientKind s3ClientKind,
    StreamReadPatternKind streamReadPattern,
    AALInputStreamConfigurationKind configuration)
    throws IOException, InterruptedException, ExecutionException {

  testAALReadConcurrencyWithEviction(s3ClientKind, streamReadPattern, configuration);
}

@ParameterizedTest
@MethodSource("skippingReads")
void testSkippingReads(
Expand Down Expand Up @@ -134,4 +146,9 @@ static Stream<Arguments> etagTests() {
parquetPatterns(),
getS3SeekableInputStreamConfigurations());
}

/**
 * Supplies the argument tuples for {@code testReadsWithEviction}: the cross product of all S3
 * client kinds, parquet read patterns, and seekable input-stream configurations.
 *
 * @return a stream of JUnit {@code Arguments} triples
 */
static Stream<Arguments> evictionTests() {
  return argumentsForEvictionTest(
      getS3ClientKinds(), parquetPatterns(), getS3SeekableInputStreamConfigurations());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,68 @@ protected void testAALReadConcurrency(
}
}

/**
 * Executes the given read pattern concurrently over a mixed list of small and medium objects
 * through a single shared {@code S3AALClientStreamReader}, and verifies every AAL read against a
 * reference checksum computed with a direct (non-AAL) read of the same object.
 *
 * <p>The object list deliberately contains overlapping sets (small, small-and-medium, medium) so
 * that many streams compete for the shared blob store at once.
 *
 * @param s3ClientKind the S3 client flavor to read with
 * @param streamReadPatternKind the read pattern to execute against each object
 * @param AALInputStreamConfigurationKind the AAL input-stream configuration under test
 * @throws IOException on read failure
 * @throws InterruptedException if interrupted while awaiting worker threads
 * @throws ExecutionException if a worker thread fails (including assertion failures)
 */
protected void testAALReadConcurrencyWithEviction(
    @NonNull S3ClientKind s3ClientKind,
    @NonNull StreamReadPatternKind streamReadPatternKind,
    @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind)
    throws IOException, InterruptedException, ExecutionException {

  // Read using the standard S3 async client. We do this once, to calculate the checksums.
  final Map<S3Object, Crc32CChecksum> s3ObjectCrc32CChecksumMap = new HashMap<>();
  for (S3Object obj : S3Object.smallAndMediumObjects()) {
    System.out.println("Getting checksum for " + obj);
    StreamReadPattern streamReadPattern = streamReadPatternKind.getStreamReadPattern(obj);
    Crc32CChecksum directChecksum = new Crc32CChecksum();
    executeReadPatternDirectly(s3ClientKind, obj, streamReadPattern, Optional.of(directChecksum));
    s3ObjectCrc32CChecksumMap.put(obj, directChecksum);
  }

  // NOTE(review): this assumes smallObjects() and mediumObjects() are both subsets of
  // smallAndMediumObjects(); otherwise the checksum lookup below yields null — confirm.
  List<S3Object> s3Objects = new ArrayList<>();
  s3Objects.addAll(S3Object.smallObjects());
  s3Objects.addAll(S3Object.smallAndMediumObjects());
  s3Objects.addAll(S3Object.mediumObjects());

  // Create the s3AALClientStreamReader - that creates the shared state
  try (S3AALClientStreamReader s3AALClientStreamReader =
      this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {
    // Create the thread pool
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
      List<Future<?>> resultFutures = new ArrayList<>();

      for (S3Object obj : s3Objects) {
        Future<?> future =
            executorService.submit(
                () -> {
                  try {
                    // This will create a new stream every time, but all streams will share state
                    Crc32CChecksum datChecksum = new Crc32CChecksum();
                    executeReadPatternOnAAL(
                        obj,
                        s3AALClientStreamReader,
                        streamReadPatternKind.getStreamReadPattern(obj),
                        Optional.of(datChecksum));

                    // Assert checksums
                    assertChecksums(s3ObjectCrc32CChecksumMap.get(obj), datChecksum);
                  } catch (Throwable t) {
                    throw new RuntimeException(t);
                  }
                });
        resultFutures.add(future);
      }

      // Wait for each future; this rethrows any worker failure, including assertions.
      for (Future<?> future : resultFutures) {
        future.get();
      }
    } finally {
      // Always shut the pool down — previously a failed future.get() skipped shutdown and
      // leaked the pool's threads. Wait for termination indefinitely; after shutdown() the
      // remaining (already-submitted) tasks are expected to always complete.
      executorService.shutdown();
      assertTrue(executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS));
    }
  }
}

/**
* Stream read patterns to test on
*
Expand Down Expand Up @@ -404,6 +466,21 @@ static Stream<Arguments> argumentsFor(
}
}
}
return results.stream();
}

/**
 * Builds the cross product of clients x read patterns x configurations as a stream of JUnit
 * {@code Arguments} triples, in nested iteration order (clients outermost).
 *
 * @param clients the S3 client kinds to combine
 * @param readPatterns the read patterns to combine
 * @param configurations the stream configurations to combine
 * @return one {@code Arguments} triple per combination
 */
static Stream<Arguments> argumentsForEvictionTest(
    List<S3ClientKind> clients,
    List<StreamReadPatternKind> readPatterns,
    List<AALInputStreamConfigurationKind> configurations) {
  return clients.stream()
      .flatMap(
          client ->
              readPatterns.stream()
                  .flatMap(
                      readPattern ->
                          configurations.stream()
                              .map(
                                  configuration ->
                                      Arguments.of(client, readPattern, configuration))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore;
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetLogicalIOImpl;
import software.amazon.s3.analyticsaccelerator.io.physical.data.BlobStore;
import software.amazon.s3.analyticsaccelerator.io.physical.data.MemoryManager;
import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore;
import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class S3SeekableInputStreamFactory implements AutoCloseable {
private final BlobStore objectBlobStore;
private final Telemetry telemetry;
private final ObjectFormatSelector objectFormatSelector;
private final MemoryManager memoryManager;

/**
* Creates a new instance of {@link S3SeekableInputStreamFactory}. This factory should be used to
Expand All @@ -75,6 +77,7 @@ public S3SeekableInputStreamFactory(
this.objectFormatSelector = new ObjectFormatSelector(configuration.getLogicalIOConfiguration());
this.objectBlobStore =
new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration());
this.memoryManager = new MemoryManager(objectBlobStore);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should a memoryManager have a blobStore or blobStore have a memorymanager?

}

/**
Expand Down Expand Up @@ -125,7 +128,12 @@ LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) throws IOExc
return new ParquetLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
s3URI,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we have 6 parameters shall we move to a builder here?

objectMetadataStore,
objectBlobStore,
telemetry,
streamContext,
memoryManager),
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);
Expand All @@ -134,7 +142,12 @@ LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) throws IOExc
return new DefaultLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
s3URI,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets avoid code duplication on creating PhysicalIOImpl

objectMetadataStore,
objectBlobStore,
telemetry,
streamContext,
memoryManager),
telemetry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@Builder
@EqualsAndHashCode
public class PhysicalIOConfiguration {
private static final int DEFAULT_CAPACITY_BLOB_STORE = 50;
private static final long DEFAULT_MAX_MEMORY_LIMIT_AAL = Long.MAX_VALUE;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it would be lovely if the name also tells the unit. i.e. DEFAULT_MAX_MEMORY_BYTES

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused. Here we are setting default limit to Long.MAX_VALUE, later in the code, we are using Long.MAX_VALUE to confirm this value was not explicitly set and actually set a 3GB default number. Why not just set it to 3GB here?

private static final int DEFAULT_CAPACITY_METADATA_STORE = 50;
private static final boolean DEFAULT_USE_SINGLE_CACHE = true;
private static final long DEFAULT_BLOCK_SIZE_BYTES = 8 * ONE_MB;
Expand All @@ -40,10 +40,13 @@ public class PhysicalIOConfiguration {
private static final double DEFAULT_SEQUENTIAL_PREFETCH_BASE = 2.0;
private static final double DEFAULT_SEQUENTIAL_PREFETCH_SPEED = 1.0;

/** Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_CAPACITY_BLOB_STORE} by default. */
@Builder.Default private int blobStoreCapacity = DEFAULT_CAPACITY_BLOB_STORE;
/**
* Max memory to be used by library. {@link PhysicalIOConfiguration#DEFAULT_MAX_MEMORY_LIMIT_AAL}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add the unit here? Bytes, KBs? MBs?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is even better if the name tells about the unit

* by default.
*/
@Builder.Default private long maxMemoryLimitAAL = DEFAULT_MAX_MEMORY_LIMIT_AAL;

private static final String BLOB_STORE_CAPACITY_KEY = "blobstore.capacity";
private static final String MAX_MEMORY_LIMIT_AAL_KEY = "aal.maxmemory";

/**
* Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_CAPACITY_METADATA_STORE} by default.
Expand Down Expand Up @@ -104,8 +107,8 @@ public class PhysicalIOConfiguration {
*/
public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration configuration) {
return PhysicalIOConfiguration.builder()
.blobStoreCapacity(
configuration.getInt(BLOB_STORE_CAPACITY_KEY, DEFAULT_CAPACITY_BLOB_STORE))
.maxMemoryLimitAAL(
configuration.getLong(MAX_MEMORY_LIMIT_AAL_KEY, DEFAULT_MAX_MEMORY_LIMIT_AAL))
.metadataStoreCapacity(
configuration.getInt(METADATA_STORE_CAPACITY_KEY, DEFAULT_CAPACITY_METADATA_STORE))
.blockSizeBytes(configuration.getLong(BLOCK_SIZE_BYTES_KEY, DEFAULT_BLOCK_SIZE_BYTES))
Expand All @@ -123,7 +126,7 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
/**
* Constructs {@link PhysicalIOConfiguration}.
*
* @param blobStoreCapacity The capacity of the BlobStore
* @param maxMemoryLimitAAL The memory limit for library
* @param metadataStoreCapacity The capacity of the MetadataStore
* @param blockSizeBytes Block size, in bytes
* @param readAheadBytes Read ahead, in bytes
Expand All @@ -136,15 +139,15 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
*/
@Builder
private PhysicalIOConfiguration(
int blobStoreCapacity,
long maxMemoryLimitAAL,
int metadataStoreCapacity,
long blockSizeBytes,
long readAheadBytes,
long maxRangeSizeBytes,
long partSizeBytes,
double sequentialPrefetchBase,
double sequentialPrefetchSpeed) {
Preconditions.checkArgument(blobStoreCapacity > 0, "`blobStoreCapacity` must be positive");
Preconditions.checkArgument(maxMemoryLimitAAL > 0, "`blobStoreCapacity` must be positive");
Preconditions.checkArgument(
metadataStoreCapacity > 0, "`metadataStoreCapacity` must be positive");
Preconditions.checkArgument(blockSizeBytes > 0, "`blockSizeBytes` must be positive");
Expand All @@ -156,7 +159,7 @@ private PhysicalIOConfiguration(
Preconditions.checkArgument(
sequentialPrefetchSpeed > 0, "`sequentialPrefetchSpeed` must be positive");

this.blobStoreCapacity = blobStoreCapacity;
this.maxMemoryLimitAAL = maxMemoryLimitAAL;
this.metadataStoreCapacity = metadataStoreCapacity;
this.blockSizeBytes = blockSizeBytes;
this.readAheadBytes = readAheadBytes;
Expand All @@ -171,7 +174,7 @@ public String toString() {
final StringBuilder builder = new StringBuilder();

builder.append("PhysicalIO configuration:\n");
builder.append("\tblobStoreCapacity: " + blobStoreCapacity + "\n");
builder.append("\tmaxMemoryLimitAAL: " + maxMemoryLimitAAL + "\n");
builder.append("\tmetadataStoreCapacity: " + metadataStoreCapacity + "\n");
builder.append("\tblockSizeBytes: " + blockSizeBytes + "\n");
builder.append("\treadAheadBytes: " + readAheadBytes + "\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +42,7 @@ public class Blob implements Closeable {
private final BlockManager blockManager;
private final ObjectMetadata metadata;
private final Telemetry telemetry;
private AtomicInteger activeReaders;

/**
* Construct a new Blob.
Expand All @@ -60,6 +62,32 @@ public Blob(
this.metadata = metadata;
this.blockManager = blockManager;
this.telemetry = telemetry;
this.activeReaders = new AtomicInteger(0);
}

/**
* Updates and returns the active readers of this blob
*
* @param readers The delta to be added to the current active readers of this blob
* @return the updated active readers for this blob
*/
public int updateActiveReaders(int readers) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: update generally means re-setting a value to a new value. Here, we are actually incrementing/decrementing the existing value. We can pick a better name here.

return activeReaders.addAndGet(readers);
}

/**
 * Returns the current active-reader count, as maintained by {@code updateActiveReaders(int)}.
 *
 * @return the current active readers of this blob
 */
public int getActiveReaders() {
  return activeReaders.get();
}

/**
 * Returns the key identifying the object this blob belongs to.
 *
 * @return the object key of this blob
 */
public ObjectKey getObjectKey() {
  return objectKey;
}

/**
 * Returns the {@code BlockManager} that fetches and caches this blob's blocks.
 *
 * @return the blockManager of this blob
 */
public BlockManager getBlockManager() {
  return blockManager;
}

/**
Expand All @@ -71,8 +99,13 @@ public Blob(
*/
public int read(long pos) throws IOException {
Preconditions.checkArgument(pos >= 0, "`pos` must be non-negative");
blockManager.makePositionAvailable(pos, ReadMode.SYNC);
return blockManager.getBlock(pos).get().read(pos);
try {
blockManager.makePositionAvailable(pos, ReadMode.SYNC);
int bytesRead = blockManager.getBlock(pos).get().read(pos);
return bytesRead;
} finally {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we are comfortable with lack of a catch block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to decrement the active readers in any scenario. If there is any error thrown it can be handled by the caller methods, I don't think we want to catch anything here.

updateActiveReaders(-1);
}
}

/**
Expand All @@ -92,34 +125,37 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {
Preconditions.checkArgument(0 <= len, "`len` must not be negative");
Preconditions.checkArgument(off < buf.length, "`off` must be less than size of buffer");

blockManager.makeRangeAvailable(pos, len, ReadMode.SYNC);
try {
blockManager.makeRangeAvailable(pos, len, ReadMode.SYNC);

long nextPosition = pos;
int numBytesRead = 0;
long nextPosition = pos;
int numBytesRead = 0;

while (numBytesRead < len && nextPosition < contentLength()) {
final long nextPositionFinal = nextPosition;
Block nextBlock =
blockManager
.getBlock(nextPosition)
.orElseThrow(
() ->
new IllegalStateException(
String.format(
"This block (for position %s) should have been available.",
nextPositionFinal)));
while (numBytesRead < len && nextPosition < contentLength()) {
final long nextPositionFinal = nextPosition;
Block nextBlock =
blockManager
.getBlock(nextPosition)
.orElseThrow(
() ->
new IllegalStateException(
String.format(
"This block (for position %s) should have been available.",
nextPositionFinal)));

int bytesRead = nextBlock.read(buf, off + numBytesRead, len - numBytesRead, nextPosition);
int bytesRead = nextBlock.read(buf, off + numBytesRead, len - numBytesRead, nextPosition);

if (bytesRead == -1) {
return numBytesRead;
}
if (bytesRead == -1) {
return numBytesRead;
}

numBytesRead = numBytesRead + bytesRead;
nextPosition += bytesRead;
numBytesRead = numBytesRead + bytesRead;
nextPosition += bytesRead;
}
return numBytesRead;
} finally {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why try if not catching

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to decrement the active readers in any scenario so needed the finally block.

updateActiveReaders(-1);
}

return numBytesRead;
}

/**
Expand Down
Loading
Loading