
[GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile to avoid OOM #7861

Open

ccat3z wants to merge 3 commits into main from mmap-read-file

Conversation

ccat3z
Contributor

@ccat3z ccat3z commented Nov 8, 2024

What changes were proposed in this pull request?

This PR fixes #7860 by introducing MmapFileStream, which extends arrow::io::InputStream. MmapFileStream invokes madvise(MADV_DONTNEED) to release previously read memory when it reads the next range of data.
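For readers unfamiliar with the pattern, below is a minimal standalone sketch of the idea (not the PR's code): consume an mmap-ed spill file sequentially and hand the already-consumed, page-aligned prefix back to the kernel with madvise(MADV_DONTNEED), so the resident set stays bounded even for a multi-GB file. The 8 MB chunk size and the checksum "consumer" are only illustrative.

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main(int argc, char** argv) {
  if (argc < 2) {
    std::fprintf(stderr, "usage: %s <spill-file>\n", argv[0]);
    return 1;
  }
  const int fd = ::open(argv[1], O_RDONLY);
  if (fd < 0) return 1;

  struct stat st;
  if (::fstat(fd, &st) != 0) return 1;
  const int64_t size = st.st_size;

  void* mapped = ::mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd, 0);
  if (mapped == MAP_FAILED) return 1;
  auto* data = static_cast<uint8_t*>(mapped);

  const int64_t pageSize = ::sysconf(_SC_PAGESIZE);
  const int64_t pageMask = ~(pageSize - 1);

  int64_t pos = 0;                // bytes consumed so far
  int64_t posRetain = 0;          // start of pages not yet released
  const int64_t chunk = 8 << 20;  // pretend the merger copies 8 MB at a time

  uint64_t checksum = 0;
  while (pos < size) {
    const int64_t n = std::min<int64_t>(chunk, size - pos);
    // "Consume" the next range; the real merger would copy it into the
    // merged output instead of checksumming it.
    for (int64_t i = 0; i < n; ++i) checksum += data[pos + i];
    pos += n;

    // Hand the fully consumed, page-aligned prefix back to the kernel so it
    // can be reclaimed instead of growing the process RSS.
    const int64_t purge = (pos - posRetain) & pageMask;
    if (purge > 0 && ::madvise(data + posRetain, purge, MADV_DONTNEED) == 0) {
      posRetain += purge;
    }
  }

  ::munmap(data, size);
  ::close(fd);
  std::printf("checksum=%llu\n", static_cast<unsigned long long>(checksum));
  return 0;
}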

How was this patch tested?

// Generate 10 partitions, each partition has about 10GB random data.
def gen(scale: Int, parts: Int) = {
  sc.parallelize(1 to (1024*1024), numSlices = 1000)
    .map(x => (x % 1000, randStr(scale * parts)))
    .repartition(parts)
    .toDF("a", "b")
    .save./* ... */
}

// Trigger shuffle spill by `repartition(50)`.
def test(parts: Int = 50) = {
  spark.read./* ... */.repartition(parts)
    .filter(expr("a < 0*rand()")) // non-deterministic predicate so the filter is not pushed down past the repartition
}
# Executor Memory Config
spark.executor.memory=512M
spark.yarn.executor.memoryOverhead=512M
spark.gluten.memory.offHeap.size.in.bytes=1610612736

Test Result:

| impl | avg time to merge spills (s) | avg total spilled size of each task (MB) |
| --- | --- | --- |
| read (arrow ReadableFile) | 10.58706836156 | 9935.920098495480 |
| mmap (open required range by MemoryMappedFile) | 6.602059312420000 | 9935.920098495480 |
| madv (this PR) | 6.73993204562 | 9935.920098495480 |
| mmap (replace madv with munmap in this PR) | 6.55791399852 | 9935.920098495480 |

The munmap patch used in the test above:

diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc
index 1ceb777f1..742c53c90 100644
--- a/cpp/core/shuffle/Utils.cc
+++ b/cpp/core/shuffle/Utils.cc
@@ -243,9 +243,9 @@ void MmapFileStream::advance(int64_t length) {
 
   auto purgeLength = (pos_ - posRetain_) & pageMask;
   if (purgeLength > 0) {
-    int ret = madvise(data_ + posRetain_, purgeLength, MADV_DONTNEED);
+    int ret = munmap(data_ + posRetain_, purgeLength);
     if (ret != 0) {
-      LOG(WARNING) << "fadvise failed " << ::arrow::internal::ErrnoMessage(errno);
+      LOG(WARNING) << "munmap failed " << ::arrow::internal::ErrnoMessage(errno);
     }
     posRetain_ += purgeLength;
   }
@@ -269,7 +269,7 @@ void MmapFileStream::willNeed(int64_t length) {
 
 arrow::Status MmapFileStream::Close() {
   if (data_ != nullptr) {
-    int result = munmap(data_, size_);
+    int result = munmap(data_ + posRetain_, size_ - posRetain_);
     if (result != 0) {
       LOG(WARNING) << "munmap failed";
     }

@github-actions github-actions bot added the VELOX label Nov 8, 2024

@ccat3z
Contributor Author

ccat3z commented Nov 8, 2024

cc @kecookier

@zhztheplayer zhztheplayer changed the title [GLUTEN-7860][CORE] Replace MemoryMappedFile with ReadableFile to avoid OOM [GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile with ReadableFile to avoid OOM Nov 8, 2024
@kecookier
Contributor

/Benchmark Velox

@ccat3z
Contributor Author

ccat3z commented Nov 9, 2024

/Benchmark Velox

@ccat3z ccat3z marked this pull request as ready for review November 9, 2024 03:15
@ccat3z
Contributor Author

ccat3z commented Nov 9, 2024

/Benchmark Velox

Member

@zhztheplayer zhztheplayer left a comment

@ccat3z Do you see #7860 fixed with this approach?

I am triggering a benchmark manually.

cc @marin-ma @FelixYBW

@@ -73,7 +73,7 @@ void Spill::insertPayload(

 void Spill::openSpillFile() {
   if (!is_) {
-    GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::MemoryMappedFile::Open(spillFile_, arrow::io::FileMode::READ));
+    GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::ReadableFile::Open(spillFile_));
Member

Is the API implemented with buffered read?

Not sure whether https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/buffered.h may help here.
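For reference, a minimal sketch (not part of this PR) of what the buffered.h suggestion would look like: wrap the plain ReadableFile in an arrow::io::BufferedInputStream so small sequential reads are served from a user-space buffer. The helper name openBufferedSpill and the 1 MiB buffer size are illustrative assumptions.

#include <memory>
#include <string>

#include <arrow/io/buffered.h>
#include <arrow/io/file.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>

// Open the spill file and layer a buffered stream over it so that many small
// sequential reads turn into fewer, larger reads against the file.
arrow::Result<std::shared_ptr<arrow::io::BufferedInputStream>> openBufferedSpill(
    const std::string& spillFile) {
  ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::ReadableFile::Open(spillFile));
  return arrow::io::BufferedInputStream::Create(
      /*buffer_size=*/1 << 20, arrow::default_memory_pool(), std::move(raw));
}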

Contributor

Spill merge doesn't need buffering.

@marin-ma
Contributor

> I am triggering a benchmark manually.

@zhztheplayer There's no shuffle spill on jenkins. The change won't be tested.

@zhztheplayer
Member

> I am triggering a benchmark manually.
>
> @zhztheplayer There's no shuffle spill on jenkins. The change won't be tested.

I thought we always rely on Spark-controlled spill in shuffle. Does the Jenkins CI always have enough memory for all shuffle data?


@FelixYBW
Contributor

> @zhztheplayer There's no shuffle spill on jenkins. The change won't be tested.

Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort be triggered, or the shuffle?

@FelixYBW
Contributor

@zhztheplayer @marin-ma can we create a query and config to test it?

@ccat3z ccat3z changed the title [GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile with ReadableFile to avoid OOM [GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile to avoid OOM Nov 18, 2024
@ccat3z ccat3z force-pushed the mmap-read-file branch 2 times, most recently from eaf10aa to 43a4f06 Compare November 18, 2024 03:34
@ccat3z
Contributor Author

ccat3z commented Nov 18, 2024

@FelixYBW @zhztheplayer I added MmapFileStream in this PR. MmapFileStream invokes madvise(MADV_DONTNEED) to release previously read memory when reading the next range of data. The test approach and results have been updated in the PR description.

Comment on lines 230 to 233
auto fstream = std::shared_ptr<MmapFileStream>(new MmapFileStream());
fstream->fd_ = std::move(fd);
fstream->data_ = static_cast<uint8_t*>(result);
fstream->size_ = size;
Contributor

Can we use std::make_shared and pass the arguments through the constructor?
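A rough sketch of what the suggestion amounts to, with simplified stand-in types rather than the PR's actual definitions (the real constructor takes an arrow::internal::FileDescriptor, as a later snippet shows):

#include <cstdint>
#include <memory>

// Simplified stand-in for the real class, which wraps the mapped state
// behind arrow::io::InputStream.
class MmapStreamSketch {
 public:
  MmapStreamSketch(int fd, uint8_t* data, int64_t size)
      : fd_(fd), data_(data), size_(size) {}

 private:
  int fd_;
  uint8_t* data_;
  int64_t size_;
};

// Factory: construct and initialize in one allocation, with no
// post-construction member assignment needed.
std::shared_ptr<MmapStreamSketch> makeStream(int fd, void* mapped, int64_t size) {
  return std::make_shared<MmapStreamSketch>(fd, static_cast<uint8_t*>(mapped), size);
}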

@@ -72,4 +72,34 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

class MmapFileStream : public arrow::io::InputStream {
Contributor

Could you please add some comments to explain the usage/functionality for this class?

Contributor

@marin-ma marin-ma left a comment

Some minor comments. Thanks!

// to prefetch and release memory timely.
class MmapFileStream : public arrow::io::InputStream {
 public:
  MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size)
Contributor

Please separate the declaration and definition. And add a blank line between two member functions.

  arrow::Status Close() override;
  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
  bool closed() const override {
Contributor

ditto

};

private:
arrow::Result<int64_t> actualReadSize(int64_t nbytes) {
Contributor

ditto

@@ -72,4 +72,37 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

// MmapFileStream is used to optimize sequential file reading. It uses madvise
// to prefetch and release memory timely.
class MmapFileStream : public arrow::io::InputStream {
Contributor

You may contribute MmapFileStream to Apache Arrow in future.

@FelixYBW
Contributor

Thank you. Looks like a good solution!

Contributor

@marin-ma marin-ma left a comment

LGTM. Thanks!

@zhztheplayer
Member

> Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort be triggered, or the shuffle?

So far the spill is triggered on whichever component holds more memory, no matter whether it's a Velox operator or the shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.

@FelixYBW
Contributor

> Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort be triggered, or the shuffle?

> So far the spill is triggered on whichever component holds more memory, no matter whether it's a Velox operator or the shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.

So now once spill is called, every operator's spill is triggered, right?

@zhztheplayer
Member

zhztheplayer commented Nov 20, 2024

> Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort be triggered, or the shuffle?

> So far the spill is triggered on whichever component holds more memory, no matter whether it's a Velox operator or the shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.

> So now once spill is called, every operator's spill is triggered, right?

We pass a target spill size to the Velox API, so the spill call usually stops once enough memory has been reclaimed. As a result, some of the operators can be skipped in the procedure.

Successfully merging this pull request may close these issues.

[CORE] LocalParitionWriter causes OOM during mergeSpills
6 participants