diff --git a/src/server/bloom_family.cc b/src/server/bloom_family.cc index 891ae2743458..98bfc3d78a28 100644 --- a/src/server/bloom_family.cc +++ b/src/server/bloom_family.cc @@ -91,7 +91,7 @@ OpResult OpExists(const OpArgs& op_args, string_view key, CmdArgLi } // namespace -void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) { +void BloomFamily::Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser(args); string_view key = parser.Next(); SbfParams params; @@ -99,23 +99,23 @@ void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) { tie(params.error, params.init_capacity) = parser.Next(); if (parser.Error()) - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); if (!params.ok()) - return cntx->SendError("error rate is out of range", kSyntaxErrType); + return builder->SendError("error rate is out of range", kSyntaxErrType); const auto cb = [&](Transaction* t, EngineShard* shard) { return OpReserve(params, t->GetOpArgs(shard), key); }; - OpStatus res = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus res = tx->ScheduleSingleHop(std::move(cb)); if (res == OpStatus::KEY_EXISTS) { - return cntx->SendError("item exists"); + return builder->SendError("item exists"); } - return cntx->SendError(res); + return builder->SendError(res); } -void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) { +void BloomFamily::Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -123,30 +123,30 @@ void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) { return OpAdd(t->GetOpArgs(shard), key, args); }; - OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); OpStatus status = res.status(); if (res) { if (res->front()) - return cntx->SendLong(*res->front()); + return builder->SendLong(*res->front()); else status = res->front().status(); } - return cntx->SendError(status); + return builder->SendError(status); } -void BloomFamily::Exists(CmdArgList args, ConnectionContext* cntx) { +void BloomFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); const auto cb = [&](Transaction* t, EngineShard* shard) { return OpExists(t->GetOpArgs(shard), key, args); }; - OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - return cntx->SendLong(res ? res->front() : 0); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); + return builder->SendLong(res ? res->front() : 0); } -void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) { +void BloomFamily::MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -154,23 +154,23 @@ void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) { return OpAdd(t->GetOpArgs(shard), key, args); }; - OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); if (!res) { - return cntx->SendError(res.status()); + return builder->SendError(res.status()); } const AddResult& add_res = *res; - RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder(); + RedisReplyBuilder* rb = static_cast(builder); rb->StartArray(add_res.size()); for (const OpResult& val : add_res) { if (val) { - cntx->SendLong(*val); + builder->SendLong(*val); } else { - cntx->SendError(val.status()); + builder->SendError(val.status()); } } } -void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) { +void BloomFamily::MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -178,12 +178,12 @@ void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) { return OpExists(t->GetOpArgs(shard), key, args); }; - OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); - RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder(); + RedisReplyBuilder* rb = static_cast(builder); rb->StartArray(args.size()); for (size_t i = 0; i < args.size(); ++i) { - cntx->SendLong(res ? res->at(i) : 0); + rb->SendLong(res ? res->at(i) : 0); } } diff --git a/src/server/bloom_family.h b/src/server/bloom_family.h index 9b56024d865f..1bb109b48e2e 100644 --- a/src/server/bloom_family.h +++ b/src/server/bloom_family.h @@ -6,6 +6,10 @@ #include "server/common.h" +namespace facade { +class SinkReplyBuilder; +} // namespace facade + namespace dfly { class CommandRegistry; @@ -16,11 +20,13 @@ class BloomFamily { static void Register(CommandRegistry* registry); private: - static void Reserve(CmdArgList args, ConnectionContext* cntx); - static void Add(CmdArgList args, ConnectionContext* cntx); - static void MAdd(CmdArgList args, ConnectionContext* cntx); - static void Exists(CmdArgList args, ConnectionContext* cntx); - static void MExists(CmdArgList args, ConnectionContext* cntx); + using SinkReplyBuilder = facade::SinkReplyBuilder; + + static void Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); }; } // namespace dfly diff --git a/src/server/error.h b/src/server/error.h index 2ef33d5cffc5..00e1218a56b0 100644 --- a/src/server/error.h +++ b/src/server/error.h @@ -47,14 +47,14 @@ using facade::kWrongTypeErr; #ifndef GET_OR_SEND_UNEXPECTED -#define GET_OR_SEND_UNEXPECTED(expr) \ - ({ \ - auto expr_res = (expr); \ - if (!expr_res) { \ - cntx->SendError(expr_res.error()); \ - return; \ - } \ - std::move(expr_res).value(); \ +#define GET_OR_SEND_UNEXPECTED(expr) \ + ({ \ + auto expr_res = (expr); \ + if (!expr_res) { \ + builder->SendError(expr_res.error()); \ + return; \ + } \ + std::move(expr_res).value(); \ }) #endif // GET_OR_SEND_UNEXPECTED diff --git a/src/server/json_family.cc b/src/server/json_family.cc index f6bb561d8810..b96064576738 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -28,6 +28,7 @@ #include "server/command_registry.h" #include "server/common.h" #include "server/detail/wrapped_json_path.h" +#include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" #include "server/search/doc_index.h" @@ -45,6 +46,8 @@ using namespace std; using namespace jsoncons; using facade::CmdArgParser; using facade::kSyntaxErrType; +using facade::RedisReplyBuilder; +using facade::SinkReplyBuilder; using JsonExpression = jsonpath::jsonpath_expression; using JsonReplaceVerify = std::function; @@ -252,7 +255,7 @@ struct JsonGetParams { std::vector> paths; }; -std::optional ParseJsonGetParams(CmdArgParser* parser, ConnectionContext* cntx) { +std::optional ParseJsonGetParams(CmdArgParser* parser, SinkReplyBuilder* builder) { JsonGetParams parsed_args; while (parser->HasNext()) { if (parser->Check("NOESCAPE")) { @@ -268,7 +271,7 @@ std::optional ParseJsonGetParams(CmdArgParser* parser, Connection auto json_path = ParseJsonPath(path_str); if (!json_path) { - cntx->SendError(json_path.error()); + builder->SendError(json_path.error()); return std::nullopt; } @@ -791,13 +794,14 @@ auto OpToggle(const OpArgs& op_args, string_view key, } template -auto ExecuteToggle(string_view key, const WrappedJsonPath& json_path, ConnectionContext* cntx) { +auto ExecuteToggle(string_view key, const WrappedJsonPath& json_path, Transaction* tx, + SinkReplyBuilder* builder) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpToggle(t->GetOpArgs(shard), key, json_path); }; - auto result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } @@ -1450,7 +1454,7 @@ OpStatus OpMerge(const OpArgs& op_args, string_view key, string_view path, } // namespace -void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; auto [key, path, json_str] = parser.Next(); @@ -1460,18 +1464,16 @@ void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) { bool is_xx_condition = (res == 2), is_nx_condition = (res == 1); if (parser.Error() || parser.HasNext()) // also clear the parser error dcheck - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); auto cb = [&](Transaction* t, EngineShard* shard) { return OpSet(t->GetOpArgs(shard), key, path, json_path, json_str, is_nx_condition, is_xx_condition); }; - Transaction* trans = cntx->transaction; + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); - OpResult result = trans->ScheduleSingleHopT(std::move(cb)); - - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); if (result) { if (*result) { rb->SendOk(); @@ -1479,15 +1481,15 @@ void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) { rb->SendNull(); } } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } // JSON.MSET key path value [key path value ...] -void JsonFamily::MSet(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { DCHECK_GE(args.size(), 3u); if (args.size() % 3 != 0) { - return cntx->SendError(facade::WrongNumArgsError("json.mset")); + return builder->SendError(facade::WrongNumArgsError("json.mset")); } AggregateStatus status; @@ -1499,16 +1501,16 @@ void JsonFamily::MSet(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->ScheduleSingleHop(cb); + tx->ScheduleSingleHop(cb); if (*status != OpStatus::OK) - return cntx->SendError(*status); - cntx->SendOk(); + return builder->SendError(*status); + builder->SendOk(); } // JSON.MERGE key path value // Based on https://datatracker.ietf.org/doc/html/rfc7386 spec -void JsonFamily::Merge(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Merge(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.Next(); @@ -1520,13 +1522,13 @@ void JsonFamily::Merge(CmdArgList args, ConnectionContext* cntx) { return OpMerge(t->GetOpArgs(shard), key, path, json_path, value); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OK) - return cntx->SendOk(); - cntx->SendError(status); + return builder->SendOk(); + builder->SendError(status); } -void JsonFamily::Resp(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Resp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1537,27 +1539,26 @@ void JsonFamily::Resp(CmdArgList args, ConnectionContext* cntx) { return OpResp(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::Debug(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view command = parser.Next(); // The 'MEMORY' sub-command is not supported yet, calling to operation function should be added // here. if (absl::EqualsIgnoreCase(command, "help")) { - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(2); rb->SendBulkString( "JSON.DEBUG FIELDS - report number of fields in the JSON element."); rb->SendBulkString("JSON.DEBUG HELP - print help message."); return; } else if (!absl::EqualsIgnoreCase(command, "fields")) { - cntx->SendError(facade::UnknownSubCmd(command, "JSON.DEBUG"), facade::kSyntaxErrType); + builder->SendError(facade::UnknownSubCmd(command, "JSON.DEBUG"), facade::kSyntaxErrType); return; } @@ -1572,19 +1573,17 @@ void JsonFamily::Debug(CmdArgList args, ConnectionContext* cntx) { return OpFields(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { DCHECK_GE(args.size(), 1U); string_view path = ArgS(args, args.size() - 1); WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path)); - Transaction* transaction = cntx->transaction; unsigned shard_count = shard_set->size(); std::vector>> mget_resp(shard_count); @@ -1594,16 +1593,16 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - OpStatus result = transaction->ScheduleSingleHop(std::move(cb)); + OpStatus result = tx->ScheduleSingleHop(std::move(cb)); CHECK_EQ(OpStatus::OK, result); std::vector> results(args.size() - 1); for (ShardId sid = 0; sid < shard_count; ++sid) { - if (!transaction->IsActive(sid)) + if (!tx->IsActive(sid)) continue; std::vector>& res = mget_resp[sid]; - ShardArgs shard_args = transaction->GetShardArgs(sid); + ShardArgs shard_args = tx->GetShardArgs(sid); unsigned src_index = 0; for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) { if (!res[src_index]) @@ -1614,11 +1613,11 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) { } } - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); reply_generic::Send(results.begin(), results.end(), rb); } -void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ArrIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.Next(); @@ -1627,7 +1626,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) { optional search_value = JsonFromString(parser.Next()); if (!search_value) { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } @@ -1635,7 +1634,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) { if (parser.HasNext()) { if (!absl::SimpleAtoi(parser.Next(), &start_index)) { VLOG(1) << "Failed to convert the start index to numeric" << ArgS(args, 3); - cntx->SendError(kInvalidIntErr); + builder->SendError(kInvalidIntErr); return; } } @@ -1644,7 +1643,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) { if (parser.HasNext()) { if (!absl::SimpleAtoi(parser.Next(), &end_index)) { VLOG(1) << "Failed to convert the stop index to numeric" << ArgS(args, 4); - cntx->SendError(kInvalidIntErr); + builder->SendError(kInvalidIntErr); return; } } @@ -1653,20 +1652,19 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) { return OpArrIndex(t->GetOpArgs(shard), key, json_path, *search_value, start_index, end_index); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ArrInsert(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ArrInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view path = ArgS(args, 1); int index = -1; if (!absl::SimpleAtoi(ArgS(args, 2), &index)) { VLOG(1) << "Failed to convert the following value to numeric: " << ArgS(args, 2); - cntx->SendError(kInvalidIntErr); + builder->SendError(kInvalidIntErr); return; } @@ -1676,7 +1674,7 @@ void JsonFamily::ArrInsert(CmdArgList args, ConnectionContext* cntx) { for (size_t i = 3; i < args.size(); i++) { optional val = JsonFromString(ArgS(args, i)); if (!val) { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } @@ -1687,13 +1685,12 @@ void JsonFamily::ArrInsert(CmdArgList args, ConnectionContext* cntx) { return OpArrInsert(t->GetOpArgs(shard), key, json_path, index, new_values); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ArrAppend(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ArrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view path = ArgS(args, 1); @@ -1706,7 +1703,7 @@ void JsonFamily::ArrAppend(CmdArgList args, ConnectionContext* cntx) { for (size_t i = 2; i < args.size(); ++i) { optional converted_val = JsonFromString(ArgS(args, i)); if (!converted_val) { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } append_values.emplace_back(converted_val); @@ -1716,13 +1713,12 @@ void JsonFamily::ArrAppend(CmdArgList args, ConnectionContext* cntx) { return OpArrAppend(t->GetOpArgs(shard), key, json_path, append_values); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ArrTrim(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ArrTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view path = ArgS(args, 1); int start_index; @@ -1730,13 +1726,13 @@ void JsonFamily::ArrTrim(CmdArgList args, ConnectionContext* cntx) { if (!absl::SimpleAtoi(ArgS(args, 2), &start_index)) { VLOG(1) << "Failed to parse array start index"; - cntx->SendError(kInvalidIntErr); + builder->SendError(kInvalidIntErr); return; } if (!absl::SimpleAtoi(ArgS(args, 3), &stop_index)) { VLOG(1) << "Failed to parse array stop index"; - cntx->SendError(kInvalidIntErr); + builder->SendError(kInvalidIntErr); return; } @@ -1746,13 +1742,12 @@ void JsonFamily::ArrTrim(CmdArgList args, ConnectionContext* cntx) { return OpArrTrim(t->GetOpArgs(shard), key, json_path, start_index, stop_index); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ArrPop(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ArrPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1764,13 +1759,12 @@ void JsonFamily::ArrPop(CmdArgList args, ConnectionContext* cntx) { return OpArrPop(t->GetOpArgs(shard), key, json_path, index); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::Clear(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Clear(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1781,13 +1775,12 @@ void JsonFamily::Clear(CmdArgList args, ConnectionContext* cntx) { return OpClear(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - OpResult result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::StrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view path = ArgS(args, 1); string_view value = ArgS(args, 2); @@ -1797,7 +1790,7 @@ void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) { // We try parsing the value into json string object first. optional parsed_json = JsonFromString(value); if (!parsed_json || !parsed_json->is_string()) { - return cntx->SendError("expected string value", kSyntaxErrType); + return builder->SendError("expected string value", kSyntaxErrType); }; string_view json_string = parsed_json->as_string_view(); @@ -1805,13 +1798,12 @@ void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) { return OpStrAppend(t->GetOpArgs(shard), key, json_path, json_string); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ObjKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1822,13 +1814,12 @@ void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) { return OpObjKeys(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::Del(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1839,13 +1830,12 @@ void JsonFamily::Del(CmdArgList args, ConnectionContext* cntx) { return OpDel(t->GetOpArgs(shard), key, path, json_path); }; - Transaction* trans = cntx->transaction; - OpResult result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::NumIncrBy(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::NumIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view path = ArgS(args, 1); string_view num = ArgS(args, 2); @@ -1856,12 +1846,12 @@ void JsonFamily::NumIncrBy(CmdArgList args, ConnectionContext* cntx) { return OpDoubleArithmetic(t->GetOpArgs(shard), key, json_path, num, OP_ADD); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::NumMultBy(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::NumMultBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view path = ArgS(args, 1); string_view num = ArgS(args, 2); @@ -1872,12 +1862,12 @@ void JsonFamily::NumMultBy(CmdArgList args, ConnectionContext* cntx) { return OpDoubleArithmetic(t->GetOpArgs(shard), key, json_path, num, OP_MULTIPLY); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::Toggle(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Toggle(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1885,13 +1875,13 @@ void JsonFamily::Toggle(CmdArgList args, ConnectionContext* cntx) { WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path)); if (json_path.IsLegacyModePath()) { - ExecuteToggle(key, json_path, cntx); + ExecuteToggle(key, json_path, tx, builder); } else { - ExecuteToggle(key, json_path, cntx); + ExecuteToggle(key, json_path, tx, builder); } } -void JsonFamily::Type(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1902,13 +1892,12 @@ void JsonFamily::Type(CmdArgList args, ConnectionContext* cntx) { return OpType(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ArrLen(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ArrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1919,13 +1908,12 @@ void JsonFamily::ArrLen(CmdArgList args, ConnectionContext* cntx) { return OpArrLen(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::ObjLen(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::ObjLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1936,13 +1924,12 @@ void JsonFamily::ObjLen(CmdArgList args, ConnectionContext* cntx) { return OpObjLen(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::StrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view path = parser.NextOrDefault(); @@ -1953,33 +1940,31 @@ void JsonFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { return OpStrLen(t->GetOpArgs(shard), key, json_path); }; - Transaction* trans = cntx->transaction; - auto result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); reply_generic::Send(result, rb); } -void JsonFamily::Get(CmdArgList args, ConnectionContext* cntx) { +void JsonFamily::Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { DCHECK_GE(args.size(), 1U); facade::CmdArgParser parser{args}; string_view key = parser.Next(); - auto params = ParseJsonGetParams(&parser, cntx); + auto params = ParseJsonGetParams(&parser, builder); if (!params) { return; // ParseJsonGetParams should have already sent an error } if (auto err = parser.Error(); err) - return cntx->SendError(err->MakeReply()); + return builder->SendError(err->MakeReply()); auto cb = [&](Transaction* t, EngineShard* shard) { return OpJsonGet(t->GetOpArgs(shard), key, params.value()); }; - Transaction* trans = cntx->transaction; - OpResult result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); if (result == OpStatus::KEY_NOTFOUND) { rb->SendNull(); // Match Redis diff --git a/src/server/json_family.h b/src/server/json_family.h index a8345d898556..979da9a25b68 100644 --- a/src/server/json_family.h +++ b/src/server/json_family.h @@ -5,44 +5,46 @@ #pragma once #include "server/common.h" -#include "server/engine_shard_set.h" + +namespace facade { +class SinkReplyBuilder; +} // namespace facade namespace dfly { class ConnectionContext; class CommandRegistry; -using facade::OpResult; -using facade::OpStatus; -using facade::RedisReplyBuilder; class JsonFamily { public: static void Register(CommandRegistry* registry); private: - static void Get(CmdArgList args, ConnectionContext* cntx); - static void MGet(CmdArgList args, ConnectionContext* cntx); - static void Type(CmdArgList args, ConnectionContext* cntx); - static void StrLen(CmdArgList args, ConnectionContext* cntx); - static void ObjLen(CmdArgList args, ConnectionContext* cntx); - static void ArrLen(CmdArgList args, ConnectionContext* cntx); - static void Toggle(CmdArgList args, ConnectionContext* cntx); - static void NumIncrBy(CmdArgList args, ConnectionContext* cntx); - static void NumMultBy(CmdArgList args, ConnectionContext* cntx); - static void Del(CmdArgList args, ConnectionContext* cntx); - static void ObjKeys(CmdArgList args, ConnectionContext* cntx); - static void StrAppend(CmdArgList args, ConnectionContext* cntx); - static void Clear(CmdArgList args, ConnectionContext* cntx); - static void ArrPop(CmdArgList args, ConnectionContext* cntx); - static void ArrTrim(CmdArgList args, ConnectionContext* cntx); - static void ArrInsert(CmdArgList args, ConnectionContext* cntx); - static void ArrAppend(CmdArgList args, ConnectionContext* cntx); - static void ArrIndex(CmdArgList args, ConnectionContext* cntx); - static void Debug(CmdArgList args, ConnectionContext* cntx); - static void Resp(CmdArgList args, ConnectionContext* cntx); - static void Set(CmdArgList args, ConnectionContext* cntx); - static void MSet(CmdArgList args, ConnectionContext* cntx); - static void Merge(CmdArgList args, ConnectionContext* cntx); + using SinkReplyBuilder = facade::SinkReplyBuilder; + + static void Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void StrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ObjLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ArrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Toggle(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void NumIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void NumMultBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ObjKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void StrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Clear(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ArrPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ArrTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ArrInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ArrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ArrIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Resp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Merge(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); }; } // namespace dfly