Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: pass SinkReplyBuilder and Transaction explicitly. Part7 #3988

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions src/server/bloom_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,99 +91,99 @@ OpResult<ExistsResult> OpExists(const OpArgs& op_args, string_view key, CmdArgLi

} // namespace

void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser(args);
string_view key = parser.Next();
SbfParams params;

tie(params.error, params.init_capacity) = parser.Next<double, uint32_t>();

if (parser.Error())
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);

if (!params.ok())
return cntx->SendError("error rate is out of range", kSyntaxErrType);
return builder->SendError("error rate is out of range", kSyntaxErrType);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpReserve(params, t->GetOpArgs(shard), key);
};

OpStatus res = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus res = tx->ScheduleSingleHop(std::move(cb));
if (res == OpStatus::KEY_EXISTS) {
return cntx->SendError("item exists");
return builder->SendError("item exists");
}
return cntx->SendError(res);
return builder->SendError(res);
}

void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
OpStatus status = res.status();
if (res) {
if (res->front())
return cntx->SendLong(*res->front());
return builder->SendLong(*res->front());
else
status = res->front().status();
}

return cntx->SendError(status);
return builder->SendError(status);
}

void BloomFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
return cntx->SendLong(res ? res->front() : 0);
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
return builder->SendLong(res ? res->front() : 0);
}

void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
if (!res) {
return cntx->SendError(res.status());
return builder->SendError(res.status());
}
const AddResult& add_res = *res;
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(add_res.size());
for (const OpResult<bool>& val : add_res) {
if (val) {
cntx->SendLong(*val);
builder->SendLong(*val);
} else {
cntx->SendError(val.status());
builder->SendError(val.status());
}
}
}

void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));

RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(args.size());
for (size_t i = 0; i < args.size(); ++i) {
cntx->SendLong(res ? res->at(i) : 0);
rb->SendLong(res ? res->at(i) : 0);
}
}

Expand Down
16 changes: 11 additions & 5 deletions src/server/bloom_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

#include "server/common.h"

namespace facade {
class SinkReplyBuilder;
} // namespace facade

namespace dfly {

class CommandRegistry;
Expand All @@ -16,11 +20,13 @@ class BloomFamily {
static void Register(CommandRegistry* registry);

private:
static void Reserve(CmdArgList args, ConnectionContext* cntx);
static void Add(CmdArgList args, ConnectionContext* cntx);
static void MAdd(CmdArgList args, ConnectionContext* cntx);
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void MExists(CmdArgList args, ConnectionContext* cntx);
using SinkReplyBuilder = facade::SinkReplyBuilder;

static void Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
};

} // namespace dfly
16 changes: 8 additions & 8 deletions src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ using facade::kWrongTypeErr;

#ifndef GET_OR_SEND_UNEXPECTED

#define GET_OR_SEND_UNEXPECTED(expr) \
({ \
auto expr_res = (expr); \
if (!expr_res) { \
cntx->SendError(expr_res.error()); \
return; \
} \
std::move(expr_res).value(); \
#define GET_OR_SEND_UNEXPECTED(expr) \
({ \
auto expr_res = (expr); \
if (!expr_res) { \
builder->SendError(expr_res.error()); \
return; \
} \
std::move(expr_res).value(); \
})

#endif // GET_OR_SEND_UNEXPECTED
Expand Down
Loading
Loading