From 867f0a48ef54692383142d0dacedd73ba5c4bf1c Mon Sep 17 00:00:00 2001 From: neethuhaneesha Date: Thu, 19 Dec 2024 12:29:34 -0800 Subject: [PATCH] Rocksdb manual flush code changes --- fdbclient/ServerKnobs.cpp | 4 +-- fdbserver/KeyValueStoreRocksDB.actor.cpp | 45 +++++++++++++----------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 01f581b3b33..b24f7cda0f4 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -573,8 +573,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_CF_METRICS_DELAY, 900.0 ); init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB. init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server. - // Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0. - init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10); + // Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disabled if the value is 0. + init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200); init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true ); init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true ); init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true; diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 4aa8cf2ca5c..b052212b7df 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -106,6 +106,8 @@ class SharedRocksDBState { rocksdb::Options getOptions() const { return rocksdb::Options(this->dbOptions, this->cfOptions); } rocksdb::ReadOptions getReadOptions() { return this->readOptions; } rocksdb::FlushOptions getFlushOptions() { return this->flushOptions; } + double getLastFlushTime() const { return this->lastFlushTime_; } + void setLastFlushTime(double lastFlushTime) { this->lastFlushTime_ = lastFlushTime; } private: const UID id; @@ -119,6 +121,7 @@ class SharedRocksDBState { rocksdb::ColumnFamilyOptions cfOptions; rocksdb::ReadOptions readOptions; rocksdb::FlushOptions flushOptions; + std::atomic lastFlushTime_; }; SharedRocksDBState::SharedRocksDBState(UID id) @@ -373,12 +376,14 @@ class RocksDBErrorListener : public rocksdb::EventListener { class RocksDBEventListener : public rocksdb::EventListener { public: - RocksDBEventListener(std::shared_ptr lastFlushTime) : lastFlushTime(lastFlushTime){}; + RocksDBEventListener(std::shared_ptr sharedState) : sharedState(sharedState){}; - void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); } + void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { + sharedState->setLastFlushTime(now()); + } private: - std::shared_ptr lastFlushTime; + std::shared_ptr sharedState; }; using DB = rocksdb::DB*; @@ -985,19 +990,23 @@ ACTOR Future flowLockLogger(UID id, const FlowLock* readLock, const FlowLo } } -ACTOR Future manualFlush(UID id, - rocksdb::DB* db, - std::shared_ptr sharedState, - std::shared_ptr lastFlushTime, - CF cf) { +ACTOR Future manualFlush(UID id, rocksdb::DB* db, std::shared_ptr sharedState, CF cf) { if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions(); + state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; + state double currTime = 0; + state int timeElapsedAfterLastFlush = 0; loop { - wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL)); + wait(delay(waitTime)); - if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { + currTime = now(); + timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime(); + if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { db->Flush(fOptions, cf); - TraceEvent e("RocksDBManualFlush", id); + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; + TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush); + } else { + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush; } } } @@ -1288,11 +1297,9 @@ struct RocksDBKeyValueStore : IKeyValueStore { const FlowLock* fetchLock, std::shared_ptr errorListener, std::shared_ptr eventListener, - std::shared_ptr lastFlushTime, Counters& counters) : path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock), - errorListener(errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime), - counters(counters) {} + errorListener(errorListener), eventListener(eventListener), counters(counters) {} double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } }; @@ -1361,7 +1368,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { rocksDBMetricLogger( id, sharedState, options.statistics, perfContextMetrics, db, readIterPool, &a.counters, cf) && flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) && - manualFlush(id, db, sharedState, a.lastFlushTime, cf); + manualFlush(id, db, sharedState, cf); } else { onMainThread([&] { a.metrics = rocksDBMetricLogger(id, @@ -1373,7 +1380,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { &a.counters, cf) && flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) && - manualFlush(id, db, sharedState, a.lastFlushTime, cf); + manualFlush(id, db, sharedState, cf); return Future(true); }).blockUntilReady(); } @@ -1887,8 +1894,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX), errorListener(std::make_shared(id)), errorFuture(errorListener->getFuture()) { - lastFlushTime = std::make_shared(now()); - eventListener = std::make_shared(lastFlushTime); + eventListener = std::make_shared(sharedState); // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When @@ -2082,7 +2088,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { return openFuture; } auto a = std::make_unique( - path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters); + path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters); openFuture = a->done.getFuture(); writeThread->post(a.release()); return openFuture; @@ -2400,7 +2406,6 @@ struct RocksDBKeyValueStore : IKeyValueStore { Reference readThreads; std::shared_ptr errorListener; std::shared_ptr eventListener; - std::shared_ptr lastFlushTime; Future errorFuture; Promise closePromise; Future openFuture;