Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debugging deterministic simulation with RocksDB #11558

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1863,8 +1863,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// substantially more confidence in the correctness.
// TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are occurring.
if (g_network->isSimulated()) {
writeThread = CoroThreadPool::createThreadPool();
readThreads = CoroThreadPool::createThreadPool();
writeThread = CoroThreadPool::createThreadPool(true);
readThreads = CoroThreadPool::createThreadPool(true);
} else {
writeThread = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_WRITER_THREAD_PRIORITY);
readThreads = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_READER_THREAD_PRIORITY);
Expand Down
6 changes: 3 additions & 3 deletions fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3592,9 +3592,9 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
// occurring.
if (g_network->isSimulated()) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("Info", "Use Coro threads in simulation.");
writeThread = CoroThreadPool::createThreadPool();
compactionThread = CoroThreadPool::createThreadPool();
readThreads = CoroThreadPool::createThreadPool();
writeThread = CoroThreadPool::createThreadPool(true);
compactionThread = CoroThreadPool::createThreadPool(true);
readThreads = CoroThreadPool::createThreadPool(true);
} else {
writeThread = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_WRITER_THREAD_PRIORITY);
compactionThread = createGenericThreadPool(0, SERVER_KNOBS->ROCKSDB_COMPACTION_THREAD_PRIORITY);
Expand Down
4 changes: 2 additions & 2 deletions fdbserver/SimulatedCluster.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ void rocksdbStorageEngineConfig(SimulationConfig* simCfg) {
// background threads.
TraceEvent(SevWarnAlways, "RocksDBNonDeterminism")
.detail("Explanation", "The RocksDB storage engine is threaded and non-deterministic");
noUnseed = true;
noUnseed = false;
}

void shardedRocksDBStorageEngineConfig(SimulationConfig* simCfg) {
Expand All @@ -1793,7 +1793,7 @@ void shardedRocksDBStorageEngineConfig(SimulationConfig* simCfg) {
// background threads.
TraceEvent(SevWarnAlways, "ShardedRocksDBNonDeterminism")
.detail("Explanation", "The Sharded RocksDB storage engine is threaded and non-deterministic");
noUnseed = true;
noUnseed = false;
}

const std::unordered_map<SimulationStorageEngine, StorageEngineConfigFunc> STORAGE_ENGINE_CONFIG_MAPPER = {
Expand Down
39 changes: 25 additions & 14 deletions fdbserver/coroimpl/CoroFlowCoro.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ class WorkPool final : public IThreadPool, public ReferenceCounted<WorkPool<Thre
bool stop;
ThreadReturnPromise<Void> stopped;
ThreadReturnPromise<Void> error;
bool immediate = false;

Worker(Pool* pool, IThreadPoolReceiver* userData) : pool(pool), userData(userData), stop(false) {}
Worker(Pool* pool, IThreadPoolReceiver* userData, bool immediate) : pool(pool), userData(userData), stop(false), immediate(immediate) {}

void run() override {
try {
Expand All @@ -144,6 +145,9 @@ class WorkPool final : public IThreadPool, public ReferenceCounted<WorkPool<Thre
PThreadAction a = pool->work.front();
pool->work.pop_front();
pool->queueLock.leave();
if (immediate) {
ASSERT(false);
}
(*a)(userData);
if (IS_CORO)
CoroThreadPool::waitFor(yield());
Expand Down Expand Up @@ -174,6 +178,7 @@ class WorkPool final : public IThreadPool, public ReferenceCounted<WorkPool<Thre
Reference<Pool> pool;
Future<Void> m_stopOnError; // must be last, because its cancellation calls stop()!
Error error;
bool immediate;

ACTOR Future<Void> stopOnError(WorkPool* w) {
try {
Expand All @@ -193,13 +198,14 @@ class WorkPool final : public IThreadPool, public ReferenceCounted<WorkPool<Thre
}

public:
WorkPool() : pool(new Pool) { m_stopOnError = stopOnError(this); }
WorkPool() : pool(new Pool), immediate(false) { m_stopOnError = stopOnError(this); }
WorkPool(bool immediate) : pool(new Pool), immediate(immediate) { m_stopOnError = stopOnError(this); }

Future<Void> getError() const override { return pool->anyError.getResult(); }
void addThread(IThreadPoolReceiver* userData, const char*) override {
checkError();

auto w = new Worker(pool.getPtr(), userData);
auto w = new Worker(pool.getPtr(), userData, immediate);
pool->queueLock.enter();
pool->workers.push_back(w);
pool->queueLock.leave();
Expand All @@ -216,15 +222,20 @@ class WorkPool final : public IThreadPool, public ReferenceCounted<WorkPool<Thre
void post(PThreadAction action) override {
checkError();

pool->queueLock.enter();
pool->work.push_back(action);
if (!pool->idle.empty()) {
Worker* c = pool->idle.back();
pool->idle.pop_back();
pool->queueLock.leave();
c->unblock();
} else
pool->queueLock.leave();
if (!immediate) {
pool->queueLock.enter();
pool->work.push_back(action);
if (!pool->idle.empty()) {
Worker* c = pool->idle.back();
pool->idle.pop_back();
pool->queueLock.leave();
c->unblock();
} else
pool->queueLock.leave();
} else {
ASSERT(pool->workers.size() > 0 && !pool->workers[0]->stop);
(*action)(pool->workers[0]->userData);
}
}
Future<Void> stop(Error const& e) override {
if (error.code() == invalid_error_code) {
Expand Down Expand Up @@ -299,6 +310,6 @@ void CoroThreadPool::init() {
}
}

Reference<IThreadPool> CoroThreadPool::createThreadPool() {
return Reference<IThreadPool>(new CoroPool);
Reference<IThreadPool> CoroThreadPool::createThreadPool(bool immediate) {
return Reference<IThreadPool>(new CoroPool(immediate));
}
2 changes: 1 addition & 1 deletion fdbserver/include/fdbserver/CoroFlow.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CoroThreadPool {
static void init();
static void waitFor(Future<Void> what);

static Reference<IThreadPool> createThreadPool();
static Reference<IThreadPool> createThreadPool(bool immediate = false);

protected:
CoroThreadPool() {}
Expand Down
14 changes: 9 additions & 5 deletions flow/flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,17 @@ Reference<IRandom> deterministicRandom() {
}

Reference<IRandom> nondeterministicRandom() {
static thread_local Reference<IRandom> random;
if (!random) {
random = Reference<IRandom>(new DeterministicRandom(platform::getRandomSeed()));
}
return random;
return deterministicRandom();
}

/* Reference<IRandom> nondeterministicRandom() {
static thread_local Reference<IRandom> random;
if (!random) {
random = Reference<IRandom>(new DeterministicRandom(platform::getRandomSeed()));
}
return random;
}*/

std::string UID::toString() const {
return fmt::format("{:016x}{:016x}", part[0], part[1]);
}
Expand Down