Skip to content

Commit

Permalink
fix: fix pthread_lock when exiting and add some logs for exit
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <[email protected]>
  • Loading branch information
Tangruilin committed May 27, 2024
1 parent 128d408 commit cb647d3
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 12 deletions.
8 changes: 8 additions & 0 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

#include "db.h"
#include <algorithm>

#include "config.h"
#include "praft/praft.h"
Expand All @@ -18,6 +19,8 @@ namespace pikiwidb {
DB::DB(int db_index, const std::string& db_path)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {}

DB::~DB() { INFO("DB{} is closing...", db_index_); }

rocksdb::Status DB::Open() {
storage::StorageOptions storage_options;
storage_options.options = g_config.GetRocksDBOptions();
Expand All @@ -41,6 +44,11 @@ rocksdb::Status DB::Open() {
storage_options.db_instance_num = g_config.db_instance_num.load();
storage_options.db_id = db_index_;

std::unique_ptr<storage::Storage> old_storage = std::move(storage_);
if (old_storage != nullptr) {
old_storage->Close();
old_storage.reset();
}
storage_ = std::make_unique<storage::Storage>();

if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
Expand Down
1 change: 1 addition & 0 deletions src/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace pikiwidb {
class DB {
public:
DB(int db_index, const std::string& db_path);
~DB();

rocksdb::Status Open();

Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ class Storage {

Status Open(const StorageOptions& storage_options, const std::string& db_path);

Status Close();

std::vector<std::future<Status>> CreateCheckpoint(const std::string& checkpoint_path);

Status CreateCheckpointInternal(const std::string& checkpoint_path, int db_index);
Expand Down
32 changes: 20 additions & 12 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

#include <sstream>

#include "pstd/log.h"
#include "rocksdb/env.h"

#include "src/base_filter.h"
#include "src/lists_filter.h"
#include "src/mutex.h"
#include "src/redis.h"
#include "src/strings_filter.h"
#include "src/zsets_filter.h"
Expand Down Expand Up @@ -44,20 +46,26 @@ Redis::Redis(Storage* const s, int32_t index)
handles_.clear();
}

// TODO(tangruilin): 确认 rocksdb 退出失败的原因, 完善 Background 的处理逻辑
Redis::~Redis() {
rocksdb::CancelAllBackgroundWork(db_, true);
std::vector<rocksdb::ColumnFamilyHandle*> tmp_handles = handles_;
handles_.clear();
for (auto handle : tmp_handles) {
delete handle;
}
// delete env_;
delete db_;

if (default_compact_range_options_.canceled) {
delete default_compact_range_options_.canceled;
INFO("Redis is freeing...", index_);
if (need_close_.load()) {
INFO("DB_{} is closing...", index_);
rocksdb::CancelAllBackgroundWork(db_, true);
std::vector<rocksdb::ColumnFamilyHandle*> tmp_handles = handles_;
handles_.clear();
for (auto& handle : tmp_handles) {
delete handle;
}
// delete env_;
delete db_;
db_ = nullptr;
INFO("Close DB_{} success!", index_);
}
}
delete default_compact_range_options_.canceled;
default_compact_range_options_.canceled = nullptr;
INFO("Reis DB_{} free success!", index_);
};

Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) {
append_log_function_ = storage_options.append_log_function;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class Redis {
// Common Commands
Status Open(const StorageOptions& storage_options, const std::string& db_path);

void SetNeedClose(bool need_close) { need_close_.store(need_close); }

virtual Status CompactRange(const DataType& option_type, const rocksdb::Slice* begin, const rocksdb::Slice* end,
const ColumnFamilyType& type = kMetaAndData);

Expand Down Expand Up @@ -362,6 +364,7 @@ class Redis {

private:
int32_t index_ = 0;
std::atomic<bool> need_close_ = false;
Storage* const storage_;
std::shared_ptr<LockMgr> lock_mgr_;
rocksdb::DB* db_ = nullptr;
Expand Down
13 changes: 13 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ Storage::Storage() {
}

Storage::~Storage() {
INFO("Storage::~Storage() begin to close storage!");
bg_tasks_should_exit_.store(true);
bg_tasks_cond_var_.notify_one();
if (is_opened_.load()) {
INFO("Storage::~Storage() begin to close all instances!");
int ret = 0;
if (ret = pthread_join(bg_tasks_thread_id_, nullptr); ret != 0) {
ERROR("pthread_join failed with bgtask thread error : {}", ret);
Expand All @@ -82,6 +84,17 @@ Storage::~Storage() {
}
}

Status Storage::Close() {
if (!is_opened_.load()) {
return Status::OK();
}
is_opened_.store(false);
for (auto& inst : insts_) {
inst->SetNeedClose(true);
}
return Status::OK();
}

static std::string AppendSubDirectory(const std::string& db_path, int index) {
if (db_path.back() == '/') {
return db_path + std::to_string(index);
Expand Down
2 changes: 2 additions & 0 deletions src/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

namespace pikiwidb {

PStore::~PStore() { INFO("STORE is closing..."); }

PStore& PStore::Instance() {
static PStore store;
return store;
Expand Down
1 change: 1 addition & 0 deletions src/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class PStore {

PStore(const PStore&) = delete;
void operator=(const PStore&) = delete;
~PStore();

void Init(int db_number);

Expand Down

0 comments on commit cb647d3

Please sign in to comment.