Skip to content

Commit

Permalink
Add more size classes
Browse files Browse the repository at this point in the history
Experiment to see dynamics with 2, 4, 8, 16 MB classes added.
  • Loading branch information
Orri Erling committed Jun 18, 2024
1 parent 5c123cc commit 5a4a672
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 8 deletions.
1 change: 1 addition & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ std::shared_ptr<MemoryAllocator> createAllocator(
if (options.useMmapAllocator) {
MmapAllocator::Options mmapOptions;
mmapOptions.capacity = options.allocatorCapacity;
mmapOptions.largestSizeClass = options.largestSizeClassPages;
mmapOptions.useMmapArena = options.useMmapArena;
mmapOptions.mmapArenaCapacityRatio = options.mmapArenaCapacityRatio;
return std::make_shared<MmapAllocator>(mmapOptions);
Expand Down
3 changes: 3 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ struct MemoryManagerOptions {
/// std::malloc.
bool useMmapAllocator{false};

// Number of pages in largest size class in MmapAllocator.
int32_t largestSizeClassPages{256};

/// If true, allocations larger than largest size class size will be delegated
/// to ManagedMmapArena. Otherwise a system mmap call will be issued for each
/// such allocation.
Expand Down
81 changes: 78 additions & 3 deletions velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/memory/MallocAllocator.h"

#include <sys/mman.h>
#include <sys/resource.h>
#include <iostream>
#include <numeric>

Expand All @@ -28,6 +29,18 @@ DECLARE_bool(velox_memory_use_hugepages);

namespace facebook::velox::memory {

// static
std::vector<MachinePageCount> MemoryAllocator::makeSizeClassSizes(
MachinePageCount largest) {
VELOX_CHECK_LE(256, largest);
VELOX_CHECK_EQ(largest, bits::nextPowerOfTwo(largest));
std::vector<MachinePageCount> sizes;
for (auto size = 1; size <= largest; size *= 2) {
sizes.push_back(size);
}
return sizes;
}

namespace {
std::string& cacheFailureMessage() {
thread_local std::string message;
Expand Down Expand Up @@ -371,11 +384,11 @@ std::string Stats::toString() const {
totalAllocations += sizes[i].numAllocations;
}
out << fmt::format(
"Alloc: {}MB {} Gigaclocks {} Allocations, {}MB advised\n",
"Alloc: {}MB {} Gigaclocks Allocations={}, advised={} MB\n",
totalBytes >> 20,
totalClocks >> 30,
numAdvise >> 8,
totalAllocations);
totalAllocations,
numAdvise >> 8);

// Sort the size classes by decreasing clocks.
std::vector<int32_t> indices(sizes.size());
Expand Down Expand Up @@ -436,4 +449,66 @@ std::string MemoryAllocator::getAndClearFailureMessage() {
}
return allocatorErrMsg;
}

namespace {
struct TraceState {
struct rusage rusage;
Stats allocatorStats;
int64_t ioTotal;
struct timeval tv;
};

int64_t toUsec(struct timeval tv) {
return tv.tv_sec * 1000000LL + tv.tv_usec;
}

int32_t elapsedUsec(struct timeval end, struct timeval begin) {
return toUsec(end) - toUsec(begin);
}
} // namespace

void MemoryAllocator::getTracingHooks(
std::function<void()>& init,
std::function<std::string()>& report,
std::function<int64_t()> ioVolume) {
auto allocator = shared_from_this();
auto state = std::make_shared<TraceState>();
init = [state, allocator, ioVolume]() {
getrusage(RUSAGE_SELF, &state->rusage);
struct timezone tz;
gettimeofday(&state->tv, &tz);
state->allocatorStats = allocator->stats();
state->ioTotal = ioVolume ? ioVolume() : 0;
};
report = [state, allocator, ioVolume]() -> std::string {
struct rusage rusage;
getrusage(RUSAGE_SELF, &rusage);
auto newStats = allocator->stats();
float u = elapsedUsec(rusage.ru_utime, state->rusage.ru_utime);
float s = elapsedUsec(rusage.ru_stime, state->rusage.ru_stime);
auto m = allocator->stats() - state->allocatorStats;
float flt = rusage.ru_minflt - state->rusage.ru_minflt;
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
float elapsed = elapsedUsec(tv, state->tv);
int64_t io = 0;
if (ioVolume) {
io = ioVolume() - state->ioTotal;
}
std::stringstream out;
out << std::endl
<< std::endl
<< fmt::format(
"user%={} sys%={} minflt/s={}, io={} MB/s\n",
100 * u / elapsed,
100 * s / elapsed,
flt / (elapsed / 1000000),
io / (elapsed));
out << m.toString() << std::endl;
out << allocator->toString() << std::endl;
return out.str();
};
}

} // namespace facebook::velox::memory
11 changes: 10 additions & 1 deletion velox/common/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,17 @@ class MemoryAllocator : public std::enable_shared_from_this<MemoryAllocator> {
/// thread. The message is cleared after return.
std::string getAndClearFailureMessage();

void getTracingHooks(
std::function<void()>& init,
std::function<std::string()>& report,
std::function<int64_t()> ioVolume = nullptr);

protected:
explicit MemoryAllocator() = default;
MemoryAllocator(MachinePageCount largestSizeClassPages = 256)
: sizeClassSizes_(makeSizeClassSizes(largestSizeClassPages)) {}

static std::vector<MachinePageCount> makeSizeClassSizes(
MachinePageCount largest);

/// Represents a mix of blocks of different sizes for covering a single
/// allocation.
Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/MmapAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace facebook::velox::memory {
MmapAllocator::MmapAllocator(const Options& options)
: kind_(MemoryAllocator::Kind::kMmap),
: MemoryAllocator(options.largestSizeClass),
kind_(MemoryAllocator::Kind::kMmap),
useMmapArena_(options.useMmapArena),
maxMallocBytes_(options.maxMallocBytes),
mallocReservedBytes_(
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/MmapAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class MmapAllocator : public MemoryAllocator {
/// Capacity in bytes, default unlimited.
uint64_t capacity{kMaxMemory};

int32_t largestSizeClass{256};

/// If set true, allocations larger than largest size class size will be
/// delegated to ManagedMmapArena. Otherwise a system mmap call will be
/// issued for each such allocation.
Expand Down
46 changes: 46 additions & 0 deletions velox/common/memory/tests/MemoryAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1943,4 +1943,50 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
MemoryAllocatorTestSuite,
MemoryAllocatorTest,
testing::ValuesIn({0, 1, 2}));

class MmapConfigTest : public testing::Test {
public:
protected:
void setupAllocator() {
constexpr int64_t kCapacityBytes = 900LL << 20; // 900MB.
MemoryManagerOptions options;
options.useMmapAllocator = true;
options.allocatorCapacity = kCapacityBytes;
options.largestSizeClassPages = 4096;
options.arbitratorCapacity = kCapacityBytes;
options.arbitratorReservedCapacity = 128 << 20;
options.memoryPoolReservedCapacity = 1 << 20;
options.smallAllocationReservePct = 4;
options.maxMallocBytes = 3 * 1024;
memoryManager_ = std::make_unique<MemoryManager>(options);
allocator_ = memoryManager_->allocator();
ASSERT_EQ(
AllocationTraits::numPages(memoryManager_->allocator()->capacity()),
bits::roundUp(
kCapacityBytes * (100 - options.smallAllocationReservePct) / 100 /
AllocationTraits::kPageSize,
64 * allocator_->sizeClasses().back()));
}

std::unique_ptr<MemoryManager> memoryManager_;
MemoryAllocator* allocator_;
};

TEST_F(MmapConfigTest, sizeClasses) {
setupAllocator();
Allocation result;
ASSERT_TRUE(
allocator_->allocateNonContiguous(2 * 4096 - 1, result, nullptr, 0));
auto g = folly::makeGuard([&]() { allocator_->freeNonContiguous(result); });
// Check that the allocation has one page of each size class, largest to
// smallest.
EXPECT_EQ(4096 * 2 - 1, result.numPages());
EXPECT_EQ(13, result.numRuns());
int32_t runPages = 4096;
for (auto i = 0; i < result.numRuns(); ++i) {
EXPECT_EQ(runPages, result.runAt(i).numPages());
runPages = runPages / 2;
}
}

} // namespace facebook::velox::memory
16 changes: 15 additions & 1 deletion velox/common/process/Profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ tsan_atomic<bool> Profiler::shouldSaveResult_;
int64_t Profiler::sampleStartTime_;
int64_t Profiler::cpuAtSampleStart_;
int64_t Profiler::cpuAtLastCheck_;
std::function<void()> Profiler::startExtra_;
std::function<std::string()> Profiler::extraReport_;

namespace {
std::string hostname;
Expand Down Expand Up @@ -158,6 +160,10 @@ void Profiler::copyToResult(const std::string* data) {
timeString(now),
100 * (cpu - cpuAtSampleStart_) / std::max<int64_t>(1, elapsed)));
out->append(std::string_view(buffer, resultSize));
if (extraReport_) {
std::string extra = extraReport_();
out->append(std::string_view(extra.data(), extra.size()));
}
out->flush();
LOG(INFO) << "PROFILE: Produced result " << target << " " << resultSize
<< " bytes";
Expand All @@ -177,6 +183,9 @@ void Profiler::makeProfileDir(std::string path) {
}

std::thread Profiler::startSample() {
if (startExtra_) {
startExtra_();
}
std::thread thread([&]() {
// We run perf under a shell because running it with fork + rexec
// and killing it with SIGINT produces a corrupt perf.data
Expand Down Expand Up @@ -295,12 +304,17 @@ bool Profiler::isRunning() {
return profileStarted_;
}

void Profiler::start(const std::string& path) {
void Profiler::start(
const std::string& path,
std::function<void()> extraStart,
std::function<std::string()> extraReport) {
{
#if !defined(linux)
VELOX_FAIL("Profiler is only available for Linux");
#endif
resultPath_ = path;
startExtra_ = extraStart;
extraReport_ = extraReport;
std::lock_guard<std::mutex> l(profileMutex_);
if (profileStarted_) {
return;
Expand Down
8 changes: 7 additions & 1 deletion velox/common/process/Profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ namespace facebook::velox::process {
class Profiler {
public:
/// Starts periodic production of perf reports.
static void start(const std::string& path);
static void start(
const std::string& path,
std::function<void()> extraStart = nullptr,
std::function<std::string()> extraReport = nullptr);

// Stops profiling background associated threads. Threads are stopped on
// return.
Expand Down Expand Up @@ -69,6 +72,9 @@ class Profiler {

// CPU time at last periodic check.
static int64_t cpuAtLastCheck_;

static std::function<void()> startExtra_;
static std::function<std::string()> extraReport_;
};

} // namespace facebook::velox::process
19 changes: 18 additions & 1 deletion velox/exec/benchmarks/HashTableBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@
#include <gtest/gtest.h>
#include <memory>

#include "velox/common/process/Profiler.h"

DEFINE_int64(custom_size, 0, "Custom number of entries");
DEFINE_int32(custom_hit_rate, 0, "Percentage of hits in custom test");
DEFINE_int32(custom_key_spacing, 1, "Spacing between key values");

DEFINE_int32(custom_num_ways, 10, "Number of build threads");

DEFINE_bool(profile, false, "Generate perf profiles and memory stats");

DECLARE_bool(velox_time_allocations);

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::test;
Expand Down Expand Up @@ -617,7 +623,18 @@ int main(int argc, char** argv) {
options.useMmapArena = true;
options.mmapArenaCapacityRatio = 1;
memory::MemoryManager::initialize(options);

if (FLAGS_profile) {
auto allocator = memory::MemoryManager::getInstance()->allocator();
std::function<void()> reportInit;
std::function<std::string()> report;
allocator->getTracingHooks(reportInit, report, nullptr);
FLAGS_profiler_check_interval_seconds = 20;
FLAGS_profiler_min_cpu_pct = 50;
FLAGS_profiler_max_sample_seconds = 30;
FLAGS_velox_time_allocations = true;
filesystems::registerLocalFileSystem();
process::Profiler::start("/tmp/hashprof", reportInit, report);
}
auto bm = std::make_unique<HashTableBenchmark>();
std::vector<HashTableBenchmarkRun> results;

Expand Down

0 comments on commit 5a4a672

Please sign in to comment.