Memory manager #221
…ry to fix error java.lang.OutOfMemoryError: Java heap space
...stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java
...integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java
Can we please update the description? This is a pretty big change and is hard to understand without a good description.
...integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java
...c/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java
    int metadataStoreCapacity,
    long blockSizeBytes,
    long readAheadBytes,
    long maxRangeSizeBytes,
    long partSizeBytes,
    double sequentialPrefetchBase,
    double sequentialPrefetchSpeed) {
  Preconditions.checkArgument(blobStoreCapacity > 0, "`blobStoreCapacity` must be positive");
  Preconditions.checkArgument(maxMemoryLimitAAL > 0, "`maxMemoryLimitAAL` must be positive");
I think we should agree on a lower limit here other than 0. If a user passes 1, for example, I think weird things will happen, and I don't know if it will be obvious to the user what is going on. Could we use something like 30 * blockSizeBytes?
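One way to express the lower bound suggested in this comment is sketched below. It is only an illustration: `MemoryLimitValidator` and `MIN_BLOCKS` are hypothetical names, the multiplier of 30 is the figure floated in the comment rather than an agreed value, and a plain `IllegalArgumentException` stands in for the project's `Preconditions` helper.

```java
/** Hypothetical helper, not part of the PR: rejects memory limits too small to be useful. */
final class MemoryLimitValidator {
  // Assumption: 30 blocks is the multiplier floated in the review, not an agreed value.
  static final long MIN_BLOCKS = 30;

  private MemoryLimitValidator() {}

  /** Returns the limit unchanged if it can hold at least MIN_BLOCKS blocks, otherwise throws. */
  static long validate(long maxMemoryLimitBytes, long blockSizeBytes) {
    if (blockSizeBytes <= 0) {
      throw new IllegalArgumentException("`blockSizeBytes` must be positive");
    }
    // multiplyExact throws ArithmeticException on overflow instead of silently wrapping.
    long minimumBytes = Math.multiplyExact(MIN_BLOCKS, blockSizeBytes);
    if (maxMemoryLimitBytes < minimumBytes) {
      throw new IllegalArgumentException(
          "memory limit must be at least " + minimumBytes + " bytes, got " + maxMemoryLimitBytes);
    }
    return maxMemoryLimitBytes;
  }
}
```

With a check like this, a value such as 1 fails fast with a message that names the minimum, instead of producing surprising eviction behaviour later.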
input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java
input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java
...stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java
...am/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MemoryManager.java
...stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java
...stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java
...stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java
We discussed a bit in person. I am mostly OK with these changes for now, but will discuss with @fuatbasik how much work we want to do to make this reusable. Currently I'm not happy with the flow: BlockStore > updates Memory Tracker > asks BlobStore to clean up. We could have a self-contained cache interface that we reuse in the block store, the blob store, and the metadata store, which should make it easy to implement caching logic across our library. I'm just not sure how much of this we want to do now.
/** Checks memory usage of the BlobStore and schedules eviction if required */
private void checkBlobStoreMemoryUsageAndEvictIfRequired() {
  if (shouldEvict() && isEvictionInProgress.compareAndSet(false, true)) {
That compareAndSet is cool, I learnt something. I also think it is a good way of handling the cleanup. :D
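For readers unfamiliar with the pattern praised here, the following is a minimal sketch of using `AtomicBoolean.compareAndSet(false, true)` so that at most one eviction runs at a time. `EvictionGuard` and its methods are hypothetical names; the sketch runs the eviction synchronously and resets the flag in a `finally` block, whereas the PR hands the work to `evictionExecutor`, and where the PR resets the flag is not visible in this excerpt.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a compare-and-set guard; not the PR's BlobStore code. */
final class EvictionGuard {
  private final AtomicBoolean isEvictionInProgress = new AtomicBoolean(false);
  private final AtomicInteger completedRuns = new AtomicInteger();

  /** Runs the eviction only if no other eviction is in progress; returns whether it ran. */
  boolean tryEvict(Runnable eviction) {
    // Atomically flips false -> true; exactly one concurrent caller can win.
    if (!isEvictionInProgress.compareAndSet(false, true)) {
      return false;
    }
    try {
      eviction.run();
      completedRuns.incrementAndGet();
      return true;
    } finally {
      // Assumption: the flag is released once the eviction finishes, even on failure.
      isEvictionInProgress.set(false);
    }
  }

  int completedRuns() {
    return completedRuns.get();
  }
}
```

Callers that lose the race simply return without blocking, which is what makes the pattern cheap on the read path.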
}
});
this.blobMap = Collections.synchronizedMap(new LinkedHashMap<ObjectKey, Blob>(16, 0.75f, true));
if (configuration.getMaxMemoryLimitAAL() != Long.MAX_VALUE) {
Could we move this to a method and add a comment about what it is doing? With all the numbers and ifs, it is a bit confusing what the end result looks like here.
+1, it is really hard to reason about what is going on here.
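Part of what makes the line under discussion dense is the three-argument `LinkedHashMap` constructor: the third argument, `true`, switches the map from insertion order to access order, so iteration starts at the least recently accessed entry. A small sketch of that behaviour follows, using `String` keys in place of `ObjectKey` and `Blob`; `AccessOrderDemo` is a hypothetical name.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical demo of an access-ordered LinkedHashMap behind a synchronized wrapper. */
final class AccessOrderDemo {
  private AccessOrderDemo() {}

  /** Returns the key that an LRU-style eviction would visit first. */
  static String eldestKeyAfterAccess() {
    // accessOrder = true: get() and put() move the touched entry to the end of the iteration order.
    Map<String, Integer> map =
        Collections.synchronizedMap(new LinkedHashMap<String, Integer>(16, 0.75f, true));
    map.put("a", 1);
    map.put("b", 2);
    map.put("c", 3);
    map.get("a"); // "a" is now the most recently used entry

    // Iterating a synchronizedMap view requires holding the map's lock manually.
    synchronized (map) {
      return map.keySet().iterator().next(); // "b", the least recently used key
    }
  }
}
```

Note that in access-order mode even `get` is a structural modification, which is why the synchronized wrapper (or equivalent locking) matters for concurrent readers.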
 * @return the max memory limit of the BlobStore
 */
private long calculateDefaultMemoryLimit() {
  return 3 * ONE_GB;
Decided this based on benchmark results.
Thanks @rajdchak. I left a few comments but need another pass. My biggest question is about ownership of concerns. It is quite confusing today whether BlobStore should depend on a MemoryManager or a MemoryManager should depend on a BlobStore. Next, we have multiple counters on MemoryManager and BlobStore: one to trigger eviction, another to keep track of usage (I guess).
Another point is that the role of a MemoryManager seems to be implemented in the BlobStore, which is neither extensible nor maintainable. I will do another full pass once you address these.
@@ -125,7 +128,12 @@ LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) throws IOExc
return new ParquetLogicalIOImpl(
    s3URI,
    new PhysicalIOImpl(
        s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
        s3URI,
Since we have 6 parameters, shall we move to a builder here?
@@ -134,7 +142,12 @@ LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) throws IOExc
return new DefaultLogicalIOImpl(
    s3URI,
    new PhysicalIOImpl(
        s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
        s3URI,
Let's avoid code duplication when creating PhysicalIOImpl.
/** Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_CAPACITY_BLOB_STORE} by default. */
@Builder.Default private int blobStoreCapacity = DEFAULT_CAPACITY_BLOB_STORE;
/**
 * Max memory to be used by library. {@link PhysicalIOConfiguration#DEFAULT_MAX_MEMORY_LIMIT_AAL}
Can you please add the unit here? Bytes, KBs, MBs?
It is even better if the name states the unit.
@@ -30,7 +30,7 @@
@Builder
@EqualsAndHashCode
public class PhysicalIOConfiguration {
  private static final int DEFAULT_CAPACITY_BLOB_STORE = 50;
  private static final long DEFAULT_MAX_MEMORY_LIMIT_AAL = Long.MAX_VALUE;
nit: it would be lovely if the name also stated the unit, e.g. DEFAULT_MAX_MEMORY_BYTES
I am confused. Here we set the default limit to Long.MAX_VALUE; later in the code, we use Long.MAX_VALUE to detect that this value was not explicitly set and then apply a 3GB default. Why not just set it to 3GB here?
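A sketch of what this comment proposes: the default constant carries the real 3 GB value (and the unit in its name), so no `Long.MAX_VALUE` sentinel check is needed later. `MemoryConfigSketch` is a hypothetical stand-in for `PhysicalIOConfiguration`, written without Lombok, and the definition of `ONE_GB` as 1024^3 bytes is an assumption since the excerpt does not show it.

```java
/** Hypothetical stand-in for the configuration class; not the PR's actual code. */
final class MemoryConfigSketch {
  // Assumption: ONE_GB is binary (1024^3 bytes); the PR excerpt does not show its definition.
  static final long ONE_GB = 1024L * 1024L * 1024L;
  // The unit is part of the name, and the default is the real value rather than a sentinel.
  static final long DEFAULT_MAX_MEMORY_BYTES = 3 * ONE_GB;

  private final long maxMemoryBytes;

  /** Uses the 3 GB default directly. */
  MemoryConfigSketch() {
    this(DEFAULT_MAX_MEMORY_BYTES);
  }

  /** Uses an explicit limit supplied by the caller. */
  MemoryConfigSketch(long maxMemoryBytes) {
    if (maxMemoryBytes <= 0) {
      throw new IllegalArgumentException("`maxMemoryBytes` must be positive");
    }
    this.maxMemoryBytes = maxMemoryBytes;
  }

  long getMaxMemoryBytes() {
    return maxMemoryBytes;
  }
}
```

The trade-off is that the configuration can no longer distinguish "not set" from "set to the default", which only matters if the two cases need different behaviour.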
try {
  blockManager.makePositionAvailable(pos, ReadMode.SYNC);
  return blockManager.getBlock(pos).get().read(pos);
} finally {
Why are we comfortable with the lack of a catch block?
I just wanted to decrement the active readers in every scenario. If an error is thrown, it can be handled by the caller; I don't think we want to catch anything here.
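The reasoning in this reply can be shown with a small sketch of `try`/`finally` without `catch`: the counter is restored on every exit path while exceptions still propagate to the caller. `ActiveReaderTracker` is a hypothetical name, and incrementing the counter just before the `try` is an assumption, since the excerpt only shows the `finally` side.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

/** Hypothetical illustration of try/finally bookkeeping; not the PR's Blob code. */
final class ActiveReaderTracker {
  private final AtomicInteger activeReaders = new AtomicInteger();

  /** Runs the read while counting it as active; exceptions pass through untouched. */
  int readWithTracking(IntSupplier read) {
    // Assumption: the increment happens immediately before the try block.
    activeReaders.incrementAndGet();
    try {
      return read.getAsInt();
    } finally {
      // Runs on normal return and when the read throws, so the count never leaks.
      activeReaders.decrementAndGet();
    }
  }

  int activeReaders() {
    return activeReaders.get();
  }
}
```

Because nothing is caught, the caller sees exactly the exception the read raised, and an evictor consulting `activeReaders()` never observes a stale non-zero count after a failed read.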
LOG.info(
    "Scheduling eviction because blobstore memory usage exceeded and currently is "
        + memoryUsage.get());
evictionExecutor.execute(
Why does the eviction business logic live in BlobStore now that we have a MemoryManager?
@@ -106,5 +253,6 @@ public int blobCount() {
@Override
public void close() {
  blobMap.forEach((k, v) -> v.close());
  evictionExecutor.shutdownNow();
This should probably close the MemoryManager too, right?
@@ -75,6 +77,7 @@ public S3SeekableInputStreamFactory(
this.objectFormatSelector = new ObjectFormatSelector(configuration.getLogicalIOConfiguration());
this.objectBlobStore =
    new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration());
this.memoryManager = new MemoryManager(objectBlobStore);
Should a MemoryManager have a BlobStore, or should a BlobStore have a MemoryManager?
    nextPosition += bytesRead;
  }
  return numBytesRead;
} finally {
Why use try if we are not catching anything?
I just wanted to decrement the active readers in any scenario, so I needed the finally block.
private static final String BLOB_STORE_CAPACITY_KEY = "blobstore.capacity";
private static final String MAX_MEMORY_LIMIT_AAL_KEY = "max.memory";
nit: we can drop AAL from here. It is already part of the AAL.
Description of change
Memory Management for AAL library.
Relevant issues
Does this contribution introduce any breaking changes to the existing APIs or behaviors?
Does this contribution introduce any new public APIs or behaviors?
How was the contribution tested?
Does this contribution need a changelog entry?
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the Developer Certificate of Origin (DCO).