Skip to content

Commit

Permalink
Impl MmapFileStream
Browse files Browse the repository at this point in the history
  • Loading branch information
ccat3z committed Nov 18, 2024
1 parent 17d9cd8 commit eaf10aa
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Spill.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void Spill::insertPayload(

void Spill::openSpillFile() {
if (!is_) {
GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::MemoryMappedFile::Open(spillFile_, arrow::io::FileMode::READ));
GLUTEN_ASSIGN_OR_THROW(is_, MmapFileStream::open(spillFile_));
rawIs_ = is_.get();
}
}
Expand Down
3 changes: 1 addition & 2 deletions cpp/core/shuffle/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ class Spill final {
};

SpillType type_;
std::shared_ptr<arrow::io::MemoryMappedFile> is_;
std::shared_ptr<gluten::MmapFileStream> is_;
std::list<PartitionPayload> partitionPayloads_{};
std::shared_ptr<arrow::io::MemoryMappedFile> inputStream_{};
std::string spillFile_;
int64_t spillTime_;
int64_t compressTime_;
Expand Down
93 changes: 93 additions & 0 deletions cpp/core/shuffle/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include <numeric>
#include <sstream>
#include <thread>
#include <sys/mman.h>
#include <glog/logging.h>
#include <arrow/buffer.h>
#include "shuffle/Options.h"
#include "utils/Timer.h"

Expand Down Expand Up @@ -212,6 +215,96 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
}
return arrow::RecordBatch::Make(writeSchema, 1, {arrays});
}

arrow::Result<std::shared_ptr<MmapFileStream>> MmapFileStream::open(const std::string& path) {
ARROW_ASSIGN_OR_RAISE(auto fileName, arrow::internal::PlatformFilename::FromString(path));

ARROW_ASSIGN_OR_RAISE(auto fd, arrow::internal::FileOpenReadable(fileName));
ARROW_ASSIGN_OR_RAISE(auto size, arrow::internal::FileGetSize(fd.fd()));

void* result = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd.fd(), 0);
if (result == MAP_FAILED) {
return arrow::Status::IOError("Memory mapping file failed: ", ::arrow::internal::ErrnoMessage(errno));
}

auto fstream = std::shared_ptr<MmapFileStream>(new MmapFileStream());
fstream->fd_ = std::move(fd);
fstream->data_ = static_cast<uint8_t *>(result);
fstream->size_ = size;
return fstream;
}

void MmapFileStream::advance(int64_t length) {
static auto pageSize = static_cast<size_t>(arrow::internal::GetPageSize());
static auto pageMask = ~(pageSize - 1);
DCHECK_GT(pageSize, 0);
DCHECK_EQ(pageMask & pageSize, pageSize);

auto purgeLength = (pos_ - posRetain_) & pageMask;
if (purgeLength > 0) {
int ret = madvise(data_ + posRetain_, purgeLength, MADV_DONTNEED);
if (ret != 0) {
LOG(WARNING) << "fadvise failed " << ::arrow::internal::ErrnoMessage(errno);
}
posRetain_ += purgeLength;
}

pos_ += length;
}

void MmapFileStream::willNeed(int64_t length) {
static auto pageSize = static_cast<size_t>(arrow::internal::GetPageSize());
static auto pageMask = ~(pageSize - 1);
DCHECK_GT(pageSize, 0);
DCHECK_EQ(pageMask & pageSize, pageSize);

auto willNeedPos = pos_ & pageMask;
auto willNeedLen = pos_ + length - willNeedPos;;
int ret = madvise(data_ + willNeedPos, willNeedLen, MADV_WILLNEED);
if (ret != 0) {
LOG(WARNING) << "madvise willneed failed: " << ::arrow::internal::ErrnoMessage(errno);
}
}

arrow::Status MmapFileStream::Close() {
if (data_ != nullptr) {
int result = munmap(data_, size_);
if (result != 0) {
LOG(WARNING) << "munmap failed";
}
data_ = nullptr;
}

return fd_.Close();
}

arrow::Result<int64_t> MmapFileStream::Tell() const {
return pos_;
}

arrow::Result<int64_t> MmapFileStream::Read(int64_t nbytes, void* out) {
ARROW_ASSIGN_OR_RAISE(nbytes, actualReadSize(nbytes));

if (nbytes > 0) {
memcpy(out, data_ + pos_, nbytes);
advance(nbytes);
}

return nbytes;
}

arrow::Result<std::shared_ptr<arrow::Buffer>> MmapFileStream::Read(int64_t nbytes) {
ARROW_ASSIGN_OR_RAISE(nbytes, actualReadSize(nbytes));

if (nbytes > 0) {
auto buffer = std::make_shared<arrow::Buffer>(data_ + pos_, nbytes);
willNeed(nbytes);
advance(nbytes);
return buffer;
} else {
return std::make_shared<arrow::Buffer>(nullptr, 0);
}
}
} // namespace gluten

std::string gluten::generateUuid() {
Expand Down
30 changes: 30 additions & 0 deletions cpp/core/shuffle/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,34 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

class MmapFileStream : public arrow::io::InputStream {
public:
static arrow::Result<std::shared_ptr<MmapFileStream>> open(const std::string& path);
arrow::Result<int64_t> Tell() const override;
arrow::Status Close() override;
arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
bool closed() const override {
return data_ == nullptr;
};

private:
arrow::Result<int64_t> actualReadSize(int64_t nbytes) {
if (nbytes < 0 || pos_ > size_) {
return arrow::Status::IOError("Read out of range. Offset: ", pos_, " Size: ", nbytes, " File Size: ", size_);
}
return std::min(size_ - pos_, nbytes);
}

void advance(int64_t length);
void willNeed(int64_t length);

arrow::internal::FileDescriptor fd_;
uint8_t* data_ = nullptr;
int64_t size_;
int64_t pos_ = 0;
int64_t posRetain_ = 0;
MmapFileStream() = default;
};

} // namespace gluten

0 comments on commit eaf10aa

Please sign in to comment.