Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocksdb manual flush code changes #11849

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_CF_METRICS_DELAY, 900.0 );
init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB.
init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server.
// Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10);
// Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disabled if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200);
init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true );
init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true );
init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true;
Expand Down
54 changes: 33 additions & 21 deletions fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class SharedRocksDBState {
rocksdb::Options getOptions() const { return rocksdb::Options(this->dbOptions, this->cfOptions); }
rocksdb::ReadOptions getReadOptions() { return this->readOptions; }
rocksdb::FlushOptions getFlushOptions() { return this->flushOptions; }
double getLastFlushTime() { return this->lastFlushTime_; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be made const

void setLastFlushTime(double lastFlushTime) { this->lastFlushTime_ = lastFlushTime; }

private:
const UID id;
Expand All @@ -120,6 +122,7 @@ class SharedRocksDBState {
rocksdb::ColumnFamilyOptions cfOptions;
rocksdb::ReadOptions readOptions;
rocksdb::FlushOptions flushOptions;
std::atomic<double> lastFlushTime_;
};

SharedRocksDBState::SharedRocksDBState(UID id)
Expand Down Expand Up @@ -374,12 +377,14 @@ class RocksDBErrorListener : public rocksdb::EventListener {

class RocksDBEventListener : public rocksdb::EventListener {
public:
RocksDBEventListener(std::shared_ptr<double> lastFlushTime) : lastFlushTime(lastFlushTime){};
RocksDBEventListener(std::shared_ptr<SharedRocksDBState> sharedState) : sharedState(sharedState){};

void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); }
void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override {
sharedState->setLastFlushTime(now());
}

private:
std::shared_ptr<double> lastFlushTime;
std::shared_ptr<SharedRocksDBState> sharedState;
};

using DB = rocksdb::DB*;
Expand Down Expand Up @@ -986,19 +991,30 @@ ACTOR Future<Void> flowLockLogger(UID id, const FlowLock* readLock, const FlowLo
}
}

ACTOR Future<Void> manualFlush(UID id,
rocksdb::DB* db,
std::shared_ptr<SharedRocksDBState> sharedState,
std::shared_ptr<double> lastFlushTime,
CF cf) {
ACTOR Future<Void> manualFlush(UID id, rocksdb::DB* db, std::shared_ptr<SharedRocksDBState> sharedState, CF cf) {
if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions();
state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
state double currTime = 0;
state int timeElapsedAfterLastFlush = 0;
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL));
wait(delay(waitTime));

if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
currTime = now();
timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime();
if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
db->Flush(fOptions, cf);
TraceEvent e("RocksDBManualFlush", id);
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush);
} else {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush;
}

// The above code generates different waitTimes based on rocksdb background flushes which causes non
// deterministic behavior. Setting constant waitTimes in simulation to avoid this. And enable the behavior
// only in RocksdbNondeterministic(ROCKSDB_METRICS_IN_SIMULATION=true) test.
if (g_network->isSimulated() && !SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
Comment on lines +1016 to +1017
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this code is not related to metrics, can we rename ROCKSDB_METRICS_IN_SIMULATION to something like ROCKSDB_ENABLE_NONDETERMINISM? That way we have one catch all knob for anything related to rocksdb non determinism.

}
}
}
Expand Down Expand Up @@ -1289,11 +1305,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
const FlowLock* fetchLock,
std::shared_ptr<RocksDBErrorListener> errorListener,
std::shared_ptr<RocksDBEventListener> eventListener,
std::shared_ptr<double> lastFlushTime,
Counters& counters)
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
errorListener(errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime),
counters(counters) {}
errorListener(errorListener), eventListener(eventListener), counters(counters) {}

double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
Expand Down Expand Up @@ -1368,10 +1382,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
&a.counters,
cf) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
} else {
a.metrics = flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
}
} else {
onMainThread([&] {
Expand All @@ -1384,7 +1398,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
&a.counters,
cf) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
return Future<bool>(true);
}).blockUntilReady();
}
Expand Down Expand Up @@ -1898,8 +1912,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
errorListener(std::make_shared<RocksDBErrorListener>(id)), errorFuture(errorListener->getFuture()) {
lastFlushTime = std::make_shared<double>(now());
eventListener = std::make_shared<RocksDBEventListener>(lastFlushTime);
eventListener = std::make_shared<RocksDBEventListener>(sharedState);
// In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
Expand Down Expand Up @@ -2093,7 +2106,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return openFuture;
}
auto a = std::make_unique<Writer::OpenAction>(
path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters);
path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters);
openFuture = a->done.getFuture();
writeThread->post(a.release());
return openFuture;
Expand Down Expand Up @@ -2427,7 +2440,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<IThreadPool> readThreads;
std::shared_ptr<RocksDBErrorListener> errorListener;
std::shared_ptr<RocksDBEventListener> eventListener;
std::shared_ptr<double> lastFlushTime;
Future<Void> errorFuture;
Promise<Void> closePromise;
Future<Void> openFuture;
Expand Down