diff --git a/src/db.cc b/src/db.cc index 5f1cbbbca..ac6974b5f 100644 --- a/src/db.cc +++ b/src/db.cc @@ -6,6 +6,7 @@ */ #include "db.h" +#include #include "config.h" #include "praft/praft.h" @@ -18,6 +19,8 @@ namespace pikiwidb { DB::DB(int db_index, const std::string& db_path) : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {} +DB::~DB() { INFO("DB{} is closing...", db_index_); } + rocksdb::Status DB::Open() { storage::StorageOptions storage_options; storage_options.options = g_config.GetRocksDBOptions(); @@ -41,6 +44,11 @@ rocksdb::Status DB::Open() { storage_options.db_instance_num = g_config.db_instance_num.load(); storage_options.db_id = db_index_; + std::unique_ptr old_storage = std::move(storage_); + if (old_storage != nullptr) { + old_storage->Close(); + old_storage.reset(); + } storage_ = std::make_unique(); if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { diff --git a/src/db.h b/src/db.h index da4cd8f47..c7508273e 100644 --- a/src/db.h +++ b/src/db.h @@ -19,6 +19,7 @@ namespace pikiwidb { class DB { public: DB(int db_index, const std::string& db_path); + ~DB(); rocksdb::Status Open(); diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 72ad8ea94..0cfc936d5 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -186,6 +186,8 @@ class Storage { Status Open(const StorageOptions& storage_options, const std::string& db_path); + Status Close(); + std::vector> CreateCheckpoint(const std::string& checkpoint_path); Status CreateCheckpointInternal(const std::string& checkpoint_path, int db_index); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 6417f07bb..165c8ce54 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -5,10 +5,12 @@ #include +#include "pstd/log.h" #include "rocksdb/env.h" #include "src/base_filter.h" #include "src/lists_filter.h" +#include "src/mutex.h" #include "src/redis.h" #include "src/strings_filter.h" #include "src/zsets_filter.h" @@ -44,20 +46,26 @@ Redis::Redis(Storage* const s, int32_t index) handles_.clear(); } +// TODO(tangruilin): 确认 rocksdb 退出失败的原因, 完善 Background 的处理逻辑 Redis::~Redis() { - rocksdb::CancelAllBackgroundWork(db_, true); - std::vector tmp_handles = handles_; - handles_.clear(); - for (auto handle : tmp_handles) { - delete handle; - } - // delete env_; - delete db_; - - if (default_compact_range_options_.canceled) { - delete default_compact_range_options_.canceled; + INFO("Redis is freeing...", index_); + if (need_close_.load()) { + INFO("DB_{} is closing...", index_); + rocksdb::CancelAllBackgroundWork(db_, true); + std::vector tmp_handles = handles_; + handles_.clear(); + for (auto& handle : tmp_handles) { + delete handle; + } + // delete env_; + delete db_; + db_ = nullptr; + INFO("Close DB_{} success!", index_); } -} + delete default_compact_range_options_.canceled; + default_compact_range_options_.canceled = nullptr; + INFO("Reis DB_{} free success!", index_); +}; Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) { append_log_function_ = storage_options.append_log_function; diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 24d16dd18..690d4110d 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -100,6 +100,8 @@ class Redis { // Common Commands Status Open(const StorageOptions& storage_options, const std::string& db_path); + void SetNeedClose(bool need_close) { need_close_.store(need_close); } + virtual Status CompactRange(const DataType& option_type, const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type = kMetaAndData); @@ -362,6 +364,7 @@ class Redis { private: int32_t index_ = 0; + std::atomic need_close_ = false; Storage* const storage_; std::shared_ptr lock_mgr_; rocksdb::DB* db_ = nullptr; diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 776ca09a7..e8e1d2386 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -71,9 +71,11 @@ Storage::Storage() { } Storage::~Storage() { + INFO("Storage::~Storage() begin to close storage!"); bg_tasks_should_exit_.store(true); bg_tasks_cond_var_.notify_one(); if (is_opened_.load()) { + INFO("Storage::~Storage() begin to close all instances!"); int ret = 0; if (ret = pthread_join(bg_tasks_thread_id_, nullptr); ret != 0) { ERROR("pthread_join failed with bgtask thread error : {}", ret); @@ -82,6 +84,17 @@ Storage::~Storage() { } } +Status Storage::Close() { + if (!is_opened_.load()) { + return Status::OK(); + } + is_opened_.store(false); + for (auto& inst : insts_) { + inst->SetNeedClose(true); + } + return Status::OK(); +} + static std::string AppendSubDirectory(const std::string& db_path, int index) { if (db_path.back() == '/') { return db_path + std::to_string(index); diff --git a/src/store.cc b/src/store.cc index 835823284..24a0f2ba1 100644 --- a/src/store.cc +++ b/src/store.cc @@ -17,6 +17,8 @@ namespace pikiwidb { +PStore::~PStore() { INFO("STORE is closing..."); } + PStore& PStore::Instance() { static PStore store; return store; diff --git a/src/store.h b/src/store.h index 8e8590adb..e7daf3d46 100644 --- a/src/store.h +++ b/src/store.h @@ -45,6 +45,7 @@ class PStore { PStore(const PStore&) = delete; void operator=(const PStore&) = delete; + ~PStore(); void Init(int db_number);