Skip to content

Commit

Permalink
Add scan to redis client
Browse files Browse the repository at this point in the history
  • Loading branch information
tmigimatsu committed May 21, 2020
1 parent aefefc1 commit b736d08
Showing 1 changed file with 94 additions and 7 deletions.
101 changes: 94 additions & 7 deletions include/ctrl_utils/redis_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
#ifndef CTRL_UTILS_REDIS_CLIENT_H_
#define CTRL_UTILS_REDIS_CLIENT_H_

#include <exception> // std::exception
#include <future> // std::future, std::promise
#include <functional> // std::function
#include <string> // std::string
#include <sstream> // std::stringstream
#include <tuple> // std::tuple, std::get
#include <utility> // std::pair, std::integer_sequence
#include <exception> // std::exception
#include <functional> // std::function
#include <future> // std::future, std::promise
#include <sstream> // std::stringstream
#include <string> // std::string
#include <tuple> // std::tuple, std::get
#include <unordered_set> // std::unordered_set
#include <utility> // std::pair, std::integer_sequence

#include <cpp_redis/cpp_redis>

Expand Down Expand Up @@ -419,6 +420,13 @@ class RedisClient : public ::cpp_redis::client {
TSub sync_request(const std::string& key_pub, const TPub& value_pub,
const std::string& key_sub);

RedisClient& scan(const std::string& pattern,
std::function<void(std::unordered_set<std::string>&&)>&& callback);

std::future<std::unordered_set<std::string>> scan(const std::string& pattern);

std::unordered_set<std::string> sync_scan(const std::string& pattern);

private:

template<typename T>
Expand All @@ -437,6 +445,10 @@ class RedisClient : public ::cpp_redis::client {
std::vector<std::pair<std::string, std::string>>& key_valstr,
std::index_sequence<Is...>);

RedisClient& scan(size_t cursor, const std::string& pattern,
std::unordered_set<std::string>&& keys,
std::function<void(std::unordered_set<std::string>&&)>&& callback);

std::string host_;
std::size_t port_;

Expand Down Expand Up @@ -795,6 +807,81 @@ TSub RedisClient::sync_request(const std::string& key_pub, const TPub& value_pub
return fut_value.get();
}

inline RedisClient& RedisClient::scan(
size_t cursor,
const std::string& pattern,
std::unordered_set<std::string>&& keys,
std::function<void(std::unordered_set<std::string>&&)>&& callback) {
cpp_redis::client::scan(
cursor, pattern,
[this, pattern, keys = std::move(keys),
callback = std::move(callback)](cpp_redis::reply& reply) mutable {
// Parse reply array
if (!reply.is_array()) {
throw std::runtime_error("RedisClient::scan(): Invalid reply.");
}
const std::vector<cpp_redis::reply> replies = reply.as_array();

// Parse cursor, keys array
if (replies.size() != 2 || !replies[0].is_string() ||
!replies[1].is_array()) {
throw std::runtime_error("RedisClient::scan(): Invalid reply.");
}

// Parse cursor
if (replies.size() < 1 || !replies[0].is_string()) {
throw std::runtime_error("RedisClient::scan(): Invalid reply.");
}
size_t cursor_next = FromString<size_t>(replies[0].as_string());
if (cursor_next == 0) {
callback(std::move(keys));
return;
}

// Parse keys array
if (replies.size() < 2 || !replies[1].is_array()) {
throw std::runtime_error("RedisClient::scan(): Invalid reply.");
}
const std::vector<cpp_redis::reply> keys_batch = replies[1].as_array();

// Parse keys
for (const cpp_redis::reply& key : keys_batch) {
if (!key.is_string()) {
throw std::runtime_error("RedisClient::scan(): Invalid reply.");
}
keys.insert(key.as_string());
}

scan(cursor_next, pattern, std::move(keys), std::move(callback));
commit();
});
return *this;
}

inline RedisClient& RedisClient::scan(
const std::string& pattern,
std::function<void(std::unordered_set<std::string>&&)>&& callback) {
return scan(0, pattern, {}, std::move(callback));
}

inline std::future<std::unordered_set<std::string>> RedisClient::scan(
const std::string& pattern) {
auto promise = std::make_shared<std::promise<std::unordered_set<std::string>>>();
scan(pattern, [promise](std::unordered_set<std::string>&& keys) mutable {
if (!promise) return;
promise->set_value(std::move(keys));
promise.reset();
});
return promise->get_future();
}

inline std::unordered_set<std::string> RedisClient::sync_scan(
const std::string& pattern) {
std::future<std::unordered_set<std::string>> fut_keys = scan(pattern);
commit();
return fut_keys.get();
}

} // namespace ctrl_utils

#endif // CTRL_UTILS_REDIS_CLIENT_H_

0 comments on commit b736d08

Please sign in to comment.