Skip to content

Commit

Permalink
feat:zpopmin && zpopmax to cache
Browse files Browse the repository at this point in the history
  • Loading branch information
brother-jin committed Oct 10, 2024
1 parent 8385dd4 commit 06aca57
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 40 deletions.
26 changes: 15 additions & 11 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,35 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status ZCard(std::string& key, uint32_t* len, const std::shared_ptr<DB>& db);
rocksdb::Status ZCount(std::string& key, std::string& min, std::string& max, uint64_t* len, ZCountCmd* cmd);
rocksdb::Status ZIncrby(std::string& key, std::string& member, double increment);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd, const std::shared_ptr<DB>& db);
rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop,
std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebyscore(std::string& key, std::string& min, std::string& max,
std::vector<storage::ScoreMember>* score_members, ZRangebyscoreCmd* cmd);
rocksdb::Status ZRank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRem(std::string& key, std::vector<std::string>& members, std::shared_ptr<DB> db);
rocksdb::Status ZRemrangebyrank(std::string& key, std::string& min, std::string& max, int32_t ele_deleted = 0,
const std::shared_ptr<DB>& db = nullptr);
rocksdb::Status ZRemrangebyscore(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrange(std::string& key, int64_t start, int64_t stop, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrange(std::string& key, int64_t start, int64_t stop,
std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebyscore(std::string& key, std::string& min, std::string& max,
std::vector<storage::ScoreMember>* score_members, ZRevrangebyscoreCmd* cmd,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max,
std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);

// Bit Commands
rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
33 changes: 12 additions & 21 deletions src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,11 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin
return Status::OK();
}

Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
Status RedisCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members) {
zitem *items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
Expand All @@ -429,27 +427,23 @@ Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage:
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
DEFER {
FreeObjectList(members_obj, items_size);
};
DEFER { FreeObjectList(members_obj, items_size); };

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
Status RedisCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members) {
zitem *items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
Expand All @@ -467,21 +461,18 @@ Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage:
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size);
for (unsigned long i = items_size - 1; i >= 0; --i) {
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}

DEFER {
FreeObjectList(members_obj, items_size);
};
DEFER { FreeObjectList(members_obj, items_size); };

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}


} // namespace cache
/* EOF */
6 changes: 4 additions & 2 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,8 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string
}
}

Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

Expand All @@ -1479,7 +1480,8 @@ Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::
}
}

Status PikaCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

Expand Down
12 changes: 6 additions & 6 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,13 +1507,13 @@ void ZPopmaxCmd::Do() {
}
}

void ZPopmaxCmd::DoThroughDB(){
void ZPopmaxCmd::DoThroughDB() {
Do();
}

void ZPopmaxCmd::DoUpdateCache(){
void ZPopmaxCmd::DoUpdateCache() {
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
if (s_.ok() || s_.IsNotFound()) {
db_->cache()->ZPopMax(key_, count_, &score_members, db_);
}
}
Expand All @@ -1534,13 +1534,13 @@ void ZPopminCmd::DoInitial() {
}
}

void ZPopminCmd::DoThroughDB(){
void ZPopminCmd::DoThroughDB() {
Do();
}

void ZPopminCmd::DoUpdateCache(){
void ZPopminCmd::DoUpdateCache() {
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
if (s_.ok() || s_.IsNotFound()) {
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}
Expand Down

0 comments on commit 06aca57

Please sign in to comment.