From c69940e83d6f0d357728e6f2db9acddacfe4ba78 Mon Sep 17 00:00:00 2001 From: Daniel Byrne Date: Wed, 11 Sep 2024 19:17:42 -0700 Subject: [PATCH] This commit adds support for ampFactor in the BinaryReplayGenerator. --- cachelib/cachebench/cache/CacheValue.h | 2 +- cachelib/cachebench/util/Request.h | 2 + .../workload/BinaryKVReplayGenerator.h | 63 +++++++++++++++---- 3 files changed, 54 insertions(+), 13 deletions(-) diff --git a/cachelib/cachebench/cache/CacheValue.h b/cachelib/cachebench/cache/CacheValue.h index 1f2477fa0a..786b06af68 100644 --- a/cachelib/cachebench/cache/CacheValue.h +++ b/cachelib/cachebench/cache/CacheValue.h @@ -70,7 +70,7 @@ class CACHELIB_PACKED_ATTR CacheValue { // static function to make sure the item size is at least sizeof CacheValue // so we have enough space for fields used by sanity checks static size_t getSize(size_t size) { - return std::max(size, sizeof(CacheValue)); + return std::max((uint32_t)size, sizeof(CacheValue)); } // static function to help to initialize CacheValue with given item's data // pointer. diff --git a/cachelib/cachebench/util/Request.h b/cachelib/cachebench/util/Request.h index 60dd727de9..c29a0ca53f 100644 --- a/cachelib/cachebench/util/Request.h +++ b/cachelib/cachebench/util/Request.h @@ -170,6 +170,8 @@ struct Request { OpType getOp() const noexcept { return op.load(); } void setOp(OpType o) noexcept { op = o; } + inline void updateKey(std::string_view k) { key = k; } + std::string_view key; // size iterators in case this request is diff --git a/cachelib/cachebench/workload/BinaryKVReplayGenerator.h b/cachelib/cachebench/workload/BinaryKVReplayGenerator.h index 305f9f65f0..e97629972c 100644 --- a/cachelib/cachebench/workload/BinaryKVReplayGenerator.h +++ b/cachelib/cachebench/workload/BinaryKVReplayGenerator.h @@ -77,6 +77,8 @@ class BinaryKVReplayGenerator : public ReplayGeneratorBase { // per thread: the number of requests to run through // the trace before jumping to next offset static constexpr size_t kRunLength = 10000; + static constexpr size_t kMinKeySize = 16; + static constexpr size_t maxAmpFactor = 10000; // StressorCtx keeps track of the state including the submission queues // per stressor thread. Since there is only one request generator thread, @@ -94,7 +96,11 @@ class BinaryKVReplayGenerator : public ReplayGeneratorBase { reinterpret_cast(0), OpType::kGet, 0), - runIdx_(0) {} + runIdx_(0) { + for (int i = 0; i < maxAmpFactor; i++) { + suffixes.push_back(folly::sformat("{:05d}", i)); + } + } bool isFinished() { return finished_.load(std::memory_order_relaxed); } void markFinish() { finished_.store(true, std::memory_order_relaxed); } @@ -103,10 +109,39 @@ class BinaryKVReplayGenerator : public ReplayGeneratorBase { reqIdx_ = id_ * kRunLength; } - uint32_t id_{0}; - uint64_t reqIdx_{0}; + std::string_view updateKeyWithAmpFactor(std::string_view key) { + if (ampFactor_ > 0) { + // trunkcate the key so we don't overflow + // max ampFactor is 10000, so we reserve at least 5 bytes + size_t keySize = key.size() > kMinKeySize ? + std::max(key.size() - 5, kMinKeySize) : + key.size(); + // copy the key into the currKey memory + std::memcpy(currKey_, key.data(), keySize); + std::memcpy(currKey_ + keySize, suffixes[ampFactor_].data(), + suffixes[ampFactor_].size()); + // add null terminating + currKey_[keySize + suffixes[ampFactor_].size()] = '\0'; + ampFactor_--; + } else { + // copy the key into the currKey memory + std::memcpy(currKey_, key.data(), key.size()); + // add null terminating + currKey_[key.size()] = '\0'; + } + return std::string_view(currKey_); + } + Request request_; + uint64_t reqIdx_{0}; uint64_t runIdx_{0}; + uint32_t id_{0}; + uint32_t ampFactor_{0}; + std::vector suffixes; + // space for the current key, with the + // ampFactor suffix + char currKey_[256]; + // Thread that finish its operations mark it here, so we will skip // further request on its shard std::atomic finished_{false}; @@ -159,10 +194,18 @@ const Request& BinaryKVReplayGenerator::getReq(uint8_t, BinaryRequest* prevReq = reinterpret_cast(*(r.requestId)); if (prevReq != nullptr && prevReq->repeats_ > 1) { prevReq->repeats_ = prevReq->repeats_ - 1; - } else { + } else if (stressorCtx.ampFactor_ == 0) { BinaryRequest* req = nullptr; try { req = binaryStream_.getNextPtr(stressorCtx.reqIdx_ + stressorCtx.runIdx_); + stressorCtx.ampFactor_ = ampFactor_; + // update the binary request index + if (stressorCtx.runIdx_ < kRunLength) { + stressorCtx.runIdx_++; + } else { + stressorCtx.runIdx_ = 0; + stressorCtx.reqIdx_ += numShards_ * kRunLength; + } } catch (const EndOfTrace& e) { if (config_.repeatTraceReplay) { XLOGF_EVERY_MS( @@ -170,6 +213,7 @@ const Request& BinaryKVReplayGenerator::getReq(uint8_t, "{} Reached the end of trace files. Restarting from beginning.", stressorCtx.id_); stressorCtx.resetIdx(); + stressorCtx.ampFactor_ = ampFactor_; req = binaryStream_.getNextPtr(stressorCtx.reqIdx_ + stressorCtx.runIdx_); } else { @@ -180,20 +224,15 @@ const Request& BinaryKVReplayGenerator::getReq(uint8_t, XDCHECK_NE(req, nullptr); XDCHECK_LT(req->op_, 12); - auto key = req->getKey(); OpType op = static_cast(req->op_); req->valueSize_ = (req->valueSize_) * ampSizeFactor_; - r.update(key, + r.update(stressorCtx.updateKeyWithAmpFactor(req->getKey()), const_cast(reinterpret_cast(&req->valueSize_)), op, req->ttl_, reinterpret_cast(req)); - if (stressorCtx.runIdx_ < kRunLength) { - stressorCtx.runIdx_++; - } else { - stressorCtx.runIdx_ = 0; - stressorCtx.reqIdx_ += numShards_ * kRunLength; - } + } else { + r.updateKey(stressorCtx.updateKeyWithAmpFactor(prevReq->getKey())); } return r; }