From 8eea3c71ce4455157a559a9b4fcf71c75d75cca9 Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Sat, 9 Nov 2024 17:59:58 +0800 Subject: [PATCH 1/3] Impl MmapFileStream --- cpp/core/shuffle/Spill.cc | 2 +- cpp/core/shuffle/Spill.h | 3 +- cpp/core/shuffle/Utils.cc | 93 +++++++++++++++++++++++++++++++++++++++ cpp/core/shuffle/Utils.h | 30 +++++++++++++ 4 files changed, 125 insertions(+), 3 deletions(-) diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc index d8b9bc7ebf99..8efa323945d7 100644 --- a/cpp/core/shuffle/Spill.cc +++ b/cpp/core/shuffle/Spill.cc @@ -73,7 +73,7 @@ void Spill::insertPayload( void Spill::openSpillFile() { if (!is_) { - GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::MemoryMappedFile::Open(spillFile_, arrow::io::FileMode::READ)); + GLUTEN_ASSIGN_OR_THROW(is_, MmapFileStream::open(spillFile_)); rawIs_ = is_.get(); } } diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h index c82a60f562b4..2a88177d9756 100644 --- a/cpp/core/shuffle/Spill.h +++ b/cpp/core/shuffle/Spill.h @@ -69,9 +69,8 @@ class Spill final { }; SpillType type_; - std::shared_ptr is_; + std::shared_ptr is_; std::list partitionPayloads_{}; - std::shared_ptr inputStream_{}; std::string spillFile_; int64_t spillTime_; int64_t compressTime_; diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc index 6854c1978370..8965288beeed 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -16,10 +16,13 @@ */ #include "shuffle/Utils.h" +#include #include #include #include #include +#include +#include #include #include #include @@ -212,6 +215,96 @@ arrow::Result> makeUncompressedRecordBatch( } return arrow::RecordBatch::Make(writeSchema, 1, {arrays}); } + +arrow::Result> MmapFileStream::open(const std::string& path) { + ARROW_ASSIGN_OR_RAISE(auto fileName, arrow::internal::PlatformFilename::FromString(path)); + + ARROW_ASSIGN_OR_RAISE(auto fd, arrow::internal::FileOpenReadable(fileName)); + ARROW_ASSIGN_OR_RAISE(auto size, 
arrow::internal::FileGetSize(fd.fd())); + + void* result = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd.fd(), 0); + if (result == MAP_FAILED) { + return arrow::Status::IOError("Memory mapping file failed: ", ::arrow::internal::ErrnoMessage(errno)); + } + + auto fstream = std::shared_ptr(new MmapFileStream()); + fstream->fd_ = std::move(fd); + fstream->data_ = static_cast(result); + fstream->size_ = size; + return fstream; +} + +void MmapFileStream::advance(int64_t length) { + static auto pageSize = static_cast(arrow::internal::GetPageSize()); + static auto pageMask = ~(pageSize - 1); + DCHECK_GT(pageSize, 0); + DCHECK_EQ(pageMask & pageSize, pageSize); + + auto purgeLength = (pos_ - posRetain_) & pageMask; + if (purgeLength > 0) { + int ret = madvise(data_ + posRetain_, purgeLength, MADV_DONTNEED); + if (ret != 0) { + LOG(WARNING) << "madvise dontneed failed: " << ::arrow::internal::ErrnoMessage(errno); + } + posRetain_ += purgeLength; + } + + pos_ += length; +} + +void MmapFileStream::willNeed(int64_t length) { + static auto pageSize = static_cast(arrow::internal::GetPageSize()); + static auto pageMask = ~(pageSize - 1); + DCHECK_GT(pageSize, 0); + DCHECK_EQ(pageMask & pageSize, pageSize); + + auto willNeedPos = pos_ & pageMask; + auto willNeedLen = pos_ + length - willNeedPos; + int ret = madvise(data_ + willNeedPos, willNeedLen, MADV_WILLNEED); + if (ret != 0) { + LOG(WARNING) << "madvise willneed failed: " << ::arrow::internal::ErrnoMessage(errno); + } +} + +arrow::Status MmapFileStream::Close() { + if (data_ != nullptr) { + int result = munmap(data_, size_); + if (result != 0) { + LOG(WARNING) << "munmap failed: " << ::arrow::internal::ErrnoMessage(errno); + } + data_ = nullptr; + } + + return fd_.Close(); +} + +arrow::Result MmapFileStream::Tell() const { + return pos_; +} + +arrow::Result MmapFileStream::Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(nbytes, actualReadSize(nbytes)); + + if (nbytes > 0) { + memcpy(out, data_ + pos_, nbytes); + advance(nbytes); + } + + return nbytes; +} + 
+arrow::Result> MmapFileStream::Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(nbytes, actualReadSize(nbytes)); + + if (nbytes > 0) { + auto buffer = std::make_shared(data_ + pos_, nbytes); + willNeed(nbytes); + advance(nbytes); + return buffer; + } else { + return std::make_shared(nullptr, 0); + } +} } // namespace gluten std::string gluten::generateUuid() { diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index c4e2409d2da0..5d6f07707a73 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -72,4 +72,34 @@ arrow::Result> makeUncompressedRecordBatch( std::shared_ptr zeroLengthNullBuffer(); +class MmapFileStream : public arrow::io::InputStream { + public: + static arrow::Result> open(const std::string& path); + arrow::Result Tell() const override; + arrow::Status Close() override; + arrow::Result Read(int64_t nbytes, void* out) override; + arrow::Result> Read(int64_t nbytes) override; + bool closed() const override { + return data_ == nullptr; + }; + + private: + arrow::Result actualReadSize(int64_t nbytes) { + if (nbytes < 0 || pos_ > size_) { + return arrow::Status::IOError("Read out of range. 
Offset: ", pos_, " Size: ", nbytes, " File Size: ", size_); + } + return std::min(size_ - pos_, nbytes); + } + + void advance(int64_t length); + void willNeed(int64_t length); + + arrow::internal::FileDescriptor fd_; + uint8_t* data_ = nullptr; + int64_t size_; + int64_t pos_ = 0; + int64_t posRetain_ = 0; + MmapFileStream() = default; +}; + } // namespace gluten From 220359916d7d5ad5cb28428a8fe0996e5b19d372 Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Mon, 18 Nov 2024 20:01:33 +0800 Subject: [PATCH 2/3] update --- cpp/core/shuffle/Utils.cc | 6 +----- cpp/core/shuffle/Utils.h | 5 ++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc index 8965288beeed..157c100a1f6f 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -227,11 +227,7 @@ arrow::Result> MmapFileStream::open(const std::s return arrow::Status::IOError("Memory mapping file failed: ", ::arrow::internal::ErrnoMessage(errno)); } - auto fstream = std::shared_ptr(new MmapFileStream()); - fstream->fd_ = std::move(fd); - fstream->data_ = static_cast(result); - fstream->size_ = size; - return fstream; + return std::make_shared(std::move(fd), static_cast(result), size); } void MmapFileStream::advance(int64_t length) { diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index 5d6f07707a73..41b0d5160445 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -72,8 +72,12 @@ arrow::Result> makeUncompressedRecordBatch( std::shared_ptr zeroLengthNullBuffer(); +// MmapFileStream is used to optimize sequential file reading. It uses madvise +// to prefetch and release memory timely. 
class MmapFileStream : public arrow::io::InputStream { public: + MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size) + : fd_(std::move(fd)), data_(data), size_(size){}; static arrow::Result> open(const std::string& path); arrow::Result Tell() const override; arrow::Status Close() override; @@ -99,7 +103,6 @@ class MmapFileStream : public arrow::io::InputStream { int64_t size_; int64_t pos_ = 0; int64_t posRetain_ = 0; - MmapFileStream() = default; }; } // namespace gluten From bd579ba52e68f860c2cf4c8d35ed42e3dad8d35b Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Tue, 19 Nov 2024 14:07:53 +0800 Subject: [PATCH 3/3] Separate the declaration and definition --- cpp/core/shuffle/Utils.cc | 14 ++++++++++++++ cpp/core/shuffle/Utils.h | 21 ++++++++++----------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc index 157c100a1f6f..ac5a0c6c80f5 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -216,6 +216,9 @@ arrow::Result> makeUncompressedRecordBatch( return arrow::RecordBatch::Make(writeSchema, 1, {arrays}); } +MmapFileStream::MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size) + : fd_(std::move(fd)), data_(data), size_(size){}; + arrow::Result> MmapFileStream::open(const std::string& path) { ARROW_ASSIGN_OR_RAISE(auto fileName, arrow::internal::PlatformFilename::FromString(path)); @@ -230,6 +233,17 @@ arrow::Result> MmapFileStream::open(const std::s return std::make_shared(std::move(fd), static_cast(result), size); } +arrow::Result MmapFileStream::actualReadSize(int64_t nbytes) { + if (nbytes < 0 || pos_ > size_) { + return arrow::Status::IOError("Read out of range. 
Offset: ", pos_, " Size: ", nbytes, " File Size: ", size_); + } + return std::min(size_ - pos_, nbytes); +} + +bool MmapFileStream::closed() const { + return data_ == nullptr; +}; + void MmapFileStream::advance(int64_t length) { static auto pageSize = static_cast(arrow::internal::GetPageSize()); static auto pageMask = ~(pageSize - 1); diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index 41b0d5160445..43f1feb12e64 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -76,26 +76,25 @@ std::shared_ptr zeroLengthNullBuffer(); // to prefetch and release memory timely. class MmapFileStream : public arrow::io::InputStream { public: - MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size) - : fd_(std::move(fd)), data_(data), size_(size){}; + MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size); + static arrow::Result> open(const std::string& path); + arrow::Result Tell() const override; + arrow::Status Close() override; + arrow::Result Read(int64_t nbytes, void* out) override; + arrow::Result> Read(int64_t nbytes) override; - bool closed() const override { - return data_ == nullptr; - }; + + bool closed() const override; private: - arrow::Result actualReadSize(int64_t nbytes) { - if (nbytes < 0 || pos_ > size_) { - return arrow::Status::IOError("Read out of range. Offset: ", pos_, " Size: ", nbytes, " File Size: ", size_); - } - return std::min(size_ - pos_, nbytes); - } + arrow::Result actualReadSize(int64_t nbytes); void advance(int64_t length); + void willNeed(int64_t length); arrow::internal::FileDescriptor fd_;