Skip to content

Commit

Permalink
[GLUTEN-7969][VL] Enable spill to multiple directories for micro benchmark (#7970)
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma authored Nov 19, 2024
1 parent 97c423f commit 90428b9
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 161 deletions.
5 changes: 4 additions & 1 deletion cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ LocalPartitionWriter::LocalPartitionWriter(
}

std::string LocalPartitionWriter::nextSpilledFileDir() {
auto spilledFileDir = getSpilledShuffleFileDir(localDirs_[dirSelection_], subDirSelection_[dirSelection_]);
auto spilledFileDir = getShuffleSpillDir(localDirs_[dirSelection_], subDirSelection_[dirSelection_]);
subDirSelection_[dirSelection_] = (subDirSelection_[dirSelection_] + 1) % options_.numSubDirs;
dirSelection_ = (dirSelection_ + 1) % localDirs_.size();
return spilledFileDir;
Expand Down Expand Up @@ -505,6 +505,9 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
"Merging from spill " + std::to_string(s) + " is not exhausted. pid: " + std::to_string(pid));
}
}
if (std::filesystem::exists(spill->spillFile()) && !std::filesystem::remove(spill->spillFile())) {
LOG(WARNING) << "Error while deleting spill file " << spill->spillFile();
}
++s;
}
spills_.clear();
Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#pragma once

#include <arrow/filesystem/localfs.h>
#include <arrow/io/api.h>

#include "shuffle/PartitionWriter.h"
Expand Down
38 changes: 17 additions & 21 deletions cpp/core/shuffle/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

#include "shuffle/Utils.h"
#include <arrow/record_batch.h>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fcntl.h>
#include <unistd.h>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <sstream>
#include <thread>
#include "shuffle/Options.h"
#include "utils/StringUtil.h"
#include "utils/Timer.h"

namespace gluten {
Expand Down Expand Up @@ -214,40 +214,36 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
}
} // namespace gluten

std::string gluten::generateUuid() {
boost::uuids::random_generator generator;
return boost::uuids::to_string(generator());
}

std::string gluten::getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId) {
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
std::string gluten::getShuffleSpillDir(const std::string& configuredDir, int32_t subDirId) {
std::stringstream ss;
ss << std::setfill('0') << std::setw(2) << std::hex << subDirId;
auto dir = arrow::fs::internal::ConcatAbstractPath(configuredDir, ss.str());
return dir;
return std::filesystem::path(configuredDir) / ss.str();
}

arrow::Result<std::string> gluten::createTempShuffleFile(const std::string& dir) {
if (dir.length() == 0) {
return arrow::Status::Invalid("Failed to create spilled file, got empty path.");
}

auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_ASSIGN_OR_RAISE(auto path_info, fs->GetFileInfo(dir));
if (path_info.type() == arrow::fs::FileType::NotFound) {
RETURN_NOT_OK(fs->CreateDir(dir, true));
if (std::filesystem::exists(dir)) {
if (!std::filesystem::is_directory(dir)) {
return arrow::Status::Invalid("Invalid directory. File path exists but is not a directory: ", dir);
}
} else {
std::filesystem::create_directories(dir);
}

const auto parentPath = std::filesystem::path(dir);
bool exist = true;
std::string filePath;
std::filesystem::path filePath;
while (exist) {
filePath = arrow::fs::internal::ConcatAbstractPath(dir, "temp_shuffle_" + generateUuid());
ARROW_ASSIGN_OR_RAISE(auto file_info, fs->GetFileInfo(filePath));
if (file_info.type() == arrow::fs::FileType::NotFound) {
int fd = open(filePath.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
filePath = parentPath / ("temp-shuffle-" + generateUuid());
if (!std::filesystem::exists(filePath)) {
auto fd = open(filePath.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
if (fd < 0) {
if (errno != EEXIST) {
return arrow::Status::IOError("Failed to open local file " + filePath + ", Reason: " + strerror(errno));
return arrow::Status::IOError(
"Failed to open local file " + filePath.string() + ", Reason: " + strerror(errno));
}
} else {
exist = false;
Expand Down
10 changes: 4 additions & 6 deletions cpp/core/shuffle/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
#pragma once

#include <arrow/array.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/filesystem/path_util.h>
#include <arrow/ipc/writer.h>
#include <arrow/type.h>
#include <arrow/util/io_util.h>

#include <chrono>
#include <filesystem>

#include "utils/Compression.h"

namespace gluten {
Expand All @@ -36,9 +36,7 @@ static const size_t kSizeOfBinaryArrayLengthBuffer = sizeof(BinaryArrayLengthBuf
static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType);
static const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS";

std::string generateUuid();

std::string getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId);
std::string getShuffleSpillDir(const std::string& configuredDir, int32_t subDirId);

arrow::Result<std::string> createTempShuffleFile(const std::string& dir);

Expand Down
16 changes: 13 additions & 3 deletions cpp/core/utils/StringUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

#include <filesystem>
#include <iostream>
#include <string_view>
#include <vector>

#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include "Exception.h"
#include "StringUtil.h"

std::vector<std::string> gluten::splitByDelim(const std::string& s, const char delimiter) {
namespace gluten {
std::vector<std::string> splitByDelim(const std::string& s, const char delimiter) {
if (s.empty()) {
return {};
}
Expand All @@ -41,7 +44,7 @@ std::vector<std::string> gluten::splitByDelim(const std::string& s, const char d
return result;
}

std::vector<std::string> gluten::splitPaths(const std::string& s, bool checkExists) {
std::vector<std::string> splitPaths(const std::string& s, bool checkExists) {
if (s.empty()) {
return {};
}
Expand All @@ -61,3 +64,10 @@ std::vector<std::string> gluten::splitPaths(const std::string& s, bool checkExis
}
return paths;
}

// Returns the textual form of a newly generated random Boost UUID.
std::string generateUuid() {
  const auto uuid = boost::uuids::random_generator()();
  return boost::uuids::to_string(uuid);
}

} // namespace gluten
2 changes: 2 additions & 0 deletions cpp/core/utils/StringUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ std::vector<std::string> splitByDelim(const std::string& s, const char delimiter

std::vector<std::string> splitPaths(const std::string& s, bool checkExists = false);

std::string generateUuid();

} // namespace gluten
137 changes: 111 additions & 26 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,50 @@ void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
}
}

std::string generateUniqueSubdir(const std::string& parent, const std::string& prefix = "") {
auto path = std::filesystem::path(parent) / (prefix + generateUuid());
std::error_code ec{};
while (!std::filesystem::create_directories(path, ec)) {
if (ec) {
LOG(ERROR) << fmt::format("Failed to created spill directory: {}, error code: {}", path, ec.message());
std::exit(EXIT_FAILURE);
}
path = std::filesystem::path(parent) / (prefix + generateUuid());
}
return path;
}

// Creates the benchmark's local working directories and returns their paths.
//
// If the environment variable named by gluten::kGlutenSparkLocalDirs is set and non-empty, it is split with
// gluten::splitPaths and one unique "generic-benchmark-<uuid>" subdirectory is created under each entry. If the
// variable is unset, empty, or yields no entries, a single unique subdirectory is created under the system temp
// directory instead. The result is therefore never empty, which matters because the benchmarks in this file compute
// `% localDirs.size()`.
std::vector<std::string> createLocalDirs() {
  static const std::string kBenchmarkDirPrefix = "generic-benchmark-";
  std::vector<std::string> localDirs;

  // Check if local dirs are set from env.
  const char* joinedDirsC = std::getenv(gluten::kGlutenSparkLocalDirs.c_str());
  if (joinedDirsC != nullptr && joinedDirsC[0] != '\0') {
    const auto dirs = gluten::splitPaths(std::string(joinedDirsC));
    localDirs.reserve(dirs.size());
    for (const auto& dir : dirs) {
      localDirs.push_back(generateUniqueSubdir(dir, kBenchmarkDirPrefix));
    }
  }

  // Otherwise (or if the env value produced no usable entry) create 1 temp dir.
  if (localDirs.empty()) {
    localDirs.push_back(generateUniqueSubdir(std::filesystem::temp_directory_path().string(), kBenchmarkDirPrefix));
  }
  return localDirs;
}

void cleanupLocalDirs(const std::vector<std::string>& localDirs) {
for (const auto& localDir : localDirs) {
std::error_code ec;
std::filesystem::remove_all(localDir, ec);
if (ec) {
LOG(WARNING) << fmt::format("Failed to remove directory: {}, error message: {}", localDir, ec.message());
} else {
LOG(INFO) << "Removed local dir: " << localDir;
}
}
}

PartitionWriterOptions createPartitionWriterOptions() {
PartitionWriterOptions partitionWriterOptions{};
// Disable writer's merge.
Expand Down Expand Up @@ -204,11 +248,10 @@ void runShuffle(
const std::shared_ptr<gluten::ResultIterator>& resultIter,
WriterMetrics& writerMetrics,
ReaderMetrics& readerMetrics,
bool readAfterWrite) {
std::string dataFile;
std::vector<std::string> localDirs;
bool isFromEnv;
GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv));
bool readAfterWrite,
const std::vector<std::string>& localDirs,
const std::string& dataFileDir) {
GLUTEN_ASSIGN_OR_THROW(auto dataFile, gluten::createTempShuffleFile(dataFileDir));

auto partitionWriterOptions = createPartitionWriterOptions();
auto partitionWriter = createPartitionWriter(runtime, partitionWriterOptions, dataFile, localDirs);
Expand Down Expand Up @@ -252,8 +295,12 @@ void runShuffle(
readerMetrics.decompressTime = reader->getDecompressTime();
readerMetrics.deserializeTime = reader->getDeserializeTime();
}
// Cleanup shuffle outputs
cleanupShuffleOutput(dataFile, localDirs, isFromEnv);

if (std::filesystem::remove(dataFile)) {
LOG(INFO) << "Removed shuffle data file: " << dataFile;
} else {
LOG(WARNING) << "Failed to remove shuffle data file. File does not exist: " << dataFile;
}
}

void updateBenchmarkMetrics(
Expand Down Expand Up @@ -292,7 +339,6 @@ void updateBenchmarkMetrics(
writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024);
}
}

} // namespace

using RuntimeFactory = std::function<VeloxRuntime*(MemoryManager* memoryManager)>;
Expand All @@ -301,6 +347,7 @@ auto BM_Generic = [](::benchmark::State& state,
const std::string& planFile,
const std::vector<std::string>& splitFiles,
const std::vector<std::string>& dataFiles,
const std::vector<std::string>& localDirs,
RuntimeFactory runtimeFactory,
FileReaderType readerType) {
setCpu(state);
Expand All @@ -316,6 +363,19 @@ auto BM_Generic = [](::benchmark::State& state,
splits.push_back(getPlanFromFile("ReadRel.LocalFiles", splitFile));
}

const auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id());
const auto spillDirIndex = tid % localDirs.size();
const auto veloxSpillDir = generateUniqueSubdir(std::filesystem::path(localDirs[spillDirIndex]) / "gluten-spill");

std::vector<std::string> shuffleSpillDirs;
std::transform(localDirs.begin(), localDirs.end(), std::back_inserter(shuffleSpillDirs), [](const auto& dir) {
auto path = std::filesystem::path(dir) / "shuffle-write";
return path;
});
// Use a different directory for data file.
const auto dataFileDir = gluten::getShuffleSpillDir(
shuffleSpillDirs[(spillDirIndex + 1) % localDirs.size()], state.thread_index() % gluten::kDefaultNumSubDirs);

WriterMetrics writerMetrics{};
ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
Expand Down Expand Up @@ -343,11 +403,13 @@ auto BM_Generic = [](::benchmark::State& state,
for (auto& split : splits) {
runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), split.size(), std::nullopt);
}
auto resultIter = runtime->createResultIterator("/tmp/test-spill", std::move(inputIters), runtime->getConfMap());

auto resultIter = runtime->createResultIterator(veloxSpillDir, std::move(inputIters), runtime->getConfMap());
listenerPtr->setIterator(resultIter.get());

if (FLAGS_with_shuffle) {
runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, false);
runShuffle(
runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, false, shuffleSpillDirs, dataFileDir);
} else {
// May write the output into file.
auto veloxPlan = dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
Expand Down Expand Up @@ -405,6 +467,7 @@ auto BM_Generic = [](::benchmark::State& state,

auto BM_ShuffleWriteRead = [](::benchmark::State& state,
const std::string& inputFile,
const std::vector<std::string>& localDirs,
RuntimeFactory runtimeFactory,
FileReaderType readerType) {
setCpu(state);
Expand All @@ -414,6 +477,10 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener));
auto runtime = runtimeFactory(memoryManager);

const size_t dirIndex = std::hash<std::thread::id>{}(std::this_thread::get_id()) % localDirs.size();
const auto dataFileDir =
gluten::getShuffleSpillDir(localDirs[dirIndex], state.thread_index() % gluten::kDefaultNumSubDirs);

WriterMetrics writerMetrics{};
ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
Expand All @@ -422,7 +489,15 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
ScopedTimer timer(&elapsedTime);
for (auto _ : state) {
auto resultIter = getInputIteratorFromFileReader(inputFile, readerType);
runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, FLAGS_run_shuffle_read);
runShuffle(
runtime,
listenerPtr,
resultIter,
writerMetrics,
readerMetrics,
FLAGS_run_shuffle_read,
localDirs,
dataFileDir);

auto reader = static_cast<FileReaderIterator*>(resultIter->getInputIter());
readInputTime += reader->getCollectBatchTime();
Expand Down Expand Up @@ -600,23 +675,31 @@ int main(int argc, char** argv) {
return dynamic_cast<VeloxRuntime*>(Runtime::create(kVeloxBackendKind, memoryManager, sessionConf));
};

#define GENERIC_BENCHMARK(READER_TYPE) \
do { \
auto* bm = \
::benchmark::RegisterBenchmark( \
"GenericBenchmark", BM_Generic, substraitJsonFile, splitFiles, dataFiles, runtimeFactory, READER_TYPE) \
->MeasureProcessCPUTime() \
->UseRealTime(); \
setUpBenchmark(bm); \
const auto localDirs = createLocalDirs();

#define GENERIC_BENCHMARK(READER_TYPE) \
do { \
auto* bm = ::benchmark::RegisterBenchmark( \
"GenericBenchmark", \
BM_Generic, \
substraitJsonFile, \
splitFiles, \
dataFiles, \
localDirs, \
runtimeFactory, \
READER_TYPE) \
->MeasureProcessCPUTime() \
->UseRealTime(); \
setUpBenchmark(bm); \
} while (0)

#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE) \
do { \
auto* bm = ::benchmark::RegisterBenchmark( \
"ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], runtimeFactory, READER_TYPE) \
->MeasureProcessCPUTime() \
->UseRealTime(); \
setUpBenchmark(bm); \
#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE) \
do { \
auto* bm = ::benchmark::RegisterBenchmark( \
"ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], localDirs, runtimeFactory, READER_TYPE) \
->MeasureProcessCPUTime() \
->UseRealTime(); \
setUpBenchmark(bm); \
} while (0)

if (dataFiles.empty()) {
Expand All @@ -642,5 +725,7 @@ int main(int argc, char** argv) {

gluten::VeloxBackend::get()->tearDown();

cleanupLocalDirs(localDirs);

return 0;
}
Loading

0 comments on commit 90428b9

Please sign in to comment.