From 085cf866b5616f0a63d6eb2e970b804c0e6cc4c0 Mon Sep 17 00:00:00 2001
From: neethuhaneesha
Date: Thu, 19 Dec 2024 12:29:34 -0800
Subject: [PATCH] Rocksdb manual flush code changes

---
 fdbclient/ServerKnobs.cpp                |  4 +-
 fdbserver/KeyValueStoreRocksDB.actor.cpp | 54 +++++++++++++++---------
 2 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index ace21d837ee..d662c10f418 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -596,8 +596,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( ROCKSDB_CF_METRICS_DELAY, 900.0 );
     init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB.
     init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server.
-    // Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0.
-    init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10);
+    // Performs manual flushes at regular intervals (seconds), in case rocksdb did not flush. Feature disabled if the value is 0.
+    init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200);
     init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true );
     init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true );
     init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true;
diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp
index df940a51f37..d23cb358b7d 100644
--- a/fdbserver/KeyValueStoreRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp
@@ -107,6 +107,8 @@ class SharedRocksDBState {
     rocksdb::Options getOptions() const { return rocksdb::Options(this->dbOptions, this->cfOptions); }
     rocksdb::ReadOptions getReadOptions() { return this->readOptions; }
     rocksdb::FlushOptions getFlushOptions() { return this->flushOptions; }
+    double getLastFlushTime() { return this->lastFlushTime_; }
+    void setLastFlushTime(double lastFlushTime) { this->lastFlushTime_ = lastFlushTime; }

 private:
     const UID id;
@@ -120,6 +122,7 @@ class SharedRocksDBState {
     rocksdb::ColumnFamilyOptions cfOptions;
     rocksdb::ReadOptions readOptions;
     rocksdb::FlushOptions flushOptions;
+    std::atomic<double> lastFlushTime_;
 };

 SharedRocksDBState::SharedRocksDBState(UID id)
@@ -374,12 +377,14 @@ class RocksDBErrorListener : public rocksdb::EventListener {

 class RocksDBEventListener : public rocksdb::EventListener {
 public:
-    RocksDBEventListener(std::shared_ptr<double> lastFlushTime) : lastFlushTime(lastFlushTime){};
+    RocksDBEventListener(std::shared_ptr<SharedRocksDBState> sharedState) : sharedState(sharedState){};

-    void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); }
+    void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override {
+        sharedState->setLastFlushTime(now());
+    }

 private:
-    std::shared_ptr<double> lastFlushTime;
+    std::shared_ptr<SharedRocksDBState> sharedState;
 };

 using DB = rocksdb::DB*;
@@ -986,19 +991,30 @@ ACTOR Future<Void> flowLockLogger(UID id, const FlowLock* readLock, const FlowLo
     }
 }

-ACTOR Future<Void> manualFlush(UID id,
-                               rocksdb::DB* db,
-                               std::shared_ptr<SharedRocksDBState> sharedState,
-                               std::shared_ptr<double> lastFlushTime,
-                               CF cf) {
+ACTOR Future<Void> manualFlush(UID id, rocksdb::DB* db, std::shared_ptr<SharedRocksDBState> sharedState, CF cf) {
     if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
         state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions();
+        state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
+        state double currTime = 0;
+        state int timeElapsedAfterLastFlush = 0;
         loop {
-            wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL));
+            wait(delay(waitTime));

-            if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
+            currTime = now();
+            timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime();
+            if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
                 db->Flush(fOptions, cf);
-                TraceEvent e("RocksDBManualFlush", id);
+                waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
+                TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush);
+            } else {
+                waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush;
+            }
+
+            // The above code generates different waitTimes based on rocksdb background flushes, which causes
+            // non-deterministic behavior. Set a constant waitTime in simulation to avoid this, and enable the adaptive
+            // behavior only in the RocksdbNondeterministic (ROCKSDB_METRICS_IN_SIMULATION=true) test.
+            if (g_network->isSimulated() && !SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) {
+                waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
+            }
         }
     }
 }
@@ -1289,11 +1305,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
                    const FlowLock* fetchLock,
                    std::shared_ptr<RocksDBErrorListener> errorListener,
                    std::shared_ptr<RocksDBEventListener> eventListener,
-                   std::shared_ptr<double> lastFlushTime,
                    Counters& counters)
           : path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
-            errorListener(errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime),
-            counters(counters) {}
+            errorListener(errorListener), eventListener(eventListener), counters(counters) {}

         double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
     };
@@ -1368,10 +1382,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
                                                     &a.counters,
                                                     cf) &&
                             flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
-                            manualFlush(id, db, sharedState, a.lastFlushTime, cf);
+                            manualFlush(id, db, sharedState, cf);
             } else {
                 a.metrics = flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
-                            manualFlush(id, db, sharedState, a.lastFlushTime, cf);
+                            manualFlush(id, db, sharedState, cf);
             }
         } else {
             onMainThread([&] {
@@ -1384,7 +1398,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
                                                     &a.counters,
                                                     cf) &&
                             flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
-                            manualFlush(id, db, sharedState, a.lastFlushTime, cf);
+                            manualFlush(id, db, sharedState, cf);
                 return Future<bool>(true);
             }).blockUntilReady();
         }
@@ -1898,8 +1912,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
         numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
         errorListener(std::make_shared<RocksDBErrorListener>(id)), errorFuture(errorListener->getFuture()) {
-        lastFlushTime = std::make_shared<double>(now());
-        eventListener = std::make_shared<RocksDBEventListener>(sharedState);
+        eventListener = std::make_shared<RocksDBEventListener>(sharedState);
         // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine
         // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
         // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@@ -2093,7 +2106,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
             return openFuture;
         }
         auto a = std::make_unique<Writer::OpenAction>(
-            path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters);
+            path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters);
         openFuture = a->done.getFuture();
         writeThread->post(a.release());
         return openFuture;
@@ -2427,7 +2440,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
     Reference<IThreadPool> readThreads;
     std::shared_ptr<RocksDBErrorListener> errorListener;
     std::shared_ptr<RocksDBEventListener> eventListener;
-    std::shared_ptr<double> lastFlushTime;
     Future<Void> errorFuture;
     Promise<Void> closePromise;
     Future<Void> openFuture;
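Below the patch, a minimal standalone sketch of the scheduling rule that manualFlush() now follows: flush only when no flush (manual or background) has completed within ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, otherwise sleep just long enough for the interval since the last flush to expire; in simulation the patch pins the wait back to the full interval unless ROCKSDB_METRICS_IN_SIMULATION is set, to stay deterministic. The names kFlushIntervalSec, FlushDecision, and decideManualFlush are illustrative stand-ins and do not exist in the codebase.

// Illustrative sketch only -- not part of the patch. Hypothetical names throughout.
#include <iostream>

constexpr double kFlushIntervalSec = 600.0; // stands in for ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL

struct FlushDecision {
    bool flushNow;      // issue db->Flush() on this wakeup
    double nextWaitSec; // delay before the next check
};

FlushDecision decideManualFlush(double nowSec, double lastFlushSec) {
    double elapsed = nowSec - lastFlushSec;
    if (elapsed >= kFlushIntervalSec) {
        // Nothing flushed for a full interval: flush now, then wait a full interval again.
        return { true, kFlushIntervalSec };
    }
    // A flush completed recently: wake up exactly when the interval since that flush elapses.
    return { false, kFlushIntervalSec - elapsed };
}

int main() {
    // Last flush completed 250s before this wakeup: skip the manual flush, re-check in 350s.
    FlushDecision d = decideManualFlush(/*nowSec=*/1000.0, /*lastFlushSec=*/750.0);
    std::cout << d.flushNow << " " << d.nextWaitSec << "\n"; // prints "0 350"
    return 0;
}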