Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory manager #221

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open

Memory manager #221

wants to merge 8 commits into from

Conversation

rajdchak
Copy link
Contributor

@rajdchak rajdchak commented Feb 10, 2025

Description of change

Memory Management for AAL library.

Relevant issues

Does this contribution introduce any breaking changes to the existing APIs or behaviors?

Does this contribution introduce any new public APIs or behaviors?

How was the contribution tested?

Does this contribution need a changelog entry?

  • I have updated the CHANGELOG or README if appropriate

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the Developer Certificate of Origin (DCO).

@stubz151
Copy link
Contributor

Can we please update the description this is a pretty big change and is hard to understand without a nice description.

int metadataStoreCapacity,
long blockSizeBytes,
long readAheadBytes,
long maxRangeSizeBytes,
long partSizeBytes,
double sequentialPrefetchBase,
double sequentialPrefetchSpeed) {
Preconditions.checkArgument(blobStoreCapacity > 0, "`blobStoreCapacity` must be positive");
Preconditions.checkArgument(maxMemoryLimitAAL > 0, "`maxMemoryLimitAAL` must be positive");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should agree on a limit here besides 0? Because if a user passes through 1 for example I think weird things will happen and I don't know if it will be obvious to the user what is happening. Can we not do something like 30 * blockSizeBytes or something

@ahmarsuhail
Copy link
Collaborator

We discussed a bit in person. I am mostly ok with these changes for now! but will discuss with @fuatbasik how much work we want to do to make this reusable.

Currently I'm not happy with the BlockStore > Updates Memory Tracker > Asks BlobStore to clean up

We can have a contained cached interface, which we can reused in both the block store, the blob store, metadatastore which should make it easy to implement caching logic across our library. Just not sure how much of this we want to do now..


/** Checks memory usage of the BlobStore and schedules eviction if required */
private void checkBlobStoreMemoryUsageAndEvictIfRequired() {
if (shouldEvict() && isEvictionInProgress.compareAndSet(false, true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that compareAndSet is cool, learnt something also I think is a good way of handling the cleanup. :D

}
});
this.blobMap = Collections.synchronizedMap(new LinkedHashMap<ObjectKey, Blob>(16, 0.75f, true));
if (configuration.getMaxMemoryLimitAAL() != Long.MAX_VALUE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we move this to a method and make a comment about what it is doing? with all the numbers and if's it can be a bit confusing about what the end result looks like here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, it is really hard to reason what is going on here.

* @return the max memory limit of the BlobStore
*/
private long calculateDefaultMemoryLimit() {
return 3 * ONE_GB;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided this based on benchmark results.

Copy link
Collaborator

@fuatbasik fuatbasik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rajdchak. I put few comments but need another pass. My biggest question is about ownership of concerns. It is quite confusing today if BlobStore should depend a memory manager of if a MemoryManager should depend on a BlobStore. Next, we have multiple counters on MemoryManager and BlobStore, one to trigger eviction another one to keep track of usage (i guess).

Another one is the role of a MemoryManager seems like implemented in the BlobStore but this is not extensible or maintainable. I will do another full pass once you address these.

@@ -125,7 +128,12 @@ LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) throws IOExc
return new ParquetLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
s3URI,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we have 6 parameters shall we move to a builder here?

@@ -134,7 +142,12 @@ LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) throws IOExc
return new DefaultLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
s3URI,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets avoid code duplication on creating PhysicalIOImpl

/** Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_CAPACITY_BLOB_STORE} by default. */
@Builder.Default private int blobStoreCapacity = DEFAULT_CAPACITY_BLOB_STORE;
/**
* Max memory to be used by library. {@link PhysicalIOConfiguration#DEFAULT_MAX_MEMORY_LIMIT_AAL}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add the unit here? Bytes, KBs? MBs?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is even better if the name tells about the unit

@@ -30,7 +30,7 @@
@Builder
@EqualsAndHashCode
public class PhysicalIOConfiguration {
private static final int DEFAULT_CAPACITY_BLOB_STORE = 50;
private static final long DEFAULT_MAX_MEMORY_LIMIT_AAL = Long.MAX_VALUE;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it would be lovely if the name also tells the unit. i.e. DEFAULT_MAX_MEMORY_BYTES

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused. Here we are setting default limit to Long.MAX_VALUE, later in the code, we are using Long.MAX_VALUE to confirm this value was not explicitly set and actually set a 3GB default number. Why not just set it to 3GB here?

try {
blockManager.makePositionAvailable(pos, ReadMode.SYNC);
return blockManager.getBlock(pos).get().read(pos);
} finally {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we are comfortable with lack of a catch block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to decrement the active readers in any scenario. If there is any error thrown it can be handled by the caller methods, I don't think we want to catch anything here.

LOG.info(
"Scheduling eviction because blobstore memory usage exceeded and currently is "
+ memoryUsage.get());
evictionExecutor.execute(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why eviction business logic lives in BlobStore now that we have a MemoryManager?

@@ -106,5 +253,6 @@ public int blobCount() {
@Override
public void close() {
blobMap.forEach((k, v) -> v.close());
evictionExecutor.shutdownNow();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably close the memorymanager right?

@@ -75,6 +77,7 @@ public S3SeekableInputStreamFactory(
this.objectFormatSelector = new ObjectFormatSelector(configuration.getLogicalIOConfiguration());
this.objectBlobStore =
new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration());
this.memoryManager = new MemoryManager(objectBlobStore);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should a memoryManager have a blobStore or blobStore have a memorymanager?

nextPosition += bytesRead;
}
return numBytesRead;
} finally {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why try if not catching

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to decrement the active readers in any scenario so needed the finally block.


private static final String BLOB_STORE_CAPACITY_KEY = "blobstore.capacity";
private static final String MAX_MEMORY_LIMIT_AAL_KEY = "max.memory";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can drop AAL from here. It is already part of the AAL.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants