From 5c1a5fd2c8ef3bbaba006982791f04073ceb77c5 Mon Sep 17 00:00:00 2001 From: Ryoji Kurosawa Date: Mon, 3 Feb 2025 19:32:57 +0900 Subject: [PATCH] refactoring to pass tx for statement resolution --- mock/jogasaki/utils/runner.cpp | 17 +++++++++-------- src/jogasaki/api/impl/database.cpp | 11 ++++++++--- src/jogasaki/api/impl/database.h | 3 +++ src/jogasaki/api/impl/service.cpp | 15 ++++++++------- src/jogasaki/datastore/register_lob.cpp | 16 ++++++++-------- src/jogasaki/datastore/register_lob.h | 2 +- src/jogasaki/plan/compiler.cpp | 17 ++++++++--------- src/jogasaki/scheduler/flat_task.cpp | 14 ++++---------- test/jogasaki/api/api_test.cpp | 14 +++++++------- test/jogasaki/api/host_variables_test.cpp | 2 +- test/jogasaki/api/service_api_test.cpp | 2 +- 11 files changed, 58 insertions(+), 55 deletions(-) diff --git a/mock/jogasaki/utils/runner.cpp b/mock/jogasaki/utils/runner.cpp index f233a584..898b36c9 100644 --- a/mock/jogasaki/utils/runner.cpp +++ b/mock/jogasaki/utils/runner.cpp @@ -63,6 +63,14 @@ runner& runner::run() { } } + api::transaction_handle tx{tx_}; + std::shared_ptr holder{}; + if(! tx) { + holder = utils::create_transaction(*db_); + tx = *holder; + } + auto tc = api::get_transaction_context(tx); + notnull(prepared); api::impl::parameter_set empty_params{}; std::unique_ptr stmt{}; @@ -70,6 +78,7 @@ runner& runner::run() { prepared, params_ ? maybe_shared_ptr{params_} : maybe_shared_ptr{&empty_params}, stmt, + tc, *out ); res != status::ok) { exec_fail(expect_error_ ? "" : (*out)->message()); @@ -84,17 +93,9 @@ runner& runner::run() { std::cout << ss.str() << std::endl; } - api::transaction_handle tx{tx_}; - std::shared_ptr holder{}; - if(! tx) { - holder = utils::create_transaction(*db_); - tx = *holder; - } - status res{}; std::shared_ptr temp_stats{}; auto* out_stats = stats_ ? stats_ : &temp_stats; - auto tc = api::get_transaction_context(tx); if(output_records_) { // call api for query std::unique_ptr rs{}; diff --git a/src/jogasaki/api/impl/database.cpp b/src/jogasaki/api/impl/database.cpp index 31766465..9db9f1b0 100644 --- a/src/jogasaki/api/impl/database.cpp +++ b/src/jogasaki/api/impl/database.cpp @@ -466,12 +466,13 @@ status database::prepare( status database::create_executable(std::string_view sql, std::unique_ptr& statement) { std::shared_ptr info{}; - return create_executable(sql, statement, info); + return create_executable(sql, statement, nullptr, info); // using nullptr because this function is left for testing } status database::create_executable( std::string_view sql, std::unique_ptr& statement, + std::shared_ptr tx, std::shared_ptr& out, plan::compile_option const& option ) { @@ -481,7 +482,7 @@ status database::create_executable( } std::unique_ptr exec{}; auto parameters = std::make_shared(); - if(auto rc = resolve_common(*prepared, parameters, exec, out); rc != status::ok) { + if(auto rc = resolve_common(*prepared, parameters, exec, std::move(tx), out); rc != status::ok) { return rc; } statement = std::make_unique( @@ -656,19 +657,21 @@ status database::resolve( std::unique_ptr& statement ) { std::shared_ptr info{}; - return resolve(prepared, parameters, statement, info); + return resolve(prepared, parameters, statement, nullptr, info); //FIXME nullptr is used for testing } status database::resolve( api::statement_handle prepared, maybe_shared_ptr parameters, std::unique_ptr& statement, + std::shared_ptr tx, std::shared_ptr& out ) { return resolve_common( *reinterpret_cast(prepared.get()), //NOLINT std::move(parameters), statement, + std::move(tx), out ); } @@ -677,6 +680,7 @@ status database::resolve_common( impl::prepared_statement const& prepared, maybe_shared_ptr parameters, std::unique_ptr& statement, + std::shared_ptr tx, std::shared_ptr& out ) { auto resource = std::make_shared(&global::page_pool()); @@ -685,6 +689,7 @@ status database::resolve_common( ctx->storage_provider(tables_); ctx->aggregate_provider(aggregate_functions_); ctx->function_provider(scalar_functions_); + ctx->transaction(std::move(tx)); auto& ps = unsafe_downcast(prepared).body(); ctx->variable_provider(ps->host_variables()); ctx->prepared_statement(ps); diff --git a/src/jogasaki/api/impl/database.h b/src/jogasaki/api/impl/database.h index 14946453..cc489864 100644 --- a/src/jogasaki/api/impl/database.h +++ b/src/jogasaki/api/impl/database.h @@ -146,6 +146,7 @@ class database : public api::database { [[nodiscard]] status create_executable( std::string_view sql, std::unique_ptr& statement, + std::shared_ptr tx, std::shared_ptr& out, plan::compile_option const& option = {} ); @@ -160,6 +161,7 @@ class database : public api::database { api::statement_handle prepared, maybe_shared_ptr parameters, std::unique_ptr& statement, + std::shared_ptr tx, std::shared_ptr& out ); @@ -339,6 +341,7 @@ class database : public api::database { impl::prepared_statement const& prepared, maybe_shared_ptr parameters, std::unique_ptr& statement, + std::shared_ptr tx, std::shared_ptr& out ); diff --git a/src/jogasaki/api/impl/service.cpp b/src/jogasaki/api/impl/service.cpp index a8c53518..745e7302 100644 --- a/src/jogasaki/api/impl/service.cpp +++ b/src/jogasaki/api/impl/service.cpp @@ -425,7 +425,7 @@ void service::command_execute_statement( } std::unique_ptr e{}; std::shared_ptr err_info{}; - if(auto rc = get_impl(*db_).create_executable(sql, e, err_info); rc != jogasaki::status::ok) { + if(auto rc = get_impl(*db_).create_executable(sql, e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) { abort_tx(tx, req_info, err_info); details::error(*res, err_info.get(), req_info); return; @@ -510,7 +510,8 @@ void service::command_execute_prepared_statement( std::unique_ptr e{}; std::shared_ptr err_info{}; - if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, err_info); + + if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) { abort_tx(tx, req_info, err_info); details::error(*res, err_info.get(), req_info); @@ -683,7 +684,7 @@ void service::command_explain( std::unique_ptr e{}; std::shared_ptr err_info{}; - if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, err_info); + if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, nullptr, err_info); //FIXME stop using executable statement for explain rc != jogasaki::status::ok) { details::error(*res, err_info.get(), req_info); req->status(scheduler::request_detail_status::finishing); @@ -737,7 +738,7 @@ void service::command_explain_by_text( std::unique_ptr e{}; err_info = {}; auto params = jogasaki::api::create_parameter_set(); - if(auto rc = get_impl(*db_).resolve(statement, maybe_shared_ptr{params.get()}, e, err_info); + if(auto rc = get_impl(*db_).resolve(statement, maybe_shared_ptr{params.get()}, e, nullptr, err_info); //FIXME stop using executable statement for explain rc != jogasaki::status::ok) { details::error(*res, err_info.get(), req_info); req->status(scheduler::request_detail_status::finishing); @@ -1379,7 +1380,7 @@ void service::execute_query( std::unique_ptr e{}; std::shared_ptr err_info{}; if(q.has_sql()) { - if(auto rc = get_impl(*db_).create_executable(q.sql(), e, err_info); rc != jogasaki::status::ok) { + if(auto rc = get_impl(*db_).create_executable(q.sql(), e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) { VLOG(log_error) << log_location_prefix << "error in db_->create_executable() : " << rc; details::error(*res, err_info.get(), req_info); return; @@ -1387,7 +1388,7 @@ void service::execute_query( has_result_records = e->meta() != nullptr; } else { jogasaki::api::statement_handle statement{q.sid(), reinterpret_cast(db_)}; //NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) - if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, err_info); rc != jogasaki::status::ok) { + if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) { details::error(*res, err_info.get(), req_info); return; } @@ -1591,7 +1592,7 @@ void service::execute_dump( BOOST_ASSERT(! q.has_sql()); //NOLINT jogasaki::api::statement_handle statement{q.sid(), reinterpret_cast(db_)}; //NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) std::shared_ptr err_info{}; - if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, err_info); rc != jogasaki::status::ok) { + if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) { details::error(*res, err_info.get(), req_info); return; } diff --git a/src/jogasaki/datastore/register_lob.cpp b/src/jogasaki/datastore/register_lob.cpp index 42751afe..99676c8c 100644 --- a/src/jogasaki/datastore/register_lob.cpp +++ b/src/jogasaki/datastore/register_lob.cpp @@ -19,19 +19,19 @@ namespace jogasaki::datastore { -status register_lob(std::string_view path, kvs::database* db, transaction_context* tx, limestone::api::blob_id_type& out) { - auto* ds = get_datastore(db); - if (tx) { - if (! tx->blob_pool()) { - // TODO exception / error handling with limestone - tx->blob_pool(ds->acquire_blob_pool()); - } - } else { +status register_lob(std::string_view path, transaction_context* tx, limestone::api::blob_id_type& out) { + if (! tx) { // for testing + auto* ds = get_datastore(nullptr); auto pool = ds->acquire_blob_pool(); out = pool->register_file(boost::filesystem::path{std::string{path}}, false); return status::ok; } + auto* ds = get_datastore(tx->database()); + if (! tx->blob_pool()) { + // TODO exception / error handling with limestone + tx->blob_pool(ds->acquire_blob_pool()); + } out = tx->blob_pool()->register_file(boost::filesystem::path{std::string{path}}, false); return status::ok; } diff --git a/src/jogasaki/datastore/register_lob.h b/src/jogasaki/datastore/register_lob.h index 954d2824..3e97082d 100644 --- a/src/jogasaki/datastore/register_lob.h +++ b/src/jogasaki/datastore/register_lob.h @@ -35,6 +35,6 @@ namespace jogasaki::datastore { * @return status::ok when successful * @return any other error otherwise */ -status register_lob(std::string_view path, kvs::database* db, transaction_context* tx, limestone::api::blob_id_type& out); +status register_lob(std::string_view path, transaction_context* tx, limestone::api::blob_id_type& out); } // namespace jogasaki::datastore diff --git a/src/jogasaki/plan/compiler.cpp b/src/jogasaki/plan/compiler.cpp index cbb31a5e..6e78333f 100644 --- a/src/jogasaki/plan/compiler.cpp +++ b/src/jogasaki/plan/compiler.cpp @@ -863,7 +863,8 @@ status validate_host_variables( std::shared_ptr create_host_variables( parameter_set const* parameters, - std::shared_ptr const& info + std::shared_ptr const& info, + compiler_context& ctx ) { if (parameters == nullptr || info == nullptr) { return {}; @@ -881,16 +882,14 @@ std::shared_ptr create_host_variables( data::any a{}; if (e.value().type_index() == data::value::index) { limestone::api::blob_id_type blob_id{}; - //TODO pass kvs db and tx - if (auto res = datastore::register_lob(e.value().ref().path(), nullptr, nullptr, blob_id); res != status::ok) { + if (auto res = datastore::register_lob(e.value().ref().path(), ctx.transaction().get(), blob_id); res != status::ok) { // error handling std::abort(); } a = data::any{std::in_place_type, blob_reference{blob_id, lob_data_provider::datastore}}; } else if (e.value().type_index() == data::value::index) { limestone::api::blob_id_type blob_id{}; - //TODO pass kvs db and tx - if (auto res = datastore::register_lob(e.value().ref().path(), nullptr, nullptr, blob_id); res != status::ok) { + if (auto res = datastore::register_lob(e.value().ref().path(), ctx.transaction().get(), blob_id); res != status::ok) { // error handling std::abort(); } @@ -917,7 +916,7 @@ void create_mirror_for_write( std::shared_ptr const& mirrors, parameter_set const* parameters ) { - auto vars = create_host_variables(parameters, mirrors->host_variable_info()); + auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx); auto& node = unsafe_downcast(*statement); auto& index = yugawara::binding::extract(node.destination()); auto write = std::make_shared( @@ -953,7 +952,7 @@ void create_mirror_for_empty_statement( parameter_set const* parameters ) { auto ops = std::make_shared(); - auto vars = create_host_variables(parameters, mirrors->host_variable_info()); + auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx); ctx.executable_statement( std::make_shared( std::move(statement), @@ -999,7 +998,7 @@ void create_mirror_for_ddl( default: throw_exception(std::logic_error{""}); } - auto vars = create_host_variables(parameters, mirrors->host_variable_info()); + auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx); ctx.executable_statement( std::make_shared( std::move(statement), @@ -1020,7 +1019,7 @@ void create_mirror_for_execute( std::shared_ptr const& mirrors, parameter_set const* parameters ) { - auto vars = create_host_variables(parameters, mirrors->host_variable_info()); + auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx); std::unordered_map steps{}; yugawara::binding::factory bindings{}; auto mirror = std::make_shared(); diff --git a/src/jogasaki/scheduler/flat_task.cpp b/src/jogasaki/scheduler/flat_task.cpp index e6a325aa..1c4d868a 100644 --- a/src/jogasaki/scheduler/flat_task.cpp +++ b/src/jogasaki/scheduler/flat_task.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include #include @@ -52,8 +51,6 @@ namespace jogasaki::scheduler { -using takatori::util::string_builder; - void flat_task::bootstrap(tateyama::task_scheduler::context& ctx) { log_entry << *this; trace_scope_name("bootstrap"); //NOLINT @@ -375,14 +372,11 @@ void flat_task::resolve(tateyama::task_scheduler::context& ctx) { log_entry << *this; (void)ctx; auto& e = sctx_->executable_statement_; + + std::shared_ptr info{}; if(auto res = sctx_->database_->resolve(sctx_->prepared_, - maybe_shared_ptr{sctx_->parameters_}, e); res != status::ok) { - set_error( - *req_context_, - error_code::sql_execution_exception, - string_builder{} << "creating parallel execution plan failed. status:" << res << string_builder::to_string, - res - ); + maybe_shared_ptr{sctx_->parameters_}, e, req_context_->transaction(), info); res != status::ok) { + req_context_->error_info(info); } else { executor::execute_async_on_context( *sctx_->database_, diff --git a/test/jogasaki/api/api_test.cpp b/test/jogasaki/api/api_test.cpp index e3ebbdac..a6f0b79b 100644 --- a/test/jogasaki/api/api_test.cpp +++ b/test/jogasaki/api/api_test.cpp @@ -97,7 +97,7 @@ using namespace std::string_view_literals; TEST_F(api_test, syntax_error) { std::unique_ptr stmt{}; std::shared_ptr info{}; - ASSERT_EQ(status::err_parse_error, get_impl(*db_).create_executable("AAA", stmt, info)); + ASSERT_EQ(status::err_parse_error, get_impl(*db_).create_executable("AAA", stmt, nullptr, info)); EXPECT_EQ(error_code::syntax_exception, info->code()); std::cerr << info->message() << std::endl; } @@ -105,7 +105,7 @@ TEST_F(api_test, syntax_error) { TEST_F(api_test, missing_table) { std::unique_ptr stmt{}; std::shared_ptr info{}; - ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select * from dummy", stmt, info)); + ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select * from dummy", stmt, nullptr, info)); EXPECT_EQ(error_code::symbol_analyze_exception, info->code()); std::cerr << info->message() << std::endl; } @@ -113,7 +113,7 @@ TEST_F(api_test, missing_table) { TEST_F(api_test, invalid_column_name) { std::unique_ptr stmt{}; std::shared_ptr info{}; - ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("INSERT INTO T0(dummy) VALUES(1)", stmt, info)); + ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("INSERT INTO T0(dummy) VALUES(1)", stmt, nullptr, info)); EXPECT_EQ(error_code::symbol_analyze_exception, info->code()); std::cerr << info->message() << std::endl; } @@ -123,7 +123,7 @@ TEST_F(api_test, inconsistent_type_in_write) { // analyzer option cast_literals_in_context = false can be used to keep the old behavior std::unique_ptr stmt{}; std::shared_ptr info{}; - ASSERT_EQ(status::ok, get_impl(*db_).create_executable("INSERT INTO T0(C0) VALUES('X')", stmt, info)); + ASSERT_EQ(status::ok, get_impl(*db_).create_executable("INSERT INTO T0(C0) VALUES('X')", stmt, nullptr, info)); auto tx = utils::create_transaction(*db_); auto err = execute(*tx, *stmt); ASSERT_EQ(error_code::value_evaluation_exception, err->code()); @@ -133,7 +133,7 @@ TEST_F(api_test, inconsistent_type_in_write) { TEST_F(api_test, inconsistent_type_in_query) { std::unique_ptr stmt{}; std::shared_ptr info{}; - ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select C1 from T0 where C1='X'", stmt, info)); + ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select C1 from T0 where C1='X'", stmt, nullptr, info)); EXPECT_EQ(error_code::type_analyze_exception, info->code()); std::cerr << info->message() << std::endl; } @@ -591,7 +591,7 @@ TEST_F(api_test, unresolved_parameters) { std::shared_ptr info{}; auto ps = api::create_parameter_set(); std::unique_ptr exec{}; - ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(prepared, std::shared_ptr{std::move(ps)}, exec, info)); + ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(prepared, std::shared_ptr{std::move(ps)}, exec, nullptr, info)); EXPECT_EQ(error_code::unresolved_placeholder_exception, info->code()); std::cerr << info->message() << std::endl; ASSERT_EQ(status::ok,db_->destroy_statement(prepared)); @@ -602,7 +602,7 @@ TEST_F(api_test, unresolved_parameters) { std::shared_ptr info{}; auto ps = api::create_parameter_set(); std::unique_ptr exec{}; - ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(query, std::shared_ptr{std::move(ps)}, exec, info)); + ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(query, std::shared_ptr{std::move(ps)}, exec, nullptr, info)); EXPECT_EQ(error_code::unresolved_placeholder_exception, info->code()); std::cerr << info->message() << std::endl; ASSERT_EQ(status::ok,db_->destroy_statement(query)); diff --git a/test/jogasaki/api/host_variables_test.cpp b/test/jogasaki/api/host_variables_test.cpp index 8fdcb70d..bc6c90fd 100644 --- a/test/jogasaki/api/host_variables_test.cpp +++ b/test/jogasaki/api/host_variables_test.cpp @@ -582,7 +582,7 @@ TEST_F(host_variables_test, missing_colon) { } TEST_F(host_variables_test, blob_types) { - global::config_pool()->mock_datastore(true); + // global::config_pool()->mock_datastore(true); execute_statement("create table t (c0 int primary key, c1 blob, c2 clob)"); std::unordered_map variables{ {"p0", api::field_type_kind::int4}, diff --git a/test/jogasaki/api/service_api_test.cpp b/test/jogasaki/api/service_api_test.cpp index edeb872c..25c02f00 100644 --- a/test/jogasaki/api/service_api_test.cpp +++ b/test/jogasaki/api/service_api_test.cpp @@ -1329,7 +1329,7 @@ TEST_F(service_api_test, boolean_type) { } TEST_F(service_api_test, blob_types) { - global::config_pool()->mock_datastore(true); + // global::config_pool()->mock_datastore(true); execute_statement("create table t (c0 int primary key, c1 blob, c2 clob)"); std::uint64_t tx_handle{}; test_begin(tx_handle);