From 5730f5ee7e35fea920075af145cf14e05c6a3512 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 19 Oct 2020 13:54:49 +0800 Subject: [PATCH 01/11] remove mutex for write Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 54 +++++++------ db/db_impl/db_impl.h | 17 ++-- db/db_impl/db_impl_compaction_flush.cc | 6 +- db/db_impl/db_impl_files.cc | 13 +-- db/db_impl/db_impl_write.cc | 105 ++++++++++++------------- db/error_handler.cc | 4 + db/error_handler.h | 21 ++--- db/perf_context_test.cc | 2 +- 8 files changed, 112 insertions(+), 110 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e48b29e3750..0f1b3483f2b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -168,7 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, log_dir_synced_(false), log_empty_(true), persist_stats_cf_handle_(nullptr), - log_sync_cv_(&mutex_), + log_sync_cv_(&log_write_mutex_), total_log_size_(0), is_snapshot_supported_(true), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), @@ -257,6 +257,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber() // is called by client and this seqnum is advanced. preserve_deletes_seqnum_.store(0); + max_total_wal_size_.store(mutable_db_options_.max_total_wal_size, + std::memory_order_relaxed); } Status DBImpl::Resume() { @@ -525,25 +527,28 @@ Status DBImpl::CloseHelper() { mutex_.Lock(); } - for (auto l : logs_to_free_) { - delete l; - } - for (auto& log : logs_) { - uint64_t log_number = log.writer->get_log_number(); - Status s = log.ClearWriter(); - if (!s.ok()) { - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Unable to Sync WAL file %s with error -- %s", - LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), - s.ToString().c_str()); - // Retain the first error - if (ret.ok()) { - ret = s; + { + InstrumentedMutexLock lock(&log_write_mutex_); + for (auto l : logs_to_free_) { + delete l; + } + for (auto& log : logs_) { + uint64_t log_number = log.writer->get_log_number(); + Status s = log.ClearWriter(); + if (!s.ok()) { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Unable to Sync WAL file %s with error -- %s", + LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), + s.ToString().c_str()); + // Retain the first error + if (ret.ok()) { + ret = s; + } } } + logs_.clear(); } - logs_.clear(); // Table cache may have table handles holding blocks from the block cache. // We need to release them before the block cache is destroyed. The block @@ -1051,6 +1056,11 @@ Status DBImpl::SetDBOptions( thread_persist_stats_.reset(); } } + if (new_options.max_total_wal_size != + mutable_db_options_.max_total_wal_size) { + max_total_wal_size_.store(new_options.max_total_wal_size, + std::memory_order_release); + } write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); table_cache_.get()->SetCapacity(new_options.max_open_files == -1 @@ -1166,7 +1176,7 @@ Status DBImpl::SyncWAL() { uint64_t current_log_number; { - InstrumentedMutexLock l(&mutex_); + InstrumentedMutexLock l(&log_write_mutex_); assert(!logs_.empty()); // This SyncWAL() call only cares about logs up to this number. 
@@ -1214,7 +1224,7 @@ Status DBImpl::SyncWAL() {
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
   {
-    InstrumentedMutexLock l(&mutex_);
+    InstrumentedMutexLock l(&log_write_mutex_);
     MarkLogsSynced(current_log_number, need_log_dir_sync, status);
   }
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

@@ -1243,7 +1253,7 @@ Status DBImpl::UnlockWAL() {

 void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                             const Status& status) {
-  mutex_.AssertHeld();
+  log_write_mutex_.AssertHeld();
   if (synced_dir && logfile_number_ == up_to && status.ok()) {
     log_dir_synced_ = true;
   }
@@ -1252,8 +1262,6 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
     assert(log.getting_synced);
     if (status.ok() && logs_.size() > 1) {
       logs_to_free_.push_back(log.ReleaseWriter());
-      // To modify logs_ both mutex_ and log_write_mutex_ must be held
-      InstrumentedMutexLock l(&log_write_mutex_);
       it = logs_.erase(it);
     } else {
       log.getting_synced = false;
@@ -2102,7 +2110,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
     s = cfd->AddDirectories();
   }
   if (s.ok()) {
-    single_column_family_mode_ = false;
     auto* cfd =
         versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
     assert(cfd != nullptr);
@@ -2119,6 +2126,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
                    "Created column family [%s] (ID %u)",
                    column_family_name.c_str(), (unsigned)cfd->GetID());
+    single_column_family_mode_.store(false, std::memory_order_release);
   } else {
     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                     "Creating column family [%s] FAILED -- %s",
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 67ba9d1e440..c70d49f079b 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -930,10 +930,10 @@ class DBImpl : public DB {

   // only used for dynamically adjusting max_total_wal_size. it is a sum of
   // [write_buffer_size * max_write_buffer_number] over all column families
-  uint64_t max_total_in_memory_state_;
+  std::atomic<uint64_t> max_total_in_memory_state_;
   // If true, we have only one (default) column family. We use this to optimize
   // some code-paths
-  bool single_column_family_mode_;
+  std::atomic<bool> single_column_family_mode_;

   // The options to access storage files
   const EnvOptions env_options_;
@@ -1125,7 +1125,13 @@ class DBImpl : public DB {
       }
     }
   };
-
+  struct LogContext {
+    explicit LogContext(bool need_sync = false)
+        : need_log_sync(need_sync), need_log_dir_sync(need_sync) {}
+    bool need_log_sync;
+    bool need_log_dir_sync;
+    log::Writer* writer;
+  };
   struct LogFileNumberSize {
     explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
     void AddSize(uint64_t new_size) { size += new_size; }
@@ -1395,8 +1401,8 @@ class DBImpl : public DB {
   Status HandleWriteBufferFull(WriteContext* write_context);

   // REQUIRES: mutex locked
-  Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
-                         WriteContext* write_context);
+  Status PreprocessWrite(const WriteOptions& write_options,
+                         LogContext* log_context, WriteContext* write_context);

   WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
                          WriteBatch* tmp_batch, size_t* write_with_wal,
@@ -1928,6 +1934,7 @@ class DBImpl : public DB {
   InstrumentedCondVar atomic_flush_install_cv_;

   bool wal_in_db_path_;
+  std::atomic<uint64_t> max_total_wal_size_;
 };

 extern Options SanitizeOptions(const std::string& db, const Options& src);
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 80fcb79396a..ab7df9f3dcb 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -80,7 +80,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,

 Status DBImpl::SyncClosedLogs(JobContext* job_context) {
   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
-  mutex_.AssertHeld();
+  InstrumentedMutexLock l(&log_write_mutex_);
   autovector<log::Writer*> logs_to_sync;
   uint64_t current_log_number = logfile_number_;
   while (logs_.front().number < current_log_number &&
@@ -97,7 +97,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {

   Status s;
   if (!logs_to_sync.empty()) {
-    mutex_.Unlock();
+    log_write_mutex_.Unlock();

     for (log::Writer* log : logs_to_sync) {
       ROCKS_LOG_INFO(immutable_db_options_.info_log,
@@ -119,7 +119,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
       s = directories_.GetWalDir()->Fsync();
     }

-    mutex_.Lock();
+    log_write_mutex_.Lock();

     // "number <= current_log_number - 1" is equivalent to
     // "number < current_log_number".
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc
index dc6ffa437ed..cd3ca294207 100644
--- a/db/db_impl/db_impl_files.cc
+++ b/db/db_impl/db_impl_files.cc
@@ -179,6 +179,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,

   // logs_ is empty when called during recovery, in which case there can't yet
   // be any tracked obsolete logs
+  InstrumentedMutexLock l(&log_write_mutex_);
   if (!alive_log_files_.empty() && !logs_.empty()) {
     uint64_t min_log_number = job_context->log_number;
     size_t num_alive_log_files = alive_log_files_.size();
@@ -200,13 +201,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       }
       job_context->size_log_to_delete += earliest.size;
       total_log_size_ -= earliest.size;
-      if (two_write_queues_) {
-        log_write_mutex_.Lock();
-      }
       alive_log_files_.pop_front();
-      if (two_write_queues_) {
-        log_write_mutex_.Unlock();
-      }
+
       // Current log should always stay alive since it can't have
       // number < MinLogNumber().
assert(alive_log_files_.size()); @@ -219,10 +215,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, continue; } logs_to_free_.push_back(log.ReleaseWriter()); - { - InstrumentedMutexLock wl(&log_write_mutex_); - logs_.pop_front(); - } + logs_.pop_front(); } // Current log cannot be obsolete. assert(!logs_.empty()); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 0c0210aca90..a703820d80c 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -92,15 +92,11 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, write_thread_.WaitForMemTableWriters(); } WriteThread::WriteGroup wal_write_group; - mutex_.Lock(); - bool need_log_sync = !write_options.disableWAL && write_options.sync; - bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + LogContext log_context; PERF_TIMER_STOP(write_pre_and_post_process_time); writer.status = - PreprocessWrite(write_options, &need_log_sync, &write_context); + PreprocessWrite(write_options, &log_context, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); - log::Writer* log_writer = logs_.back().writer; - mutex_.Unlock(); // This can set non-OK status if callback fail. last_batch_group_size_ = @@ -148,18 +144,19 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } writer.status = - WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, current_sequence); + WriteToWAL(wal_write_group, log_context.writer, log_used, + log_context.need_log_sync, log_context.need_log_dir_sync, + current_sequence); } } if (!writer.CallbackFailed()) { WriteStatusCheck(writer.status); } - if (need_log_sync) { - mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, writer.status); - mutex_.Unlock(); + if (log_context.need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + writer.status); } write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status); } @@ -391,14 +388,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. WriteContext write_context; + LogContext log_context(write_options.sync); WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = kMaxSequenceNumber; - mutex_.Lock(); - - bool need_log_sync = write_options.sync; - bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + // The writer will only be used when two_write_queues_ is false. if (!two_write_queues_ || !disable_memtable) { // With concurrent writes we do preprocess only in the write thread that // also does write to memtable to avoid sync issue on shared data structure @@ -407,7 +402,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // PreprocessWrite does its own perf timing. 
PERF_TIMER_STOP(write_pre_and_post_process_time); - status = PreprocessWrite(write_options, &need_log_sync, &write_context); + status = PreprocessWrite(write_options, &log_context, &write_context); if (!two_write_queues_) { // Assign it after ::PreprocessWrite since the sequence might advance // inside it by WriteRecoverableState @@ -416,9 +411,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); } - log::Writer* log_writer = logs_.back().writer; - - mutex_.Unlock(); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging @@ -501,8 +493,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (!two_write_queues_) { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); - status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, last_sequence + 1); + status = WriteToWAL(write_group, log_context.writer, log_used, + log_context.need_log_sync, + log_context.need_log_dir_sync, last_sequence + 1); } } else { if (status.ok() && !write_options.disableWAL) { @@ -590,10 +583,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteStatusCheck(status); } - if (need_log_sync) { - mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, status); - mutex_.Unlock(); + if (log_context.need_log_sync) { + log_write_mutex_.Lock(); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, status); + log_write_mutex_.Unlock(); // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. if (two_write_queues_) { @@ -644,15 +637,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (w.callback && !w.callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } - mutex_.Lock(); - bool need_log_sync = !write_options.disableWAL && write_options.sync; - bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + LogContext log_context(!write_options.disableWAL && write_options.sync); // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); - w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); + w.status = PreprocessWrite(write_options, &log_context, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); - log::Writer* log_writer = logs_.back().writer; - mutex_.Unlock(); // This can set non-OK status if callback fail. 
last_batch_group_size_ = @@ -700,18 +689,18 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - w.status = WriteToWAL(wal_write_group, log_writer, log_used, - need_log_sync, need_log_dir_sync, current_sequence); + w.status = WriteToWAL(wal_write_group, log_context.writer, log_used, + log_context.need_log_sync, + log_context.need_log_dir_sync, current_sequence); } if (!w.CallbackFailed()) { WriteStatusCheck(w.status); } - if (need_log_sync) { - mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); - mutex_.Unlock(); + if (log_context.need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, w.status); } write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); @@ -845,9 +834,8 @@ Status DBImpl::WriteImplWALOnly( // TODO(myabandeh): Make preliminary checks thread-safe so we could do them // without paying the cost of obtaining the mutex. if (status.ok()) { - InstrumentedMutexLock l(&mutex_); - bool need_log_sync = false; - status = PreprocessWrite(write_options, &need_log_sync, &write_context); + LogContext log_context; + status = PreprocessWrite(write_options, &log_context, &write_context); WriteStatusCheck(status); } if (!status.ok()) { @@ -1010,22 +998,22 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) { } Status DBImpl::PreprocessWrite(const WriteOptions& write_options, - bool* need_log_sync, + LogContext* log_context, WriteContext* write_context) { - mutex_.AssertHeld(); - assert(write_context != nullptr && need_log_sync != nullptr); + assert(write_context != nullptr && log_context != nullptr); Status status; if (error_handler_.IsDBStopped()) { + InstrumentedMutexLock l(&mutex_); status = error_handler_.GetBGError(); } PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); - assert(!single_column_family_mode_ || - versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); - if (UNLIKELY(status.ok() && !single_column_family_mode_ && + if (UNLIKELY(status.ok() && + !single_column_family_mode_.load(std::memory_order_acquire) && total_log_size_ > GetMaxTotalWalSize())) { + InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = SwitchWAL(write_context); } @@ -1036,11 +1024,13 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. + InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = HandleWriteBufferFull(write_context); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { + InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = ScheduleFlushes(write_context); } @@ -1056,11 +1046,13 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // for previous one. It might create a fairness issue that expiration // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. + InstrumentedMutexLock l(&mutex_); status = DelayWrite(last_batch_group_size_, write_options); PERF_TIMER_START(write_pre_and_post_process_time); } - if (status.ok() && *need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + if (status.ok() && log_context->need_log_sync) { // Wait until the parallel syncs are finished. 
Any sync process has to sync // the front log too so it is enough to check the status of front() // We do a while loop since log_sync_cv_ is signalled when any sync is @@ -1081,8 +1073,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, log.getting_synced = true; } } else { - *need_log_sync = false; + log_context->need_log_sync = false; } + log_context->writer = logs_.back().writer; + log_context->need_log_dir_sync = + log_context->need_log_dir_sync && !log_dir_synced_; return status; } @@ -1533,10 +1528,11 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { } uint64_t DBImpl::GetMaxTotalWalSize() const { - mutex_.AssertHeld(); - return mutable_db_options_.max_total_wal_size == 0 - ? 4 * max_total_in_memory_state_ - : mutable_db_options_.max_total_wal_size; + auto max_total_wal_size = max_total_wal_size_.load(std::memory_order_acquire); + if (max_total_wal_size > 0) { + return max_total_wal_size; + } + return 4 * max_total_in_memory_state_; } // REQUIRES: mutex_ is held @@ -1817,7 +1813,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { cfd->GetName().c_str(), new_log_number, num_imm_unflushed); mutex_.Lock(); if (s.ok() && creating_new_log) { - log_write_mutex_.Lock(); + InstrumentedMutexLock l(&log_write_mutex_); assert(new_log != nullptr); if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one @@ -1838,7 +1834,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); } - log_write_mutex_.Unlock(); } if (!s.ok()) { diff --git a/db/error_handler.cc b/db/error_handler.cc index 317f0072372..8a275b6a155 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -220,6 +220,9 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas db_mutex_, &auto_recovery); if (!s.ok() && (s.severity() > bg_error_.severity())) { bg_error_ = s; + if (bg_error_.severity() >= Status::Severity::kHardError) { + stop_state_.store(true, std::memory_order_release); + } } else { // This error is less severe than previously encountered error. 
Don't
      // take any further action
@@ -295,6 +298,7 @@ Status ErrorHandler::ClearBGError() {
   if (recovery_error_.ok()) {
     Status old_bg_error = bg_error_;
     bg_error_ = Status::OK();
+    stop_state_.store(false, std::memory_order_release);
     recovery_in_prog_ = false;
     EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners,
                                                  old_bg_error, db_mutex_);
diff --git a/db/error_handler.h b/db/error_handler.h
index c2af809fc69..95f72659f4d 100644
--- a/db/error_handler.h
+++ b/db/error_handler.h
@@ -23,15 +23,12 @@ class ErrorHandler {
         recovery_error_(Status::OK()),
         db_mutex_(db_mutex),
         auto_recovery_(false),
-        recovery_in_prog_(false) {}
+        recovery_in_prog_(false),
+        stop_state_(false) {}

   ~ErrorHandler() {}

   void EnableAutoRecovery() { auto_recovery_ = true; }

-  Status::Severity GetErrorSeverity(BackgroundErrorReason reason,
-                                    Status::Code code,
-                                    Status::SubCode subcode);
-
   Status SetBGError(const Status& bg_err, BackgroundErrorReason reason);

   Status GetBGError() { return bg_error_; }
@@ -40,15 +37,12 @@ class ErrorHandler {

   Status ClearBGError();

-  bool IsDBStopped() {
-    return !bg_error_.ok() &&
-           bg_error_.severity() >= Status::Severity::kHardError;
-  }
+  bool IsDBStopped() { return stop_state_.load(std::memory_order_acquire); }

-  bool IsBGWorkStopped() {
-    return !bg_error_.ok() &&
-           (bg_error_.severity() >= Status::Severity::kHardError ||
-            !auto_recovery_);
+  bool IsBGWorkStopped() {
+    return !bg_error_.ok() &&
+           (bg_error_.severity() >= Status::Severity::kHardError ||
+            !auto_recovery_);
   }

   bool IsRecoveryInProgress() { return recovery_in_prog_; }
@@ -67,6 +61,7 @@ class ErrorHandler {
   // A flag indicating whether automatic recovery from errors is enabled
   bool auto_recovery_;
   bool recovery_in_prog_;
+  std::atomic<bool> stop_state_;

   Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery);
   void RecoverFromNoSpace();
diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc
index 94eabff7ff5..be24b166de3 100644
--- a/db/perf_context_test.cc
+++ b/db/perf_context_test.cc
@@ -383,7 +383,7 @@ void ProfileQueries(bool enabled_time = false) {
     EXPECT_GT(hist_write_scheduling_time.Average(), 0);

 #ifndef NDEBUG
-    ASSERT_GT(total_db_mutex_nanos, 2000U);
+    ASSERT_LT(total_db_mutex_nanos, 100U);
 #endif
   }

From b84bd7739eb8786540e8e46f3e54d91aef4dd4e8 Mon Sep 17 00:00:00 2001
From: Little-Wallace
Date: Mon, 24 Jan 2022 13:54:34 +0800
Subject: [PATCH 02/11] test: fix some perf context position

Signed-off-by: Little-Wallace
---
 db/c.cc                                | 71 +++++++++++++-------------
 db/column_family.cc                    | 21 +++++---
 db/db_impl/db_impl.cc                  | 12 +++--
 db/db_impl/db_impl_compaction_flush.cc |  3 +-
 db/db_impl/db_impl_open.cc             |  3 +-
 db/db_impl/db_impl_write.cc            | 24 +++------
 db/db_options_test.cc                  | 13 +++--
 db/version_builder_test.cc             |  1 +
 db/version_edit.h                      |  1 +
 db/version_set.h                       |  8 +--
 include/rocksdb/utilities/ldb_cmd.h    |  1 +
 monitoring/perf_flag.cc                |  4 +-
 monitoring/perf_flag_imp.h             |  3 +-
 options/options_helper.cc              |  3 +-
 options/options_parser.cc              |  6 ++-
 tools/ldb_cmd.cc                       |  3 +-
 tools/ldb_cmd_impl.h                   |  2 +-
 tools/ldb_cmd_test.cc                  |  1 +
 18 files changed, 95 insertions(+), 85 deletions(-)

diff --git a/db/c.cc b/db/c.cc
index 51da66b8ba1..30d624a6a45 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -12,6 +12,11 @@
 #include "rocksdb/c.h"

 #include
+
+#include
+#include
+#include
+
 #include "port/port.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/compaction_filter.h"
@@ -44,43 +49,60 @@
 #include "utilities/merge_operators.h"
 #include "utilities/rate_limiters/write_amp_based_rate_limiter.h"

-#include
-#include
-#include
-
+using
rocksdb::BackupableDBOptions; +using rocksdb::BackupEngine; +using rocksdb::BackupID; +using rocksdb::BackupInfo; +using rocksdb::BatchResult; +using rocksdb::BlockBasedTableOptions; +using rocksdb::BottommostLevelCompaction; using rocksdb::BytewiseComparator; using rocksdb::Cache; +using rocksdb::CheckPerfFlag; +using rocksdb::Checkpoint; using rocksdb::ColumnFamilyDescriptor; using rocksdb::ColumnFamilyHandle; using rocksdb::ColumnFamilyOptions; using rocksdb::CompactionFilter; using rocksdb::CompactionFilterFactory; using rocksdb::CompactionOptionsFIFO; +using rocksdb::CompactRangeOptions; using rocksdb::Comparator; using rocksdb::CompressionType; -using rocksdb::WALRecoveryMode; +using rocksdb::CuckooTableOptions; using rocksdb::DB; using rocksdb::DBOptions; using rocksdb::DbPath; +using rocksdb::DisablePerfFlag; +using rocksdb::EnablePerfFlag; using rocksdb::Env; using rocksdb::EnvOptions; -using rocksdb::InfoLogLevel; using rocksdb::FileLock; using rocksdb::FilterPolicy; using rocksdb::FlushOptions; +using rocksdb::InfoLogLevel; using rocksdb::IngestExternalFileOptions; using rocksdb::Iterator; +using rocksdb::LiveFileMetaData; using rocksdb::Logger; +using rocksdb::MemoryUtil; using rocksdb::MergeOperator; using rocksdb::MergeOperators; using rocksdb::NewBloomFilterPolicy; +using rocksdb::NewGenericRateLimiter; using rocksdb::NewLRUCache; +using rocksdb::NewWriteAmpBasedRateLimiter; +using rocksdb::OptimisticTransactionDB; +using rocksdb::OptimisticTransactionOptions; using rocksdb::Options; -using rocksdb::BlockBasedTableOptions; -using rocksdb::CuckooTableOptions; +using rocksdb::PerfContext; +using rocksdb::PerfLevel; +using rocksdb::PinnableSlice; using rocksdb::RandomAccessFile; using rocksdb::Range; +using rocksdb::RateLimiter; using rocksdb::ReadOptions; +using rocksdb::RestoreOptions; using rocksdb::SequentialFile; using rocksdb::Slice; using rocksdb::SliceParts; @@ -88,37 +110,16 @@ using rocksdb::SliceTransform; using rocksdb::Snapshot; using rocksdb::SstFileWriter; using rocksdb::Status; +using rocksdb::Transaction; +using rocksdb::TransactionDB; +using rocksdb::TransactionDBOptions; +using rocksdb::TransactionLogIterator; +using rocksdb::TransactionOptions; +using rocksdb::WALRecoveryMode; using rocksdb::WritableFile; using rocksdb::WriteBatch; using rocksdb::WriteBatchWithIndex; using rocksdb::WriteOptions; -using rocksdb::LiveFileMetaData; -using rocksdb::BackupEngine; -using rocksdb::BackupableDBOptions; -using rocksdb::BackupInfo; -using rocksdb::BackupID; -using rocksdb::RestoreOptions; -using rocksdb::CompactRangeOptions; -using rocksdb::BottommostLevelCompaction; -using rocksdb::RateLimiter; -using rocksdb::NewGenericRateLimiter; -using rocksdb::NewWriteAmpBasedRateLimiter; -using rocksdb::PinnableSlice; -using rocksdb::TransactionDBOptions; -using rocksdb::TransactionDB; -using rocksdb::TransactionOptions; -using rocksdb::OptimisticTransactionDB; -using rocksdb::OptimisticTransactionOptions; -using rocksdb::Transaction; -using rocksdb::Checkpoint; -using rocksdb::TransactionLogIterator; -using rocksdb::BatchResult; -using rocksdb::PerfLevel; -using rocksdb::EnablePerfFlag; -using rocksdb::DisablePerfFlag; -using rocksdb::CheckPerfFlag; -using rocksdb::PerfContext; -using rocksdb::MemoryUtil; using std::shared_ptr; using std::vector; diff --git a/db/column_family.cc b/db/column_family.cc index 2a5493e43cc..0da7524a841 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -738,7 +738,8 @@ WriteStallCondition 
ColumnFamilyData::RecalculateWriteStallConditions( bool needed_delay = write_controller->NeedsDelay(); if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kMemtableLimit && !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kMemtableLimit && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); ROCKS_LOG_WARN( @@ -748,7 +749,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); } else if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kL0FileCountLimit && !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kL0FileCountLimit && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { @@ -759,7 +761,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), vstorage->l0_delay_trigger_count()); } else if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kPendingCompactionBytes && !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kPendingCompactionBytes && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1); @@ -769,7 +772,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( "bytes %" PRIu64, name_.c_str(), compaction_needed_bytes); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kMemtableLimit && !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kMemtableLimit && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped, @@ -784,7 +788,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( mutable_cf_options.max_write_buffer_number, write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kL0FileCountLimit && !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kL0FileCountLimit && + !mutable_cf_options.disable_write_stall) { // L0 is the last two files from stopping. 
bool near_stop = vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger - 2; @@ -804,7 +809,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), vstorage->l0_delay_trigger_count(), write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kPendingCompactionBytes && !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kPendingCompactionBytes && + !mutable_cf_options.disable_write_stall) { // If the distance to hard limit is less than 1/4 of the gap between soft // and // hard bytes limit, we think it is near stop and speed up the slowdown. @@ -829,7 +835,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), vstorage->estimated_compaction_needed_bytes(), write_controller->delayed_write_rate()); } else { - assert(write_stall_condition == WriteStallCondition::kNormal || mutable_cf_options.disable_write_stall); + assert(write_stall_condition == WriteStallCondition::kNormal || + mutable_cf_options.disable_write_stall); if (vstorage->l0_delay_trigger_count() >= GetL0ThresholdSpeedupCompaction( mutable_cf_options.level0_file_num_compaction_trigger, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 9cbaba2966c..f5c569900df 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -163,6 +163,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, seq_per_batch_(seq_per_batch), batch_per_txn_(batch_per_txn), db_lock_(nullptr), + log_write_mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, false), shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), @@ -1019,11 +1020,12 @@ Status DBImpl::SetDBOptions( mutable_db_options_.max_background_jobs, mutable_db_options_.base_background_compactions, /* parallelize_compactions */ true); - const BGJobLimits new_bg_job_limits = GetBGJobLimits( - new_options.max_background_flushes, - new_options.max_background_compactions, - new_options.max_background_jobs, - new_options.base_background_compactions, /* parallelize_compactions */ true); + const BGJobLimits new_bg_job_limits = + GetBGJobLimits(new_options.max_background_flushes, + new_options.max_background_compactions, + new_options.max_background_jobs, + new_options.base_background_compactions, + /* parallelize_compactions */ true); const bool max_flushes_increased = new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index dfe681a97ab..b9dcb397de7 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1988,7 +1988,8 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, } if (!parallelize_compactions) { // throttle background compactions until we deem necessary - res.max_compactions = std::max(1, std::min(base_background_compactions, res.max_compactions)); + res.max_compactions = + std::max(1, std::min(base_background_compactions, res.max_compactions)); } return res; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index b1623f7eee2..630368ff82f 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -57,7 +57,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { } auto bg_job_limits = DBImpl::GetBGJobLimits( result.max_background_flushes, result.max_background_compactions, - result.max_background_jobs, 
result.base_background_compactions, true /* parallelize_compactions */); + result.max_background_jobs, result.base_background_compactions, + true /* parallelize_compactions */); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a703820d80c..d3c6c6ca347 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -88,15 +88,15 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, WriteContext write_context; bool ignore_missing_faimly = write_options.ignore_missing_column_families; if (writer.state == WriteThread::STATE_GROUP_LEADER) { + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_delay_time); if (writer.callback && !writer.callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } WriteThread::WriteGroup wal_write_group; LogContext log_context; - PERF_TIMER_STOP(write_pre_and_post_process_time); writer.status = PreprocessWrite(write_options, &log_context, &write_context); - PERF_TIMER_START(write_pre_and_post_process_time); // This can set non-OK status if callback fail. last_batch_group_size_ = @@ -132,7 +132,6 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); - PERF_TIMER_STOP(write_pre_and_post_process_time); if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); @@ -163,7 +162,6 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, bool is_leader_thread = false; WriteThread::WriteGroup memtable_write_group; if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { - PERF_TIMER_GUARD(write_memtable_time); assert(writer.ShouldWriteToMemtable()); write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group); assert(immutable_db_options_.allow_concurrent_memtable_write); @@ -171,6 +169,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, is_leader_thread = true; write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); } else { + PERF_TIMER_GUARD(write_memtable_time); auto version_set = versions_->GetColumnFamilySet(); memtable_write_group.running.store(0); for (auto it = memtable_write_group.begin(); @@ -194,6 +193,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, } if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { assert(writer.ShouldWriteToMemtable()); + PERF_TIMER_GUARD(write_memtable_time); auto version_set = versions_->GetColumnFamilySet(); WriteBatchInternal::AsyncInsertInto( &writer, writer.sequence, version_set, &flush_scheduler_, @@ -640,8 +640,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, LogContext log_context(!write_options.disableWAL && write_options.sync); // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_delay_time); w.status = PreprocessWrite(write_options, &log_context, &write_context); - PERF_TIMER_START(write_pre_and_post_process_time); // This can set non-OK status if callback fail. 
last_batch_group_size_ = @@ -678,8 +678,6 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); - PERF_TIMER_STOP(write_pre_and_post_process_time); - if (w.status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); @@ -752,7 +750,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, WriteCallback* callback, uint64_t log_ref, SequenceNumber seq, const size_t sub_batch_cnt) { - PERF_TIMER_GUARD(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_memtable_time); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); WriteThread::Writer w(write_options, my_batch, callback, log_ref, @@ -824,6 +822,8 @@ Status DBImpl::WriteImplWALOnly( // else we are the leader of the write batch group assert(w.state == WriteThread::STATE_GROUP_LEADER); + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_delay_time); if (publish_last_seq == kDoPublishLastSeq) { // Currently we only use kDoPublishLastSeq in unordered_write assert(immutable_db_options_.unordered_write); @@ -884,8 +884,6 @@ Status DBImpl::WriteImplWALOnly( } RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); - PERF_TIMER_STOP(write_pre_and_post_process_time); - PERF_TIMER_GUARD(write_wal_time); // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL @@ -934,7 +932,6 @@ Status DBImpl::WriteImplWALOnly( status = SyncWAL(); } } - PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { WriteStatusCheck(status); @@ -1036,19 +1033,15 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } PERF_TIMER_STOP(write_scheduling_flushes_compactions_time); - PERF_TIMER_GUARD(write_pre_and_post_process_time); if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || write_controller_.NeedsDelay()))) { - PERF_TIMER_STOP(write_pre_and_post_process_time); - PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size // for previous one. It might create a fairness issue that expiration // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. InstrumentedMutexLock l(&mutex_); status = DelayWrite(last_batch_group_size_, write_options); - PERF_TIMER_START(write_pre_and_post_process_time); } InstrumentedMutexLock l(&log_write_mutex_); @@ -1634,7 +1627,6 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, // is that in case the write is heavy, low pri writes may never have // a chance to run. Now we guarantee we are still slowly making // progress. 
- PERF_TIMER_GUARD(write_delay_time); write_controller_.low_pri_rate_limiter()->Request( my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kWrite); diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 1a637f934b9..46cdf7ac73d 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -406,10 +406,10 @@ TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { dbfull()->TEST_WaitForFlushMemTable(); ASSERT_EQ(2, NumTableFilesAtLevel(0)); uint64_t l0_size = SizeAtLevel(0); - + options.hard_pending_compaction_bytes_limit = l0_size; options.soft_pending_compaction_bytes_limit = l0_size; - + Reopen(options); dbfull()->TEST_WaitForCompact(); ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); @@ -418,15 +418,14 @@ TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { SyncPoint::GetInstance()->LoadDependency( {{"DBOptionsTest::EnableAutoCompactionButDisableStall:1", "BackgroundCallCompaction:0"}, - {"DBImpl::BackgroundCompaction():BeforePickCompaction", + {"DBImpl::BackgroundCompaction():BeforePickCompaction", "DBOptionsTest::EnableAutoCompactionButDisableStall:2"}, - {"DBOptionsTest::EnableAutoCompactionButDisableStall:3", + {"DBOptionsTest::EnableAutoCompactionButDisableStall:3", "DBImpl::BackgroundCompaction():AfterPickCompaction"}}); // Block background compaction. SyncPoint::GetInstance()->EnableProcessing(); - ASSERT_OK( - dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1"); // Wait for stall condition recalculate. TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2"); @@ -434,7 +433,7 @@ TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction()); - + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3"); // Background compaction executed. 
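The dependency pairs passed to SyncPoint::LoadDependency in the test above are {predecessor, successor} pairs: a successor marker blocks until its predecessor has fired. That is how the test holds the background compaction until checkpoint 1 and then waits at checkpoint 2 until the compaction reaches BeforePickCompaction. A minimal sketch of the same pattern, with hypothetical marker names (the TEST_SYNC_POINT macros compile away in release builds):

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"Test:Ready", "Worker:Start"},    // worker waits for the test
       {"Worker:Done", "Test:Check"}});   // test waits for the worker
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // ... start a thread that hits TEST_SYNC_POINT("Worker:Start") and
  // TEST_SYNC_POINT("Worker:Done") ...
  TEST_SYNC_POINT("Test:Ready");  // releases the worker
  TEST_SYNC_POINT("Test:Check");  // returns only after Worker:Done has fired
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();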
diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 2f049ef2a64..d2df7a48ec1 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -5,6 +5,7 @@ #include #include + #include "db/version_edit.h" #include "db/version_set.h" #include "logging/logging.h" diff --git a/db/version_edit.h b/db/version_edit.h index 13f3e8b9e98..9860a04233e 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -13,6 +13,7 @@ #include #include #include + #include "db/dbformat.h" #include "memory/arena.h" #include "rocksdb/cache.h" diff --git a/db/version_set.h b/db/version_set.h index 583080291a7..f3c3dc510b2 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -927,10 +927,10 @@ class VersionSet { const EnvOptions& env_options, int new_levels); - // If sst_file_number is > 0, only prints manifest info for specified SST file number -Status DumpManifest(Options& options, std::string& dscname, - bool verbose, bool hex, bool json, - uint64_t sst_file_number); + // If sst_file_number is > 0, only prints manifest info for specified SST file + // number + Status DumpManifest(Options& options, std::string& dscname, bool verbose, + bool hex, bool json, uint64_t sst_file_number); #endif // ROCKSDB_LITE diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index c43ef6fb212..76184d6c48c 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -9,6 +9,7 @@ #include #include + #include #include #include diff --git a/monitoring/perf_flag.cc b/monitoring/perf_flag.cc index a3bdbda353e..534c36e3261 100644 --- a/monitoring/perf_flag.cc +++ b/monitoring/perf_flag.cc @@ -22,8 +22,8 @@ void DisablePerfFlag(uint64_t flag) { } bool CheckPerfFlag(uint64_t flag) { - return ((uint64_t)GET_FLAG(flag) & - (uint64_t)0b1 << (flag & (uint64_t)0b111)) != 0; + return ((uint64_t)GET_FLAG(flag) & (uint64_t)0b1 + << (flag & (uint64_t)0b111)) != 0; } } // namespace rocksdb diff --git a/monitoring/perf_flag_imp.h b/monitoring/perf_flag_imp.h index 453c5e03db8..ebc0b9430bf 100644 --- a/monitoring/perf_flag_imp.h +++ b/monitoring/perf_flag_imp.h @@ -1,4 +1,5 @@ #include + #include "rocksdb/perf_flag.h" namespace rocksdb { @@ -7,4 +8,4 @@ extern __thread uint8_t perf_flags[FLAGS_LEN]; #else extern uint8_t perf_flags[FLAGS_LEN]; #endif -} +} // namespace rocksdb diff --git a/options/options_helper.cc b/options/options_helper.cc index a1a998c38ad..0bebea9da95 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -165,8 +165,7 @@ ColumnFamilyOptions BuildColumnFamilyOptions( // Compaction related options cf_opts.disable_auto_compactions = mutable_cf_options.disable_auto_compactions; - cf_opts.disable_write_stall = - mutable_cf_options.disable_write_stall; + cf_opts.disable_write_stall = mutable_cf_options.disable_write_stall; cf_opts.soft_pending_compaction_bytes_limit = mutable_cf_options.soft_pending_compaction_bytes_limit; cf_opts.hard_pending_compaction_bytes_limit = diff --git a/options/options_parser.cc b/options/options_parser.cc index 4b7f95d1287..13d2eb36cf7 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -202,7 +202,8 @@ Status RocksDBOptionsParser::ParseStatement(std::string* name, Status RocksDBOptionsParser::Parse(const std::string& file_name, Env* env, bool ignore_unknown_options) { - ConfigOptions config_options; // Use default for escaped(true) and check (exact) + ConfigOptions + config_options; // Use default for escaped(true) and check (exact) 
config_options.ignore_unknown_options = ignore_unknown_options; config_options.env = env; return Parse(config_options, file_name); @@ -215,7 +216,8 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, auto ignore_unknown_options = config_options.ignore_unknown_options; std::unique_ptr seq_file; - Status s = config_options.env->NewSequentialFile(file_name, &seq_file, EnvOptions()); + Status s = + config_options.env->NewSequentialFile(file_name, &seq_file, EnvOptions()); if (!s.ok()) { return s; } diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 1c91b8382d5..396c569a323 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1018,7 +1018,8 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr); - Status s = versions.DumpManifest(options, file, verbose, hex, json, sst_file_number); + Status s = + versions.DumpManifest(options, file, verbose, hex, json, sst_file_number); if (!s.ok()) { printf("Error in processing file %s %s\n", file.c_str(), s.ToString().c_str()); diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index fe9fc47a277..35c43a23711 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -631,4 +631,4 @@ class UnsafeRemoveSstFileCommand : public LDBCommand { uint64_t sst_file_number_; }; -} // namespace ROCKSDB_NAMESPACE +} // namespace rocksdb diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index aea02105852..5278b61b265 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -6,6 +6,7 @@ #ifndef ROCKSDB_LITE #include "rocksdb/utilities/ldb_cmd.h" + #include "test_util/sync_point.h" #include "test_util/testharness.h" From b914b6b7d200d5cc88240c49ad0ff84054aa1c6b Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 25 Jan 2022 14:46:17 +0800 Subject: [PATCH 03/11] Revert "test: fix some perf context position" This reverts commit b84bd7739eb8786540e8e46f3e54d91aef4dd4e8. 
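Besides moving the perf-context timers back (the write_delay_time and
write_memtable_time guards and the write_pre_and_post_process_time
start/stop pairs in the write path), this drops the instrumented
log_write_mutex_ construction added there and undoes that commit's
formatting-only cleanups.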
Signed-off-by: Little-Wallace --- db/c.cc | 71 +++++++++++++------------- db/column_family.cc | 21 +++----- db/db_impl/db_impl.cc | 12 ++--- db/db_impl/db_impl_compaction_flush.cc | 3 +- db/db_impl/db_impl_open.cc | 3 +- db/db_impl/db_impl_write.cc | 24 ++++++--- db/db_options_test.cc | 13 ++--- db/version_builder_test.cc | 1 - db/version_edit.h | 1 - db/version_set.h | 8 +-- include/rocksdb/utilities/ldb_cmd.h | 1 - monitoring/perf_flag.cc | 4 +- monitoring/perf_flag_imp.h | 3 +- options/options_helper.cc | 3 +- options/options_parser.cc | 6 +-- tools/ldb_cmd.cc | 3 +- tools/ldb_cmd_impl.h | 2 +- tools/ldb_cmd_test.cc | 1 - 18 files changed, 85 insertions(+), 95 deletions(-) diff --git a/db/c.cc b/db/c.cc index 30d624a6a45..51da66b8ba1 100644 --- a/db/c.cc +++ b/db/c.cc @@ -12,11 +12,6 @@ #include "rocksdb/c.h" #include - -#include -#include -#include - #include "port/port.h" #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" @@ -49,60 +44,43 @@ #include "utilities/merge_operators.h" #include "utilities/rate_limiters/write_amp_based_rate_limiter.h" -using rocksdb::BackupableDBOptions; -using rocksdb::BackupEngine; -using rocksdb::BackupID; -using rocksdb::BackupInfo; -using rocksdb::BatchResult; -using rocksdb::BlockBasedTableOptions; -using rocksdb::BottommostLevelCompaction; +#include +#include +#include + using rocksdb::BytewiseComparator; using rocksdb::Cache; -using rocksdb::CheckPerfFlag; -using rocksdb::Checkpoint; using rocksdb::ColumnFamilyDescriptor; using rocksdb::ColumnFamilyHandle; using rocksdb::ColumnFamilyOptions; using rocksdb::CompactionFilter; using rocksdb::CompactionFilterFactory; using rocksdb::CompactionOptionsFIFO; -using rocksdb::CompactRangeOptions; using rocksdb::Comparator; using rocksdb::CompressionType; -using rocksdb::CuckooTableOptions; +using rocksdb::WALRecoveryMode; using rocksdb::DB; using rocksdb::DBOptions; using rocksdb::DbPath; -using rocksdb::DisablePerfFlag; -using rocksdb::EnablePerfFlag; using rocksdb::Env; using rocksdb::EnvOptions; +using rocksdb::InfoLogLevel; using rocksdb::FileLock; using rocksdb::FilterPolicy; using rocksdb::FlushOptions; -using rocksdb::InfoLogLevel; using rocksdb::IngestExternalFileOptions; using rocksdb::Iterator; -using rocksdb::LiveFileMetaData; using rocksdb::Logger; -using rocksdb::MemoryUtil; using rocksdb::MergeOperator; using rocksdb::MergeOperators; using rocksdb::NewBloomFilterPolicy; -using rocksdb::NewGenericRateLimiter; using rocksdb::NewLRUCache; -using rocksdb::NewWriteAmpBasedRateLimiter; -using rocksdb::OptimisticTransactionDB; -using rocksdb::OptimisticTransactionOptions; using rocksdb::Options; -using rocksdb::PerfContext; -using rocksdb::PerfLevel; -using rocksdb::PinnableSlice; +using rocksdb::BlockBasedTableOptions; +using rocksdb::CuckooTableOptions; using rocksdb::RandomAccessFile; using rocksdb::Range; -using rocksdb::RateLimiter; using rocksdb::ReadOptions; -using rocksdb::RestoreOptions; using rocksdb::SequentialFile; using rocksdb::Slice; using rocksdb::SliceParts; @@ -110,16 +88,37 @@ using rocksdb::SliceTransform; using rocksdb::Snapshot; using rocksdb::SstFileWriter; using rocksdb::Status; -using rocksdb::Transaction; -using rocksdb::TransactionDB; -using rocksdb::TransactionDBOptions; -using rocksdb::TransactionLogIterator; -using rocksdb::TransactionOptions; -using rocksdb::WALRecoveryMode; using rocksdb::WritableFile; using rocksdb::WriteBatch; using rocksdb::WriteBatchWithIndex; using rocksdb::WriteOptions; +using rocksdb::LiveFileMetaData; +using rocksdb::BackupEngine; 
+using rocksdb::BackupableDBOptions; +using rocksdb::BackupInfo; +using rocksdb::BackupID; +using rocksdb::RestoreOptions; +using rocksdb::CompactRangeOptions; +using rocksdb::BottommostLevelCompaction; +using rocksdb::RateLimiter; +using rocksdb::NewGenericRateLimiter; +using rocksdb::NewWriteAmpBasedRateLimiter; +using rocksdb::PinnableSlice; +using rocksdb::TransactionDBOptions; +using rocksdb::TransactionDB; +using rocksdb::TransactionOptions; +using rocksdb::OptimisticTransactionDB; +using rocksdb::OptimisticTransactionOptions; +using rocksdb::Transaction; +using rocksdb::Checkpoint; +using rocksdb::TransactionLogIterator; +using rocksdb::BatchResult; +using rocksdb::PerfLevel; +using rocksdb::EnablePerfFlag; +using rocksdb::DisablePerfFlag; +using rocksdb::CheckPerfFlag; +using rocksdb::PerfContext; +using rocksdb::MemoryUtil; using std::shared_ptr; using std::vector; diff --git a/db/column_family.cc b/db/column_family.cc index 0da7524a841..2a5493e43cc 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -738,8 +738,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( bool needed_delay = write_controller->NeedsDelay(); if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kMemtableLimit && - !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kMemtableLimit && !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); ROCKS_LOG_WARN( @@ -749,8 +748,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); } else if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kL0FileCountLimit && - !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kL0FileCountLimit && !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { @@ -761,8 +759,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), vstorage->l0_delay_trigger_count()); } else if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kPendingCompactionBytes && - !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kPendingCompactionBytes && !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1); @@ -772,8 +769,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( "bytes %" PRIu64, name_.c_str(), compaction_needed_bytes); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kMemtableLimit && - !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kMemtableLimit && !mutable_cf_options.disable_write_stall) { write_controller_token_ = SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped, @@ -788,8 +784,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( mutable_cf_options.max_write_buffer_number, 
write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kL0FileCountLimit && - !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kL0FileCountLimit && !mutable_cf_options.disable_write_stall) { // L0 is the last two files from stopping. bool near_stop = vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger - 2; @@ -809,8 +804,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), vstorage->l0_delay_trigger_count(), write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kPendingCompactionBytes && - !mutable_cf_options.disable_write_stall) { + write_stall_cause == WriteStallCause::kPendingCompactionBytes && !mutable_cf_options.disable_write_stall) { // If the distance to hard limit is less than 1/4 of the gap between soft // and // hard bytes limit, we think it is near stop and speed up the slowdown. @@ -835,8 +829,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), vstorage->estimated_compaction_needed_bytes(), write_controller->delayed_write_rate()); } else { - assert(write_stall_condition == WriteStallCondition::kNormal || - mutable_cf_options.disable_write_stall); + assert(write_stall_condition == WriteStallCondition::kNormal || mutable_cf_options.disable_write_stall); if (vstorage->l0_delay_trigger_count() >= GetL0ThresholdSpeedupCompaction( mutable_cf_options.level0_file_num_compaction_trigger, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f5c569900df..9cbaba2966c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -163,7 +163,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, seq_per_batch_(seq_per_batch), batch_per_txn_(batch_per_txn), db_lock_(nullptr), - log_write_mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, false), shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), @@ -1020,12 +1019,11 @@ Status DBImpl::SetDBOptions( mutable_db_options_.max_background_jobs, mutable_db_options_.base_background_compactions, /* parallelize_compactions */ true); - const BGJobLimits new_bg_job_limits = - GetBGJobLimits(new_options.max_background_flushes, - new_options.max_background_compactions, - new_options.max_background_jobs, - new_options.base_background_compactions, - /* parallelize_compactions */ true); + const BGJobLimits new_bg_job_limits = GetBGJobLimits( + new_options.max_background_flushes, + new_options.max_background_compactions, + new_options.max_background_jobs, + new_options.base_background_compactions, /* parallelize_compactions */ true); const bool max_flushes_increased = new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b9dcb397de7..dfe681a97ab 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1988,8 +1988,7 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, } if (!parallelize_compactions) { // throttle background compactions until we deem necessary - res.max_compactions = - std::max(1, std::min(base_background_compactions, res.max_compactions)); + res.max_compactions = std::max(1, std::min(base_background_compactions, res.max_compactions)); } return res; } diff --git a/db/db_impl/db_impl_open.cc 
b/db/db_impl/db_impl_open.cc index 630368ff82f..b1623f7eee2 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -57,8 +57,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { } auto bg_job_limits = DBImpl::GetBGJobLimits( result.max_background_flushes, result.max_background_compactions, - result.max_background_jobs, result.base_background_compactions, - true /* parallelize_compactions */); + result.max_background_jobs, result.base_background_compactions, true /* parallelize_compactions */); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index d3c6c6ca347..a703820d80c 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -88,15 +88,15 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, WriteContext write_context; bool ignore_missing_faimly = write_options.ignore_missing_column_families; if (writer.state == WriteThread::STATE_GROUP_LEADER) { - PERF_TIMER_STOP(write_pre_and_post_process_time); - PERF_TIMER_GUARD(write_delay_time); if (writer.callback && !writer.callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } WriteThread::WriteGroup wal_write_group; LogContext log_context; + PERF_TIMER_STOP(write_pre_and_post_process_time); writer.status = PreprocessWrite(write_options, &log_context, &write_context); + PERF_TIMER_START(write_pre_and_post_process_time); // This can set non-OK status if callback fail. last_batch_group_size_ = @@ -132,6 +132,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); @@ -162,6 +163,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, bool is_leader_thread = false; WriteThread::WriteGroup memtable_write_group; if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { + PERF_TIMER_GUARD(write_memtable_time); assert(writer.ShouldWriteToMemtable()); write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group); assert(immutable_db_options_.allow_concurrent_memtable_write); @@ -169,7 +171,6 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, is_leader_thread = true; write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); } else { - PERF_TIMER_GUARD(write_memtable_time); auto version_set = versions_->GetColumnFamilySet(); memtable_write_group.running.store(0); for (auto it = memtable_write_group.begin(); @@ -193,7 +194,6 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, } if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { assert(writer.ShouldWriteToMemtable()); - PERF_TIMER_GUARD(write_memtable_time); auto version_set = versions_->GetColumnFamilySet(); WriteBatchInternal::AsyncInsertInto( &writer, writer.sequence, version_set, &flush_scheduler_, @@ -640,8 +640,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, LogContext log_context(!write_options.disableWAL && write_options.sync); // PreprocessWrite does its own perf timing. 
PERF_TIMER_STOP(write_pre_and_post_process_time); - PERF_TIMER_GUARD(write_delay_time); w.status = PreprocessWrite(write_options, &log_context, &write_context); + PERF_TIMER_START(write_pre_and_post_process_time); // This can set non-OK status if callback fail. last_batch_group_size_ = @@ -678,6 +678,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); + if (w.status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); @@ -750,7 +752,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, WriteCallback* callback, uint64_t log_ref, SequenceNumber seq, const size_t sub_batch_cnt) { - PERF_TIMER_GUARD(write_memtable_time); + PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); WriteThread::Writer w(write_options, my_batch, callback, log_ref, @@ -822,8 +824,6 @@ Status DBImpl::WriteImplWALOnly( // else we are the leader of the write batch group assert(w.state == WriteThread::STATE_GROUP_LEADER); - PERF_TIMER_STOP(write_pre_and_post_process_time); - PERF_TIMER_GUARD(write_delay_time); if (publish_last_seq == kDoPublishLastSeq) { // Currently we only use kDoPublishLastSeq in unordered_write assert(immutable_db_options_.unordered_write); @@ -884,6 +884,8 @@ Status DBImpl::WriteImplWALOnly( } RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_wal_time); // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL @@ -932,6 +934,7 @@ Status DBImpl::WriteImplWALOnly( status = SyncWAL(); } } + PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { WriteStatusCheck(status); @@ -1033,15 +1036,19 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } PERF_TIMER_STOP(write_scheduling_flushes_compactions_time); + PERF_TIMER_GUARD(write_pre_and_post_process_time); if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || write_controller_.NeedsDelay()))) { + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size // for previous one. It might create a fairness issue that expiration // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. InstrumentedMutexLock l(&mutex_); status = DelayWrite(last_batch_group_size_, write_options); + PERF_TIMER_START(write_pre_and_post_process_time); } InstrumentedMutexLock l(&log_write_mutex_); @@ -1627,6 +1634,7 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, // is that in case the write is heavy, low pri writes may never have // a chance to run. Now we guarantee we are still slowly making // progress. 
+ PERF_TIMER_GUARD(write_delay_time); write_controller_.low_pri_rate_limiter()->Request( my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kWrite); diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 46cdf7ac73d..1a637f934b9 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -406,10 +406,10 @@ TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { dbfull()->TEST_WaitForFlushMemTable(); ASSERT_EQ(2, NumTableFilesAtLevel(0)); uint64_t l0_size = SizeAtLevel(0); - + options.hard_pending_compaction_bytes_limit = l0_size; options.soft_pending_compaction_bytes_limit = l0_size; - + Reopen(options); dbfull()->TEST_WaitForCompact(); ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); @@ -418,14 +418,15 @@ TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { SyncPoint::GetInstance()->LoadDependency( {{"DBOptionsTest::EnableAutoCompactionButDisableStall:1", "BackgroundCallCompaction:0"}, - {"DBImpl::BackgroundCompaction():BeforePickCompaction", + {"DBImpl::BackgroundCompaction():BeforePickCompaction", "DBOptionsTest::EnableAutoCompactionButDisableStall:2"}, - {"DBOptionsTest::EnableAutoCompactionButDisableStall:3", + {"DBOptionsTest::EnableAutoCompactionButDisableStall:3", "DBImpl::BackgroundCompaction():AfterPickCompaction"}}); // Block background compaction. SyncPoint::GetInstance()->EnableProcessing(); - ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + ASSERT_OK( + dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1"); // Wait for stall condition recalculate. TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2"); @@ -433,7 +434,7 @@ TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction()); - + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3"); // Background compaction executed. 
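A note on the PERF_TIMER changes in db_impl_write.cc above: PERF_TIMER_GUARD opens a scoped timer that stops when the scope ends, while the PERF_TIMER_STOP / PERF_TIMER_START pair pauses one metric so a sub-phase (the WAL append, the write delay) is charged to its own bucket instead of to write_pre_and_post_process_time. Below is a minimal, self-contained sketch of that pause/resume pattern using plain std::chrono stand-ins; only the two metric names come from the patch, everything else (Metric, PerfTimer, the main body) is illustrative and not RocksDB API.

#include <chrono>
#include <cstdio>

using Clock = std::chrono::steady_clock;

struct Metric {
  const char* name;
  Clock::duration total{};
};

// Pausable scoped timer: time is charged to the metric only while running.
struct PerfTimer {
  explicit PerfTimer(Metric* m) : metric(m) { Start(); }
  ~PerfTimer() { Stop(); }  // GUARD-like: stops at end of scope
  void Start() {
    begin = Clock::now();
    running = true;
  }
  void Stop() {
    if (running) {
      metric->total += Clock::now() - begin;
      running = false;
    }
  }
  Metric* metric;
  Clock::time_point begin{};
  bool running = false;
};

Metric pre_and_post{"write_pre_and_post_process_time"};
Metric wal{"write_wal_time"};

int main() {
  {
    PerfTimer outer(&pre_and_post);  // like PERF_TIMER_GUARD
    // ... batch grouping and stats bookkeeping charged to pre_and_post ...
    outer.Stop();                    // like PERF_TIMER_STOP
    {
      PerfTimer wal_timer(&wal);     // like PERF_TIMER_GUARD(write_wal_time)
      // ... WAL append (and possibly sync) charged to wal ...
    }
    outer.Start();                   // like PERF_TIMER_START
    // ... post-processing charged to pre_and_post again ...
  }
  std::printf("%s and %s are separate buckets\n", pre_and_post.name, wal.name);
  return 0;
}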
diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index d2df7a48ec1..2f049ef2a64 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -5,7 +5,6 @@ #include #include - #include "db/version_edit.h" #include "db/version_set.h" #include "logging/logging.h" diff --git a/db/version_edit.h b/db/version_edit.h index 9860a04233e..13f3e8b9e98 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -13,7 +13,6 @@ #include #include #include - #include "db/dbformat.h" #include "memory/arena.h" #include "rocksdb/cache.h" diff --git a/db/version_set.h b/db/version_set.h index f3c3dc510b2..583080291a7 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -927,10 +927,10 @@ class VersionSet { const EnvOptions& env_options, int new_levels); - // If sst_file_number is > 0, only prints manifest info for specified SST file - // number - Status DumpManifest(Options& options, std::string& dscname, bool verbose, - bool hex, bool json, uint64_t sst_file_number); + // If sst_file_number is > 0, only prints manifest info for specified SST file number +Status DumpManifest(Options& options, std::string& dscname, + bool verbose, bool hex, bool json, + uint64_t sst_file_number); #endif // ROCKSDB_LITE diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index 76184d6c48c..c43ef6fb212 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -9,7 +9,6 @@ #include #include - #include #include #include diff --git a/monitoring/perf_flag.cc b/monitoring/perf_flag.cc index 534c36e3261..a3bdbda353e 100644 --- a/monitoring/perf_flag.cc +++ b/monitoring/perf_flag.cc @@ -22,8 +22,8 @@ void DisablePerfFlag(uint64_t flag) { } bool CheckPerfFlag(uint64_t flag) { - return ((uint64_t)GET_FLAG(flag) & (uint64_t)0b1 - << (flag & (uint64_t)0b111)) != 0; + return ((uint64_t)GET_FLAG(flag) & + (uint64_t)0b1 << (flag & (uint64_t)0b111)) != 0; } } // namespace rocksdb diff --git a/monitoring/perf_flag_imp.h b/monitoring/perf_flag_imp.h index ebc0b9430bf..453c5e03db8 100644 --- a/monitoring/perf_flag_imp.h +++ b/monitoring/perf_flag_imp.h @@ -1,5 +1,4 @@ #include - #include "rocksdb/perf_flag.h" namespace rocksdb { @@ -8,4 +7,4 @@ extern __thread uint8_t perf_flags[FLAGS_LEN]; #else extern uint8_t perf_flags[FLAGS_LEN]; #endif -} // namespace rocksdb +} diff --git a/options/options_helper.cc b/options/options_helper.cc index 0bebea9da95..a1a998c38ad 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -165,7 +165,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions( // Compaction related options cf_opts.disable_auto_compactions = mutable_cf_options.disable_auto_compactions; - cf_opts.disable_write_stall = mutable_cf_options.disable_write_stall; + cf_opts.disable_write_stall = + mutable_cf_options.disable_write_stall; cf_opts.soft_pending_compaction_bytes_limit = mutable_cf_options.soft_pending_compaction_bytes_limit; cf_opts.hard_pending_compaction_bytes_limit = diff --git a/options/options_parser.cc b/options/options_parser.cc index 13d2eb36cf7..4b7f95d1287 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -202,8 +202,7 @@ Status RocksDBOptionsParser::ParseStatement(std::string* name, Status RocksDBOptionsParser::Parse(const std::string& file_name, Env* env, bool ignore_unknown_options) { - ConfigOptions - config_options; // Use default for escaped(true) and check (exact) + ConfigOptions config_options; // Use default for escaped(true) and check (exact) 
config_options.ignore_unknown_options = ignore_unknown_options; config_options.env = env; return Parse(config_options, file_name); @@ -216,8 +215,7 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, auto ignore_unknown_options = config_options.ignore_unknown_options; std::unique_ptr seq_file; - Status s = - config_options.env->NewSequentialFile(file_name, &seq_file, EnvOptions()); + Status s = config_options.env->NewSequentialFile(file_name, &seq_file, EnvOptions()); if (!s.ok()) { return s; } diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 396c569a323..1c91b8382d5 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1018,8 +1018,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr); - Status s = - versions.DumpManifest(options, file, verbose, hex, json, sst_file_number); + Status s = versions.DumpManifest(options, file, verbose, hex, json, sst_file_number); if (!s.ok()) { printf("Error in processing file %s %s\n", file.c_str(), s.ToString().c_str()); diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index 35c43a23711..fe9fc47a277 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -631,4 +631,4 @@ class UnsafeRemoveSstFileCommand : public LDBCommand { uint64_t sst_file_number_; }; -} // namespace rocksdb +} // namespace ROCKSDB_NAMESPACE diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index 5278b61b265..aea02105852 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -6,7 +6,6 @@ #ifndef ROCKSDB_LITE #include "rocksdb/utilities/ldb_cmd.h" - #include "test_util/sync_point.h" #include "test_util/testharness.h" From 914ca62332ec49c83f22b36902160684f1a395dc Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 27 Jan 2022 14:59:54 +0800 Subject: [PATCH 04/11] fix alive_log_files_ Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 11 ++++++----- db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_compaction_flush.cc | 3 ++- db/db_impl/db_impl_debug.cc | 4 ++-- db/db_impl/db_impl_open.cc | 20 ++++++++++++-------- 5 files changed, 23 insertions(+), 17 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 9cbaba2966c..34ab02ea7e4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1019,11 +1019,12 @@ Status DBImpl::SetDBOptions( mutable_db_options_.max_background_jobs, mutable_db_options_.base_background_compactions, /* parallelize_compactions */ true); - const BGJobLimits new_bg_job_limits = GetBGJobLimits( - new_options.max_background_flushes, - new_options.max_background_compactions, - new_options.max_background_jobs, - new_options.base_background_compactions, /* parallelize_compactions */ true); + const BGJobLimits new_bg_job_limits = + GetBGJobLimits(new_options.max_background_flushes, + new_options.max_background_compactions, + new_options.max_background_jobs, + new_options.base_background_compactions, + /* parallelize_compactions */ true); const bool max_flushes_increased = new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index b26e1c0d3fc..21abe4ca2fc 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1693,7 +1693,7 @@ class DBImpl : public DB { std::atomic total_log_size_; // If this is non-empty, we need to delete these log files in background - // threads. Protected by db mutex. 
+ // threads. Protected by log_write_mutex_. autovector logs_to_free_; bool is_snapshot_supported_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index dfe681a97ab..b9dcb397de7 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1988,7 +1988,8 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, } if (!parallelize_compactions) { // throttle background compactions until we deem necessary - res.max_compactions = std::max(1, std::min(base_background_compactions, res.max_compactions)); + res.max_compactions = + std::max(1, std::min(base_background_compactions, res.max_compactions)); } return res; } diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 566c175735a..51371b95f30 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -187,12 +187,12 @@ void DBImpl::TEST_EndWrite(void* w) { } size_t DBImpl::TEST_LogsToFreeSize() { - InstrumentedMutexLock l(&mutex_); + InstrumentedMutexLock l(&log_write_mutex_); return logs_to_free_.size(); } uint64_t DBImpl::TEST_LogfileNumber() { - InstrumentedMutexLock l(&mutex_); + InstrumentedMutexLock l(&log_write_mutex_); return logfile_number_; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index b1623f7eee2..307e66d2438 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -57,7 +57,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { } auto bg_job_limits = DBImpl::GetBGJobLimits( result.max_background_flushes, result.max_background_compactions, - result.max_background_jobs, result.base_background_compactions, true /* parallelize_compactions */); + result.max_background_jobs, result.base_background_compactions, + true /* parallelize_compactions */); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes, @@ -1066,7 +1067,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& log_numbers) { break; } total_log_size_ += log.size; - alive_log_files_.push_back(log); + { + if (two_write_queues_) { + alive_log_files_.push_back(log); + } else { + InstrumentedMutexLock l(&log_write_mutex_); + alive_log_files_.push_back(log); + } + } // We preallocate space for logs, but then after a crash and restart, those // preallocated space are not needed anymore. It is likely only the last // log has such preallocated space, so we only truncate for the last log.
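The RestoreAliveLogFiles hunk above takes log_write_mutex_ only on the single-write-queue path; the two_write_queues_ path pushes without the lock. A hedged sketch of that conditional-locking shape follows, with std::mutex and uint64_t stand-ins for the DBImpl members; the actual safety argument (why the two_write_queues_ recovery path may skip the lock) lives in the surrounding RocksDB code and is assumed here.

#include <cstdint>
#include <deque>
#include <mutex>

// Illustrative stand-ins for the DBImpl members used in the hunk above.
std::mutex log_write_mutex_;
std::deque<uint64_t> alive_log_files_;
bool two_write_queues_ = false;

void PushAliveLog(uint64_t log_number) {
  // Same shape as the patched code: acquire log_write_mutex_ only on the
  // single-queue path, assuming the other path is already serialized.
  std::unique_lock<std::mutex> guard(log_write_mutex_, std::defer_lock);
  if (!two_write_queues_) {
    guard.lock();
  }
  alive_log_files_.push_back(log_number);
}  // guard releases the mutex here if it was taken

int main() {
  PushAliveLog(42);
  return static_cast<int>(alive_log_files_.size()) - 1;  // 0 on success
}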
@@ -1380,14 +1388,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); } sv_context.Clean(); - if (impl->two_write_queues_) { - impl->log_write_mutex_.Lock(); - } + impl->log_write_mutex_.Lock(); impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); - if (impl->two_write_queues_) { - impl->log_write_mutex_.Unlock(); - } + impl->log_write_mutex_.Unlock(); impl->DeleteObsoleteFiles(); s = impl->directories_.GetDbDir()->Fsync(); } From 1f0b313f07d34df07ef2d34c04796e6d9fec11f5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 27 Jan 2022 15:38:46 +0800 Subject: [PATCH 05/11] fix comment Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 21abe4ca2fc..12ee1db1906 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1667,11 +1667,10 @@ class DBImpl : public DB { std::deque alive_log_files_; // Log files that aren't fully synced, and the current log file. // Synchronization: - // - push_back() is done from write_thread_ with locked mutex_ and - // log_write_mutex_ + // - push_back() is done from write_thread_ with locked log_write_mutex_ // - pop_front() is done from any thread with locked mutex_ and // log_write_mutex_ - // - reads are done with either locked mutex_ or log_write_mutex_ + // - reads are done with locked log_write_mutex_ // - back() and items with getting_synced=true are not popped, // - The same thread that sets getting_synced=true will reset it. // - it follows that the object referred by back() can be safely read from From d17b969c6d02e547f7faccf88464ac860ea57ce8 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Feb 2022 22:17:02 +0800 Subject: [PATCH 06/11] do not hold mutex when fsync Signed-off-by: Little-Wallace --- db/db_impl/db_impl_compaction_flush.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b9dcb397de7..99d6f2f8b52 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -98,6 +98,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status s; if (!logs_to_sync.empty()) { log_write_mutex_.Unlock(); + mutex_.Unlock(); for (log::Writer* log : logs_to_sync) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -124,6 +125,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". 
MarkLogsSynced(current_log_number - 1, true, s); + mutex_.Lock(); if (!s.ok()) { error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); From 59ab24b6c9207d3adc4a14e711dbabb94649cac9 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Feb 2022 23:30:15 +0800 Subject: [PATCH 07/11] fix lock Signed-off-by: Little-Wallace --- db/db_impl/db_impl_compaction_flush.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 99d6f2f8b52..8c5bd6a36fd 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -98,7 +98,6 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status s; if (!logs_to_sync.empty()) { log_write_mutex_.Unlock(); - mutex_.Unlock(); for (log::Writer* log : logs_to_sync) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -125,9 +124,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". MarkLogsSynced(current_log_number - 1, true, s); - mutex_.Lock(); if (!s.ok()) { - error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); return s; } @@ -177,7 +174,9 @@ Status DBImpl::FlushMemTableToOutputFile( // flushed SST may contain data from write batches whose updates to // other column families are missing. // SyncClosedLogs() may unlock and re-lock the db_mutex. + mutex_.Unlock(); s = SyncClosedLogs(job_context); + mutex_.Lock(); } else { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip"); } @@ -359,7 +358,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (logfile_number_ > 0) { // TODO (yanqin) investigate whether we should sync the closed logs for // single column family case. + mutex_.Unlock(); s = SyncClosedLogs(job_context); + mutex_.Lock(); } // exec_status stores the execution status of flush_jobs as From 46d7b1965030b13accb850935e5e36c3999dbfee Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 21 Feb 2022 17:46:00 +0800 Subject: [PATCH 08/11] address comment Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 5 ++++- db/db_impl/db_impl_write.cc | 2 +- db/error_handler.cc | 4 ++-- db/error_handler.h | 8 +++++--- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 12ee1db1906..37d1cc8bdf5 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -937,6 +937,8 @@ class DBImpl : public DB { // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families std::atomic max_total_in_memory_state_; + + std::atomic max_total_wal_size_; // If true, we have only one (default) column family. 
We use this to optimize // some code-paths std::atomic single_column_family_mode_; @@ -1131,6 +1133,7 @@ class DBImpl : public DB { } } }; + struct LogContext { explicit LogContext(bool need_sync = false) : need_log_sync(need_sync), need_log_dir_sync(need_sync) {} @@ -1138,6 +1141,7 @@ class DBImpl : public DB { bool need_log_dir_sync; log::Writer* writer; }; + struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) : number(_number) {} void AddSize(uint64_t new_size) { size += new_size; } @@ -1939,7 +1943,6 @@ class DBImpl : public DB { InstrumentedCondVar atomic_flush_install_cv_; bool wal_in_db_path_; - std::atomic max_total_wal_size_; }; extern Options SanitizeOptions(const std::string& db, const Options& src); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a703820d80c..d72c732d5f8 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1532,7 +1532,7 @@ uint64_t DBImpl::GetMaxTotalWalSize() const { if (max_total_wal_size > 0) { return max_total_wal_size; } - return 4 * max_total_in_memory_state_; + return 4 * max_total_in_memory_state_.load(std::memory_order_acquire); } // REQUIRES: mutex_ is held diff --git a/db/error_handler.cc b/db/error_handler.cc index 8a275b6a155..208b50c34a8 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -221,7 +221,7 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas if (!s.ok() && (s.severity() > bg_error_.severity())) { bg_error_ = s; if (bg_error_.severity() >= Status::Severity::kHardError) { - stop_state_.store(true, std::memory_order_release); + db_stopped_.store(true, std::memory_order_release); } } else { // This error is less severe than previously encountered error. Don't @@ -298,7 +298,7 @@ Status ErrorHandler::ClearBGError() { if (recovery_error_.ok()) { Status old_bg_error = bg_error_; bg_error_ = Status::OK(); - stop_state_.store(false, std::memory_order_release); + db_stopped_.store(false, std::memory_order_release); recovery_in_prog_ = false; EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, old_bg_error, db_mutex_); diff --git a/db/error_handler.h b/db/error_handler.h index 95f72659f4d..5fa6ffab822 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -24,7 +24,7 @@ class ErrorHandler { db_mutex_(db_mutex), auto_recovery_(false), recovery_in_prog_(false), - stop_state_(false) {} + db_stopped_(false) {} ~ErrorHandler() {} void EnableAutoRecovery() { auto_recovery_ = true; } @@ -37,8 +37,10 @@ class ErrorHandler { Status ClearBGError(); - bool IsDBStopped() { return stop_state_.load(std::memory_order_acquire); } + // Does not require the DB mutex to be held. + bool IsDBStopped() { return db_stopped_.load(std::memory_order_acquire); } + // Requires the DB mutex to be held.
bool IsBGWorkStopped() { return !bg_error_.ok() && (bg_error_.severity() >= Status::Severity::kHardError || !auto_recovery_); } @@ -61,7 +63,7 @@ class ErrorHandler { // A flag indicating whether automatic recovery from errors is enabled bool auto_recovery_; bool recovery_in_prog_; - std::atomic stop_state_; + std::atomic db_stopped_; Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery); void RecoverFromNoSpace(); From 55409a296476fd74e20d7306513f881f5ed47049 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 23 Feb 2022 13:11:20 +0800 Subject: [PATCH 09/11] addressed Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 2 +- db/db_impl/db_impl_compaction_flush.cc | 5 ++++- db/db_impl/db_impl_write.cc | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 34ab02ea7e4..92918d88ae7 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2126,6 +2126,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, write_thread_.ExitUnbatched(&w); } if (s.ok()) { + single_column_family_mode_.store(false, std::memory_order_release); auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); @@ -2148,7 +2149,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, ROCKS_LOG_INFO(immutable_db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); - single_column_family_mode_.store(false, std::memory_order_release); } else { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Creating column family [%s] FAILED -- %s", diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8c5bd6a36fd..81253997326 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -173,7 +173,10 @@ Status DBImpl::FlushMemTableToOutputFile( // the host crashes after flushing and before WAL is persistent, the // flushed SST may contain data from write batches whose updates to // other column families are missing. - // SyncClosedLogs() may unlock and re-lock the db_mutex. + // We must release mutex_ before calling `SyncClosedLogs` because it + // may block waiting for another thread to finish synchronizing a log + // file. + // SyncClosedLogs() may unlock and re-lock log_write_mutex_.
mutex_.Unlock(); s = SyncClosedLogs(job_context); mutex_.Lock(); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index d72c732d5f8..f9c583d147c 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1191,7 +1191,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, if (status.ok() && need_log_sync) { StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); - // It's safe to access logs_ with unlocked mutex_ here because: + // It's safe to access logs_ with unlocked log_write_mutex_ here because: // - we've set getting_synced=true for all logs, // so other threads won't pop from logs_ while we're here, // - only writer thread can push to logs_, and we're in From 5a171786185930f9c35df98b0281a4d946238899 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 24 Feb 2022 13:53:06 +0800 Subject: [PATCH 10/11] release mutex before finding obsolete log files Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 2 ++ db/db_impl/db_impl_files.cc | 21 +++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 37d1cc8bdf5..80733af7708 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -553,6 +553,8 @@ class DBImpl : public DB { void FindObsoleteFiles(JobContext* job_context, bool force, bool no_full_scan = false); + void FindObsoleteLogFiles(JobContext* job_context); + // Diffs the files listed in filenames and those that do not // belong to live files are possibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 15494c248f1..60c59ed28c3 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -177,9 +177,20 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } } } + mutex_.Unlock(); + FindObsoleteLogFiles(job_context); + mutex_.Lock(); + if (job_context->HaveSomethingToDelete()) { + ++pending_purge_obsolete_files_; + if (doing_the_full_scan) { + versions_->AddLiveFiles(&job_context->sst_live); + } + } +} - // logs_ is empty when called during recovery, in which case there can't yet - // be any tracked obsolete logs +void DBImpl::FindObsoleteLogFiles(JobContext* job_context) { + // logs_ is empty when called during recovery, in which case there can't + // yet be any tracked obsolete logs InstrumentedMutexLock l(&log_write_mutex_); if (!alive_log_files_.empty() && !logs_.empty()) { uint64_t min_log_number = job_context->log_number; @@ -227,12 +238,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->logs_to_free = logs_to_free_; job_context->log_recycle_files.assign(log_recycle_files_.begin(), log_recycle_files_.end()); - if (job_context->HaveSomethingToDelete()) { - ++pending_purge_obsolete_files_; - if (doing_the_full_scan) { - versions_->AddLiveFiles(&job_context->sst_live); - } - } logs_to_free_.clear(); } From bc172cbd7983e7dba13f431c37cafbdd079177f5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sat, 26 Feb 2022 15:24:45 +0800 Subject: [PATCH 11/11] address comment Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 80733af7708..f1483e37449 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1619,12 +1619,11 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_; - // In addition to mutex_, log_write_mutex_ protected writes to stats_history_ + // In addition to mutex_, stats_history_mutex_ protects writes to stats_history_ InstrumentedMutex stats_history_mutex_; - // In addition to mutex_, log_write_mutex_ protected writes to logs_ and - // logfile_number_. With two_write_queues it also protects alive_log_files_, - // and log_empty_. Refer to the definition of each variable below for more - // details. + // In addition to mutex_, log_write_mutex_ protects access to logs_, + // logfile_number_, alive_log_files_, and log_empty_. + // Refer to the definition of each variable below for more details. // Note: to avoid deadlock, if needed to acquire both log_write_mutex_ and // mutex_, the order should be first mutex_ and then log_write_mutex_. InstrumentedMutex log_write_mutex_;
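The ordering rule in the final comment is the standard deadlock-avoidance discipline: every thread that needs both mutexes must take them in one global order. A minimal illustration with std::mutex stand-ins for the two InstrumentedMutex members; the helper function and main are illustrative only, not RocksDB code.

#include <mutex>

std::mutex mutex_;            // DB-wide mutex: always acquired first
std::mutex log_write_mutex_;  // log-state mutex: always acquired second

void TouchLogStateUnderBothLocks() {
  // Follows the documented order: mutex_ first, then log_write_mutex_.
  std::lock_guard<std::mutex> db_guard(mutex_);
  std::lock_guard<std::mutex> log_guard(log_write_mutex_);
  // ... read or modify logs_, logfile_number_, alive_log_files_ ...
}

// Taking the locks in the opposite order in another thread can deadlock:
// thread A holds mutex_ and waits for log_write_mutex_ while thread B holds
// log_write_mutex_ and waits for mutex_. std::scoped_lock(mutex_,
// log_write_mutex_) would also acquire both without risk of deadlock.

int main() {
  TouchLogStateUnderBothLocks();
  return 0;
}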