Skip to content

Commit

Permalink
refactoring to pass tx for statement resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Feb 3, 2025
1 parent 28dad09 commit 5c1a5fd
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 55 deletions.
17 changes: 9 additions & 8 deletions mock/jogasaki/utils/runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,22 @@ runner& runner::run() {
}
}

api::transaction_handle tx{tx_};
std::shared_ptr<api::transaction_handle> holder{};
if(! tx) {
holder = utils::create_transaction(*db_);
tx = *holder;
}
auto tc = api::get_transaction_context(tx);

notnull(prepared);
api::impl::parameter_set empty_params{};
std::unique_ptr<api::executable_statement> stmt{};
if(auto res = get_impl(*db_).resolve(
prepared,
params_ ? maybe_shared_ptr{params_} : maybe_shared_ptr{&empty_params},
stmt,
tc,
*out
); res != status::ok) {
exec_fail(expect_error_ ? "" : (*out)->message());
Expand All @@ -84,17 +93,9 @@ runner& runner::run() {
std::cout << ss.str() << std::endl;
}

api::transaction_handle tx{tx_};
std::shared_ptr<api::transaction_handle> holder{};
if(! tx) {
holder = utils::create_transaction(*db_);
tx = *holder;
}

status res{};
std::shared_ptr<request_statistics> temp_stats{};
auto* out_stats = stats_ ? stats_ : &temp_stats;
auto tc = api::get_transaction_context(tx);
if(output_records_) {
// call api for query
std::unique_ptr<api::result_set> rs{};
Expand Down
11 changes: 8 additions & 3 deletions src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,13 @@ status database::prepare(

status database::create_executable(std::string_view sql, std::unique_ptr<api::executable_statement>& statement) {
std::shared_ptr<error::error_info> info{};
return create_executable(sql, statement, info);
return create_executable(sql, statement, nullptr, info); // using nullptr because this function is left for testing
}

status database::create_executable(
std::string_view sql,
std::unique_ptr<api::executable_statement>& statement,
std::shared_ptr<transaction_context> tx,
std::shared_ptr<error::error_info>& out,
plan::compile_option const& option
) {
Expand All @@ -481,7 +482,7 @@ status database::create_executable(
}
std::unique_ptr<api::executable_statement> exec{};
auto parameters = std::make_shared<impl::parameter_set>();
if(auto rc = resolve_common(*prepared, parameters, exec, out); rc != status::ok) {
if(auto rc = resolve_common(*prepared, parameters, exec, std::move(tx), out); rc != status::ok) {
return rc;
}
statement = std::make_unique<impl::executable_statement>(
Expand Down Expand Up @@ -656,19 +657,21 @@ status database::resolve(
std::unique_ptr<api::executable_statement>& statement
) {
std::shared_ptr<error::error_info> info{};
return resolve(prepared, parameters, statement, info);
return resolve(prepared, parameters, statement, nullptr, info); //FIXME nullptr is used for testing
}

status database::resolve(
api::statement_handle prepared,
maybe_shared_ptr<api::parameter_set const> parameters,
std::unique_ptr<api::executable_statement>& statement,
std::shared_ptr<transaction_context> tx,
std::shared_ptr<error::error_info>& out
) {
return resolve_common(
*reinterpret_cast<impl::prepared_statement*>(prepared.get()), //NOLINT
std::move(parameters),
statement,
std::move(tx),
out
);
}
Expand All @@ -677,6 +680,7 @@ status database::resolve_common(
impl::prepared_statement const& prepared,
maybe_shared_ptr<api::parameter_set const> parameters,
std::unique_ptr<api::executable_statement>& statement,
std::shared_ptr<transaction_context> tx,
std::shared_ptr<error::error_info>& out
) {
auto resource = std::make_shared<memory::lifo_paged_memory_resource>(&global::page_pool());
Expand All @@ -685,6 +689,7 @@ status database::resolve_common(
ctx->storage_provider(tables_);
ctx->aggregate_provider(aggregate_functions_);
ctx->function_provider(scalar_functions_);
ctx->transaction(std::move(tx));
auto& ps = unsafe_downcast<impl::prepared_statement>(prepared).body();
ctx->variable_provider(ps->host_variables());
ctx->prepared_statement(ps);
Expand Down
3 changes: 3 additions & 0 deletions src/jogasaki/api/impl/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class database : public api::database {
[[nodiscard]] status create_executable(
std::string_view sql,
std::unique_ptr<api::executable_statement>& statement,
std::shared_ptr<transaction_context> tx,
std::shared_ptr<error::error_info>& out,
plan::compile_option const& option = {}
);
Expand All @@ -160,6 +161,7 @@ class database : public api::database {
api::statement_handle prepared,
maybe_shared_ptr<api::parameter_set const> parameters,
std::unique_ptr<api::executable_statement>& statement,
std::shared_ptr<transaction_context> tx,
std::shared_ptr<error::error_info>& out
);

Expand Down Expand Up @@ -339,6 +341,7 @@ class database : public api::database {
impl::prepared_statement const& prepared,
maybe_shared_ptr<api::parameter_set const> parameters,
std::unique_ptr<api::executable_statement>& statement,
std::shared_ptr<transaction_context> tx,
std::shared_ptr<error::error_info>& out
);

Expand Down
15 changes: 8 additions & 7 deletions src/jogasaki/api/impl/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void service::command_execute_statement(
}
std::unique_ptr<jogasaki::api::executable_statement> e{};
std::shared_ptr<error::error_info> err_info{};
if(auto rc = get_impl(*db_).create_executable(sql, e, err_info); rc != jogasaki::status::ok) {
if(auto rc = get_impl(*db_).create_executable(sql, e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) {
abort_tx(tx, req_info, err_info);
details::error<sql::response::ExecuteResult>(*res, err_info.get(), req_info);
return;
Expand Down Expand Up @@ -510,7 +510,8 @@ void service::command_execute_prepared_statement(

std::unique_ptr<jogasaki::api::executable_statement> e{};
std::shared_ptr<error::error_info> err_info{};
if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, err_info);

if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, get_transaction_context(tx), err_info);
rc != jogasaki::status::ok) {
abort_tx(tx, req_info, err_info);
details::error<sql::response::ExecuteResult>(*res, err_info.get(), req_info);
Expand Down Expand Up @@ -683,7 +684,7 @@ void service::command_explain(

std::unique_ptr<jogasaki::api::executable_statement> e{};
std::shared_ptr<error::error_info> err_info{};
if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, err_info);
if(auto rc = get_impl(*db_).resolve(handle, std::shared_ptr{std::move(params)}, e, nullptr, err_info); //FIXME stop using executable statement for explain
rc != jogasaki::status::ok) {
details::error<sql::response::Explain>(*res, err_info.get(), req_info);
req->status(scheduler::request_detail_status::finishing);
Expand Down Expand Up @@ -737,7 +738,7 @@ void service::command_explain_by_text(
std::unique_ptr<jogasaki::api::executable_statement> e{};
err_info = {};
auto params = jogasaki::api::create_parameter_set();
if(auto rc = get_impl(*db_).resolve(statement, maybe_shared_ptr{params.get()}, e, err_info);
if(auto rc = get_impl(*db_).resolve(statement, maybe_shared_ptr{params.get()}, e, nullptr, err_info); //FIXME stop using executable statement for explain
rc != jogasaki::status::ok) {
details::error<sql::response::Explain>(*res, err_info.get(), req_info);
req->status(scheduler::request_detail_status::finishing);
Expand Down Expand Up @@ -1379,15 +1380,15 @@ void service::execute_query(
std::unique_ptr<jogasaki::api::executable_statement> e{};
std::shared_ptr<error::error_info> err_info{};
if(q.has_sql()) {
if(auto rc = get_impl(*db_).create_executable(q.sql(), e, err_info); rc != jogasaki::status::ok) {
if(auto rc = get_impl(*db_).create_executable(q.sql(), e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) {
VLOG(log_error) << log_location_prefix << "error in db_->create_executable() : " << rc;
details::error<sql::response::ResultOnly>(*res, err_info.get(), req_info);
return;
}
has_result_records = e->meta() != nullptr;
} else {
jogasaki::api::statement_handle statement{q.sid(), reinterpret_cast<std::uintptr_t>(db_)}; //NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, err_info); rc != jogasaki::status::ok) {
if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) {
details::error<sql::response::ResultOnly>(*res, err_info.get(), req_info);
return;
}
Expand Down Expand Up @@ -1591,7 +1592,7 @@ void service::execute_dump(
BOOST_ASSERT(! q.has_sql()); //NOLINT
jogasaki::api::statement_handle statement{q.sid(), reinterpret_cast<std::uintptr_t>(db_)}; //NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
std::shared_ptr<error::error_info> err_info{};
if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, err_info); rc != jogasaki::status::ok) {
if(auto rc = get_impl(*db_).resolve(statement, q.params(), e, get_transaction_context(tx), err_info); rc != jogasaki::status::ok) {
details::error<sql::response::ResultOnly>(*res, err_info.get(), req_info);
return;
}
Expand Down
16 changes: 8 additions & 8 deletions src/jogasaki/datastore/register_lob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@

namespace jogasaki::datastore {

status register_lob(std::string_view path, kvs::database* db, transaction_context* tx, limestone::api::blob_id_type& out) {
auto* ds = get_datastore(db);
if (tx) {
if (! tx->blob_pool()) {
// TODO exception / error handling with limestone
tx->blob_pool(ds->acquire_blob_pool());
}
} else {
status register_lob(std::string_view path, transaction_context* tx, limestone::api::blob_id_type& out) {
if (! tx) {
// for testing
auto* ds = get_datastore(nullptr);
auto pool = ds->acquire_blob_pool();
out = pool->register_file(boost::filesystem::path{std::string{path}}, false);
return status::ok;
}
auto* ds = get_datastore(tx->database());
if (! tx->blob_pool()) {
// TODO exception / error handling with limestone
tx->blob_pool(ds->acquire_blob_pool());
}
out = tx->blob_pool()->register_file(boost::filesystem::path{std::string{path}}, false);
return status::ok;
}
Expand Down
2 changes: 1 addition & 1 deletion src/jogasaki/datastore/register_lob.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ namespace jogasaki::datastore {
* @return status::ok when successful
* @return any other error otherwise
*/
status register_lob(std::string_view path, kvs::database* db, transaction_context* tx, limestone::api::blob_id_type& out);
status register_lob(std::string_view path, transaction_context* tx, limestone::api::blob_id_type& out);

} // namespace jogasaki::datastore
17 changes: 8 additions & 9 deletions src/jogasaki/plan/compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,8 @@ status validate_host_variables(

std::shared_ptr<executor::process::impl::variable_table> create_host_variables(
parameter_set const* parameters,
std::shared_ptr<executor::process::impl::variable_table_info> const& info
std::shared_ptr<executor::process::impl::variable_table_info> const& info,
compiler_context& ctx
) {
if (parameters == nullptr || info == nullptr) {
return {};
Expand All @@ -881,16 +882,14 @@ std::shared_ptr<executor::process::impl::variable_table> create_host_variables(
data::any a{};
if (e.value().type_index() == data::value::index<blob_locator>) {
limestone::api::blob_id_type blob_id{};
//TODO pass kvs db and tx
if (auto res = datastore::register_lob(e.value().ref<blob_locator>().path(), nullptr, nullptr, blob_id); res != status::ok) {
if (auto res = datastore::register_lob(e.value().ref<blob_locator>().path(), ctx.transaction().get(), blob_id); res != status::ok) {
// error handling
std::abort();
}
a = data::any{std::in_place_type<blob_reference>, blob_reference{blob_id, lob_data_provider::datastore}};
} else if (e.value().type_index() == data::value::index<clob_locator>) {
limestone::api::blob_id_type blob_id{};
//TODO pass kvs db and tx
if (auto res = datastore::register_lob(e.value().ref<clob_locator>().path(), nullptr, nullptr, blob_id); res != status::ok) {
if (auto res = datastore::register_lob(e.value().ref<clob_locator>().path(), ctx.transaction().get(), blob_id); res != status::ok) {
// error handling
std::abort();
}
Expand All @@ -917,7 +916,7 @@ void create_mirror_for_write(
std::shared_ptr<mirror_container> const& mirrors,
parameter_set const* parameters
) {
auto vars = create_host_variables(parameters, mirrors->host_variable_info());
auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx);
auto& node = unsafe_downcast<statement::write>(*statement);
auto& index = yugawara::binding::extract<yugawara::storage::index>(node.destination());
auto write = std::make_shared<executor::common::write_statement>(
Expand Down Expand Up @@ -953,7 +952,7 @@ void create_mirror_for_empty_statement(
parameter_set const* parameters
) {
auto ops = std::make_shared<executor::common::empty>();
auto vars = create_host_variables(parameters, mirrors->host_variable_info());
auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx);
ctx.executable_statement(
std::make_shared<executable_statement>(
std::move(statement),
Expand Down Expand Up @@ -999,7 +998,7 @@ void create_mirror_for_ddl(
default:
throw_exception(std::logic_error{""});
}
auto vars = create_host_variables(parameters, mirrors->host_variable_info());
auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx);
ctx.executable_statement(
std::make_shared<executable_statement>(
std::move(statement),
Expand All @@ -1020,7 +1019,7 @@ void create_mirror_for_execute(
std::shared_ptr<mirror_container> const& mirrors,
parameter_set const* parameters
) {
auto vars = create_host_variables(parameters, mirrors->host_variable_info());
auto vars = create_host_variables(parameters, mirrors->host_variable_info(), ctx);
std::unordered_map<takatori::plan::step const*, executor::common::step*> steps{};
yugawara::binding::factory bindings{};
auto mirror = std::make_shared<executor::common::graph>();
Expand Down
14 changes: 4 additions & 10 deletions src/jogasaki/scheduler/flat_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <type_traits>
#include <glog/logging.h>

#include <takatori/util/string_builder.h>
#include <tateyama/common.h>
#include <tateyama/logging_helper.h>
#include <tateyama/task_scheduler/context.h>
Expand Down Expand Up @@ -52,8 +51,6 @@

namespace jogasaki::scheduler {

using takatori::util::string_builder;

void flat_task::bootstrap(tateyama::task_scheduler::context& ctx) {
log_entry << *this;
trace_scope_name("bootstrap"); //NOLINT
Expand Down Expand Up @@ -375,14 +372,11 @@ void flat_task::resolve(tateyama::task_scheduler::context& ctx) {
log_entry << *this;
(void)ctx;
auto& e = sctx_->executable_statement_;

std::shared_ptr<error::error_info> info{};
if(auto res = sctx_->database_->resolve(sctx_->prepared_,
maybe_shared_ptr{sctx_->parameters_}, e); res != status::ok) {
set_error(
*req_context_,
error_code::sql_execution_exception,
string_builder{} << "creating parallel execution plan failed. status:" << res << string_builder::to_string,
res
);
maybe_shared_ptr{sctx_->parameters_}, e, req_context_->transaction(), info); res != status::ok) {
req_context_->error_info(info);
} else {
executor::execute_async_on_context(
*sctx_->database_,
Expand Down
14 changes: 7 additions & 7 deletions test/jogasaki/api/api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,23 @@ using namespace std::string_view_literals;
TEST_F(api_test, syntax_error) {
std::unique_ptr<api::executable_statement> stmt{};
std::shared_ptr<error::error_info> info{};
ASSERT_EQ(status::err_parse_error, get_impl(*db_).create_executable("AAA", stmt, info));
ASSERT_EQ(status::err_parse_error, get_impl(*db_).create_executable("AAA", stmt, nullptr, info));
EXPECT_EQ(error_code::syntax_exception, info->code());
std::cerr << info->message() << std::endl;
}

TEST_F(api_test, missing_table) {
std::unique_ptr<api::executable_statement> stmt{};
std::shared_ptr<error::error_info> info{};
ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select * from dummy", stmt, info));
ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select * from dummy", stmt, nullptr, info));
EXPECT_EQ(error_code::symbol_analyze_exception, info->code());
std::cerr << info->message() << std::endl;
}

TEST_F(api_test, invalid_column_name) {
std::unique_ptr<api::executable_statement> stmt{};
std::shared_ptr<error::error_info> info{};
ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("INSERT INTO T0(dummy) VALUES(1)", stmt, info));
ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("INSERT INTO T0(dummy) VALUES(1)", stmt, nullptr, info));
EXPECT_EQ(error_code::symbol_analyze_exception, info->code());
std::cerr << info->message() << std::endl;
}
Expand All @@ -123,7 +123,7 @@ TEST_F(api_test, inconsistent_type_in_write) {
// analyzer option cast_literals_in_context = false can be used to keep the old behavior
std::unique_ptr<api::executable_statement> stmt{};
std::shared_ptr<error::error_info> info{};
ASSERT_EQ(status::ok, get_impl(*db_).create_executable("INSERT INTO T0(C0) VALUES('X')", stmt, info));
ASSERT_EQ(status::ok, get_impl(*db_).create_executable("INSERT INTO T0(C0) VALUES('X')", stmt, nullptr, info));
auto tx = utils::create_transaction(*db_);
auto err = execute(*tx, *stmt);
ASSERT_EQ(error_code::value_evaluation_exception, err->code());
Expand All @@ -133,7 +133,7 @@ TEST_F(api_test, inconsistent_type_in_write) {
TEST_F(api_test, inconsistent_type_in_query) {
std::unique_ptr<api::executable_statement> stmt{};
std::shared_ptr<error::error_info> info{};
ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select C1 from T0 where C1='X'", stmt, info));
ASSERT_EQ(status::err_compiler_error, get_impl(*db_).create_executable("select C1 from T0 where C1='X'", stmt, nullptr, info));
EXPECT_EQ(error_code::type_analyze_exception, info->code());
std::cerr << info->message() << std::endl;
}
Expand Down Expand Up @@ -591,7 +591,7 @@ TEST_F(api_test, unresolved_parameters) {
std::shared_ptr<error::error_info> info{};
auto ps = api::create_parameter_set();
std::unique_ptr<api::executable_statement> exec{};
ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(prepared, std::shared_ptr{std::move(ps)}, exec, info));
ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(prepared, std::shared_ptr{std::move(ps)}, exec, nullptr, info));
EXPECT_EQ(error_code::unresolved_placeholder_exception, info->code());
std::cerr << info->message() << std::endl;
ASSERT_EQ(status::ok,db_->destroy_statement(prepared));
Expand All @@ -602,7 +602,7 @@ TEST_F(api_test, unresolved_parameters) {
std::shared_ptr<error::error_info> info{};
auto ps = api::create_parameter_set();
std::unique_ptr<api::executable_statement> exec{};
ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(query, std::shared_ptr{std::move(ps)}, exec, info));
ASSERT_EQ(status::err_unresolved_host_variable, get_impl(*db_).resolve(query, std::shared_ptr{std::move(ps)}, exec, nullptr, info));
EXPECT_EQ(error_code::unresolved_placeholder_exception, info->code());
std::cerr << info->message() << std::endl;
ASSERT_EQ(status::ok,db_->destroy_statement(query));
Expand Down
Loading

0 comments on commit 5c1a5fd

Please sign in to comment.