Skip to content

Commit

Permalink
chore: pass SinkReplyBuilder and Transaction explicitly. Part7
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Oct 24, 2024
1 parent 16f59d3 commit f6deff3
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 176 deletions.
46 changes: 23 additions & 23 deletions src/server/bloom_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,99 +91,99 @@ OpResult<ExistsResult> OpExists(const OpArgs& op_args, string_view key, CmdArgLi

} // namespace

void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser(args);
string_view key = parser.Next();
SbfParams params;

tie(params.error, params.init_capacity) = parser.Next<double, uint32_t>();

if (parser.Error())
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);

if (!params.ok())
return cntx->SendError("error rate is out of range", kSyntaxErrType);
return builder->SendError("error rate is out of range", kSyntaxErrType);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpReserve(params, t->GetOpArgs(shard), key);
};

OpStatus res = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus res = tx->ScheduleSingleHop(std::move(cb));
if (res == OpStatus::KEY_EXISTS) {
return cntx->SendError("item exists");
return builder->SendError("item exists");
}
return cntx->SendError(res);
return builder->SendError(res);
}

void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
OpStatus status = res.status();
if (res) {
if (res->front())
return cntx->SendLong(*res->front());
return builder->SendLong(*res->front());
else
status = res->front().status();
}

return cntx->SendError(status);
return builder->SendError(status);
}

void BloomFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
return cntx->SendLong(res ? res->front() : 0);
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
return builder->SendLong(res ? res->front() : 0);
}

void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
if (!res) {
return cntx->SendError(res.status());
return builder->SendError(res.status());
}
const AddResult& add_res = *res;
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(add_res.size());
for (const OpResult<bool>& val : add_res) {
if (val) {
cntx->SendLong(*val);
builder->SendLong(*val);
} else {
cntx->SendError(val.status());
builder->SendError(val.status());
}
}
}

void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));

RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(args.size());
for (size_t i = 0; i < args.size(); ++i) {
cntx->SendLong(res ? res->at(i) : 0);
rb->SendLong(res ? res->at(i) : 0);
}
}

Expand Down
16 changes: 11 additions & 5 deletions src/server/bloom_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

#include "server/common.h"

namespace facade {
class SinkReplyBuilder;
} // namespace facade

namespace dfly {

class CommandRegistry;
Expand All @@ -16,11 +20,13 @@ class BloomFamily {
static void Register(CommandRegistry* registry);

private:
static void Reserve(CmdArgList args, ConnectionContext* cntx);
static void Add(CmdArgList args, ConnectionContext* cntx);
static void MAdd(CmdArgList args, ConnectionContext* cntx);
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void MExists(CmdArgList args, ConnectionContext* cntx);
using SinkReplyBuilder = facade::SinkReplyBuilder;

static void Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
};

} // namespace dfly
16 changes: 8 additions & 8 deletions src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ using facade::kWrongTypeErr;

#ifndef GET_OR_SEND_UNEXPECTED

#define GET_OR_SEND_UNEXPECTED(expr) \
({ \
auto expr_res = (expr); \
if (!expr_res) { \
cntx->SendError(expr_res.error()); \
return; \
} \
std::move(expr_res).value(); \
#define GET_OR_SEND_UNEXPECTED(expr) \
({ \
auto expr_res = (expr); \
if (!expr_res) { \
builder->SendError(expr_res.error()); \
return; \
} \
std::move(expr_res).value(); \
})

#endif // GET_OR_SEND_UNEXPECTED
Expand Down
Loading

0 comments on commit f6deff3

Please sign in to comment.