Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:zpopmax && zpopmin to do cache #2922

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,29 +149,35 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status ZCard(std::string& key, uint32_t* len, const std::shared_ptr<DB>& db);
rocksdb::Status ZCount(std::string& key, std::string& min, std::string& max, uint64_t* len, ZCountCmd* cmd);
rocksdb::Status ZIncrby(std::string& key, std::string& member, double increment);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd, const std::shared_ptr<DB>& db);
rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop,
std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebyscore(std::string& key, std::string& min, std::string& max,
std::vector<storage::ScoreMember>* score_members, ZRangebyscoreCmd* cmd);
rocksdb::Status ZRank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRem(std::string& key, std::vector<std::string>& members, std::shared_ptr<DB> db);
chejinge marked this conversation as resolved.
Show resolved Hide resolved
rocksdb::Status ZRemrangebyrank(std::string& key, std::string& min, std::string& max, int32_t ele_deleted = 0,
const std::shared_ptr<DB>& db = nullptr);
rocksdb::Status ZRemrangebyscore(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrange(std::string& key, int64_t start, int64_t stop, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrange(std::string& key, int64_t start, int64_t stop,
std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebyscore(std::string& key, std::string& min, std::string& max,
std::vector<storage::ScoreMember>* score_members, ZRevrangebyscoreCmd* cmd,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max,
std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);

// Bit Commands
rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
4 changes: 4 additions & 0 deletions include/pika_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class ZPopmaxCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopmaxCmd(*this); }

private:
Expand All @@ -623,6 +625,8 @@ class ZPopminCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopminCmd(*this); }

private:
Expand Down
2 changes: 2 additions & 0 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class RedisCache {
std::vector<std::string> *members);
Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len);
Status ZRemrangebylex(std::string& key, std::string &min, std::string &max);
Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);
Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);

// Bit Commands
Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
69 changes: 69 additions & 0 deletions src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,5 +405,74 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin
return Status::OK();
}

Status RedisCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members) {
zitem *items = nullptr;
unsigned long items_size = 0;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
chejinge marked this conversation as resolved.
Show resolved Hide resolved
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = 0; i < to_return; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
Comment on lines +430 to +433
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize memory allocation for members_obj.

members_obj is allocated with items_size elements, but only to_return elements are used.

Allocate only the necessary memory:

-robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size);
-for (unsigned long i = 0; i < items_size; ++i) {
+robj **members_obj = (robj **)zcallocate(sizeof(robj *) * to_return);
+for (unsigned long i = 0; i < to_return; ++i) {
   members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
 }

This change reduces memory usage and improves efficiency.

Committable suggestion was skipped due to low confidence.

DEFER { FreeObjectList(members_obj, items_size); };

RcZRem(cache_, kobj, members_obj, to_return);
Mixficsol marked this conversation as resolved.
Show resolved Hide resolved

FreeZitemList(items, items_size);
return Status::OK();
}

Status RedisCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members) {
zitem *items = nullptr;
unsigned long items_size = 0;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
Mixficsol marked this conversation as resolved.
Show resolved Hide resolved
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = items_size - to_return; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj **members_obj = (robj **)zcallocate(sizeof(robj *) * items_size);
for (unsigned long i = items_size - 1; i >= 0; --i) {
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
Mixficsol marked this conversation as resolved.
Show resolved Hide resolved
Mixficsol marked this conversation as resolved.
Show resolved Hide resolved

Mixficsol marked this conversation as resolved.
Show resolved Hide resolved
DEFER { FreeObjectList(members_obj, items_size); };

RcZRem(cache_, kobj, members_obj, to_return);
Mixficsol marked this conversation as resolved.
Show resolved Hide resolved

Mixficsol marked this conversation as resolved.
Show resolved Hide resolved
FreeZitemList(items, items_size);
return Status::OK();
}
Mixficsol marked this conversation as resolved.
Show resolved Hide resolved

} // namespace cache
/* EOF */
30 changes: 30 additions & 0 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,36 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string
}
}

Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}

Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}

/*-----------------------------------------------------------------------------
* Bit Commands
*----------------------------------------------------------------------------*/
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRemrangebylex, std::move(zremrangebylexptr)));
////ZPopmax
std::unique_ptr<Cmd> zpopmaxptr = std::make_unique<ZPopmaxCmd>(
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmax, std::move(zpopmaxptr)));
////ZPopmin
std::unique_ptr<Cmd> zpopminptr = std::make_unique<ZPopminCmd>(
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
Comment on lines +603 to +607
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Inconsistency Found in Zset Commands Flag Usage

While ZPopmaxCmd and ZPopminCmd have been updated with the kCmdFlagsDoThroughDB and kCmdFlagsUpdateCache flags, several other Zset commands are missing these flags. This inconsistency could lead to unexpected behavior or performance issues.

Affected Files:

  • include/pika_zset.h:
    • ZAddCmd
    • ZCardCmd
    • ZIncrbyCmd
    • ZRangeCmd
    • ZRevrangeCmd
    • ZCountCmd
    • ZRemCmd
    • ZUnionstoreCmd
    • ZInterstoreCmd
    • ZRankCmd
    • ZRevrankCmd
    • ZScoreCmd
    • ZRangebylexCmd
    • ZLexcountCmd
    • ZRemrangebyrankCmd
    • ZRemrangebyscoreCmd
    • ZRemrangebylexCmd
    • ZScanCmd
    • ZRangebyscoreCmd
    • ZRevrangebyscoreCmd

Please update these commands to include the kCmdFlagsDoThroughDB and kCmdFlagsUpdateCache flags to ensure consistent behavior across all Zset operations.

🔗 Analysis chain

Summary: Effective implementation of cache functionality for zpopmax and zpopmin

The changes to both ZPopmaxCmd and ZPopminCmd constructors successfully implement cache interaction for these commands. By adding the kCmdFlagsDoThroughDB and kCmdFlagsUpdateCache flags, the modifications ensure that:

  1. Both commands will interact directly with the database, maintaining data consistency.
  2. The cache will be updated after database operations, keeping it in sync.

These changes are consistent across both commands and align well with the PR objective. The implementation should improve performance while maintaining data integrity for sorted set operations.

Consider adding unit tests to verify the new cache interaction behavior for both commands.

To ensure the changes are properly integrated, run the following verification script:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash

# Check if the new flags are used consistently across the codebase
echo "Checking usage of new flags:"
rg "kCmdFlagsDoThroughDB|kCmdFlagsUpdateCache" --type cpp

# Verify if there are any other Zset commands that might need similar updates
echo "Checking other Zset commands:"
rg "std::make_unique<Z\w+Cmd>" --type cpp

Length of output: 19379

cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmin, std::move(zpopminptr)));

// Set
Expand Down
22 changes: 22 additions & 0 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,17 @@ void ZPopmaxCmd::Do() {
}
}

void ZPopmaxCmd::DoThroughDB() {
Do();
}

void ZPopmaxCmd::DoUpdateCache() {
std::vector<storage::ScoreMember> score_members;
if (s_.ok() || s_.IsNotFound()) {
db_->cache()->ZPopMax(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameZPopmin);
Expand All @@ -1523,6 +1534,17 @@ void ZPopminCmd::DoInitial() {
}
}

void ZPopminCmd::DoThroughDB() {
Do();
}

void ZPopminCmd::DoUpdateCache() {
std::vector<storage::ScoreMember> score_members;
if (s_.ok() || s_.IsNotFound()) {
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}
Comment on lines +1541 to +1546
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding error handling, validating count_, and reusing code.

The implementation looks good overall, but there are several points to consider:

  1. The method doesn't handle the case where s_ is not ok and not "NotFound". This might lead to unexpected behavior if there's an error.

  2. The count_ variable is used without checking its value. If it's negative or zero, it might lead to unexpected behavior.

  3. This method is very similar to ZPopmaxCmd::DoUpdateCache, suggesting a potential for code reuse.

Consider the following improvements:

  1. Add error handling and validate count_:
 void ZPopminCmd::DoUpdateCache() {
   std::vector<storage::ScoreMember> score_members;
-  if (s_.ok() || s_.IsNotFound()) {
+  if (s_.ok()) {
+    if (count_ > 0) {
       db_->cache()->ZPopMin(key_, count_, &score_members, db_);
+    }
+  } else if (!s_.IsNotFound()) {
+    // Log error or handle it appropriately
   }
 }
  1. To reduce code duplication, consider creating a template method in a base class that both ZPopmaxCmd and ZPopminCmd can inherit from. This would allow you to reuse the same code for both ZPopMax and ZPopMin operations.

Committable suggestion was skipped due to low confidence.


void ZPopminCmd::Do() {
std::vector<storage::ScoreMember> score_members;
rocksdb::Status s = db_->storage()->ZPopMin(key_, count_, &score_members);
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/zset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,38 @@ var _ = Describe("Zset Commands", func() {
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'zpopmin' command")))
})

It("should Zpopmin test", func() {
err := client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 1,
Member: "m1",
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 3,
Member: "m3",
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 4,
Member: "m4",
}).Err()
Expect(err).NotTo(HaveOccurred())

max, err := client.ZPopMax(ctx, "zpopzset1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(max).To(Equal([]redis.Z{{Score: 4, Member: "m4"}}))

min, err := client.ZPopMin(ctx, "zpopzset1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(min).To(Equal([]redis.Z{{Score: 1, Member: "m1"}}))

rangeResult, err := client.ZRange(ctx, "zpopzset1", 0, -1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(rangeResult).To(Equal([]string{"m3"}))
})

It("should ZRange", func() {
err := client.ZAdd(ctx, "zset", redis.Z{Score: 1, Member: "one"}).Err()
Expect(err).NotTo(HaveOccurred())
Expand Down
Loading