[GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile to avoid OOM #7861
base: main
Conversation
cc @kecookier
/Benchmark Velox
/Benchmark Velox
/Benchmark Velox
cpp/core/shuffle/Spill.cc (outdated)
@@ -73,7 +73,7 @@ void Spill::insertPayload(
 void Spill::openSpillFile() {
   if (!is_) {
-    GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::MemoryMappedFile::Open(spillFile_, arrow::io::FileMode::READ));
+    GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::ReadableFile::Open(spillFile_));
Is the API implemented with buffered read?
Not sure whether https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/buffered.h may help here.
Spill merge doesn't need buffering.
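For context, if buffered reads were ever wanted on the merge path, a minimal sketch of wrapping the unbuffered ReadableFile in Arrow's BufferedInputStream could look like the following (the function name, buffer size, and error handling are illustrative assumptions, not part of the PR):

#include <memory>
#include <string>
#include <utility>

#include <arrow/io/buffered.h>
#include <arrow/io/file.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>

// Sketch: open the spill file unbuffered, then layer a read buffer on top so
// many small reads are served from memory instead of separate OS reads.
arrow::Result<std::shared_ptr<arrow::io::InputStream>> openBufferedSpillFile(
    const std::string& spillFile, int64_t bufferSize = 1 << 20) {
  ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(spillFile));
  return arrow::io::BufferedInputStream::Create(
      bufferSize, arrow::default_memory_pool(), std::move(file));
}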
@zhztheplayer There's no shuffle spill on Jenkins, so the change won't be exercised there.
I thought we always rely on Spark-controlled spill in shuffle. Does the Jenkins CI always have enough memory for all shuffle data?
Is it because spill would be triggered on other operators in the pipeline, e.g. a sort + shuffle? Would the sort spill or the shuffle spill be triggered?
@zhztheplayer @marin-ma Can we create a query and config to test it?
Force-pushed from 02fc498 to d87c094
Force-pushed from eaf10aa to 43a4f06
@FelixYBW @zhztheplayer I added
Force-pushed from 43a4f06 to 8eea3c7
cpp/core/shuffle/Utils.cc (outdated)
  auto fstream = std::shared_ptr<MmapFileStream>(new MmapFileStream());
  fstream->fd_ = std::move(fd);
  fstream->data_ = static_cast<uint8_t*>(result);
  fstream->size_ = size;
Can we use std::make_shared and set the arguments through the constructor?
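A tiny sketch of the suggested pattern (hypothetical type and member names, not the PR's actual class): initialize everything through the constructor and let std::make_shared build the object, instead of assigning members after construction.

#include <cstdint>
#include <memory>

// Hypothetical stand-in for MmapFileStream: all members are set in the
// constructor, so the factory never mutates a half-built object.
class MmapStreamSketch {
 public:
  MmapStreamSketch(int fd, uint8_t* data, int64_t size)
      : fd_(fd), data_(data), size_(size) {}

  int64_t size() const { return size_; }

 private:
  int fd_;
  uint8_t* data_;
  int64_t size_;
};

std::shared_ptr<MmapStreamSketch> makeStream(int fd, void* mapped, int64_t size) {
  // Works because the constructor is public and accepts every member.
  return std::make_shared<MmapStreamSketch>(fd, static_cast<uint8_t*>(mapped), size);
}

Note that std::make_shared requires a public constructor; if the constructor must stay private, std::shared_ptr<T>(new T(...)) inside the factory is the usual fallback.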
@@ -72,4 +72,34 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

class MmapFileStream : public arrow::io::InputStream {
Could you please add some comments to explain the usage/functionality for this class?
Force-pushed from 6baf43e to 2203599
Some minor comments. Thanks!
cpp/core/shuffle/Utils.h (outdated)
// to prefetch and release memory timely.
class MmapFileStream : public arrow::io::InputStream {
 public:
  MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size)
Please separate the declaration and definition. And add a blank line between two member functions.
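As an illustration of the requested style (simplified, hypothetical members, not the PR's full class): only declarations in the class body, definitions out of line, with a blank line between member functions.

#include <algorithm>
#include <cstdint>

#include <arrow/result.h>
#include <arrow/status.h>

class MmapStreamDecl {
 public:
  explicit MmapStreamDecl(int64_t size);

  arrow::Result<int64_t> actualReadSize(int64_t nbytes);

 private:
  int64_t size_;
  int64_t pos_ = 0;
};

// Definitions would live in the corresponding .cc file.
MmapStreamDecl::MmapStreamDecl(int64_t size) : size_(size) {}

arrow::Result<int64_t> MmapStreamDecl::actualReadSize(int64_t nbytes) {
  if (nbytes < 0 || pos_ > size_) {
    return arrow::Status::Invalid("invalid read request");
  }
  return std::min(nbytes, size_ - pos_);
}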
cpp/core/shuffle/Utils.h (outdated)
  arrow::Status Close() override;
  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
  bool closed() const override {
ditto
cpp/core/shuffle/Utils.h (outdated)
};

 private:
  arrow::Result<int64_t> actualReadSize(int64_t nbytes) {
ditto
@@ -72,4 +72,37 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

// MmapFileStream is used to optimize sequential file reading. It uses madvise
// to prefetch and release memory timely.
class MmapFileStream : public arrow::io::InputStream {
You may contribute MmapFileStream to Apache Arrow in the future.
Thank you. Looks like a good solution!
LGTM. Thanks!
So far, spill is triggered on whichever components hold more memory, no matter whether they are Velox operators or shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.
So right now, once spill is called, every operator's spill is triggered, right?
We pass a target spill size to the Velox API, so the spill call usually stops once enough memory has been reclaimed. A portion of the operators can therefore be skipped in the procedure.
What changes were proposed in this pull request?
This PR fixes #7860 by introducing MmapFileStream, which extends arrow::io::InputStream. MmapFileStream invokes MADV_DONTNEED to release previously read memory when reading the next range of data.
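Below is a self-contained sketch of the idea, for illustration only (the class name, page-size handling, and error handling are simplified assumptions and do not mirror the PR's MmapFileStream exactly): an arrow::io::InputStream backed by mmap that calls madvise(MADV_DONTNEED) on fully consumed pages as the read position advances, keeping resident memory bounded during a sequential scan of the spill file.

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cstring>
#include <memory>
#include <string>

#include <arrow/buffer.h>
#include <arrow/io/interfaces.h>
#include <arrow/result.h>
#include <arrow/status.h>

// Sketch of a sequential, mmap-backed input stream that releases already
// consumed pages with MADV_DONTNEED so resident memory stays bounded.
class MmapReadStream : public arrow::io::InputStream {
 public:
  static arrow::Result<std::shared_ptr<MmapReadStream>> Open(const std::string& path) {
    int fd = ::open(path.c_str(), O_RDONLY);
    if (fd < 0) {
      return arrow::Status::IOError("Failed to open ", path);
    }
    struct stat st;
    if (::fstat(fd, &st) != 0 || st.st_size == 0) {
      ::close(fd);
      return arrow::Status::IOError("Failed to stat (or empty) file ", path);
    }
    void* addr = ::mmap(nullptr, static_cast<size_t>(st.st_size), PROT_READ, MAP_PRIVATE, fd, 0);
    if (addr == MAP_FAILED) {
      ::close(fd);
      return arrow::Status::IOError("mmap failed for ", path);
    }
    return std::shared_ptr<MmapReadStream>(
        new MmapReadStream(fd, static_cast<uint8_t*>(addr), static_cast<int64_t>(st.st_size)));
  }

  arrow::Status Close() override {
    if (!closed_) {
      ::munmap(data_, static_cast<size_t>(size_));
      ::close(fd_);
      closed_ = true;
    }
    return arrow::Status::OK();
  }

  bool closed() const override { return closed_; }

  arrow::Result<int64_t> Tell() const override { return pos_; }

  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
    nbytes = std::min(nbytes, size_ - pos_);
    if (nbytes > 0) {
      std::memcpy(out, data_ + pos_, static_cast<size_t>(nbytes));
      pos_ += nbytes;
      // Tell the kernel that fully consumed pages can be reclaimed now,
      // instead of waiting for memory pressure.
      int64_t purge = (pos_ - released_) / pageSize_ * pageSize_;
      if (purge > 0) {
        ::madvise(data_ + released_, static_cast<size_t>(purge), MADV_DONTNEED);
        released_ += purge;
      }
    }
    return nbytes;
  }

  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
    ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
    ARROW_ASSIGN_OR_RAISE(auto bytesRead, Read(nbytes, buffer->mutable_data()));
    ARROW_RETURN_NOT_OK(buffer->Resize(bytesRead));
    return std::shared_ptr<arrow::Buffer>(std::move(buffer));
  }

 private:
  MmapReadStream(int fd, uint8_t* data, int64_t size)
      : fd_(fd), data_(data), size_(size), pageSize_(::sysconf(_SC_PAGESIZE)) {}

  int fd_;
  uint8_t* data_;
  int64_t size_;
  int64_t pageSize_;
  int64_t pos_ = 0;
  int64_t released_ = 0;  // Offset up to which pages have been MADV_DONTNEED'd.
  bool closed_ = false;
};

The PR's MmapFileStream additionally uses madvise to prefetch upcoming data, per its class comment; the sketch above only shows the release-after-read side.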
How was this patch tested?
# Executor Memory Config
spark.executor.memory=512M
spark.yarn.executor.memoryOverhead=512M
spark.gluten.memory.offHeap.size.in.bytes=1610612736
Test Result: munmap with the patch in the above test (screenshots omitted).