diff --git a/src/jogasaki/executor/process/impl/ops/index_join.cpp b/src/jogasaki/executor/process/impl/ops/index_join.cpp index 946fed1e..f4d79b9d 100644 --- a/src/jogasaki/executor/process/impl/ops/index_join.cpp +++ b/src/jogasaki/executor/process/impl/ops/index_join.cpp @@ -14,581 +14,3 @@ * limitations under the License. */ #include "index_join.h" - -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "context_helper.h" -#include "details/encode_key.h" -#include "details/error_abort.h" -#include "index_join_context.h" -#include "operator_base.h" - -namespace jogasaki::executor::process::impl::ops { - -using takatori::util::unsafe_downcast; - -namespace details { - -std::vector create_secondary_key_fields( - yugawara::storage::index const* secondary_idx -) { - if(secondary_idx == nullptr) { - return {}; - } - std::vector ret{}; - ret.reserve(secondary_idx->keys().size()); - for(auto&& f : secondary_idx->keys()) { - ret.emplace_back( - utils::type_for(f.column().type()), - f.column().criteria().nullity().nullable(), - f.direction() == takatori::relation::sort_direction::ascendant ? kvs::spec_key_ascending : kvs::spec_key_descending - ); - } - return ret; -} - -matcher::matcher( - bool use_secondary, - bool for_join_scan, - std::vector const& key_fields, - std::vector const& begin_fields, - kvs::end_point_kind begin_endpoint, - std::vector const& end_fields, - kvs::end_point_kind end_endpoint, - std::vector key_columns, - std::vector value_columns, - std::vector secondary_key_fields -) : - use_secondary_(use_secondary), - for_join_scan_(for_join_scan), - key_fields_(key_fields), - begin_fields_(begin_fields), - begin_endpoint_(begin_endpoint), - end_fields_(end_fields), - end_endpoint_(end_endpoint), - field_mapper_( - use_secondary_, - std::move(key_columns), - std::move(value_columns), - std::move(secondary_key_fields) - ) -{ - (void) for_join_scan_; -} - -matcher::matcher( - bool use_secondary, - std::vector const& begin_fields, - kvs::end_point_kind begin_endpoint, - std::vector const& end_fields, - kvs::end_point_kind end_endpoint, - std::vector key_columns, - std::vector value_columns, - std::vector secondary_key_fields -) : - matcher( - use_secondary, - true, - {}, - begin_fields, - begin_endpoint, - end_fields, - end_endpoint, - std::move(key_columns), - std::move(value_columns), - std::move(secondary_key_fields) - ) -{} - -matcher::matcher( - bool use_secondary, - std::vector const& key_fields, - std::vector key_columns, - std::vector value_columns, - std::vector secondary_key_fields -) : - matcher( - use_secondary, - false, - key_fields, - {}, - {}, - {}, - {}, - std::move(key_columns), - std::move(value_columns), - std::move(secondary_key_fields) - ) -{} - -bool matcher::process_find( - request_context& ctx, - variable_table& input_variables, - variable_table& output_variables, - kvs::storage& primary_stg, - kvs::storage* secondary_stg, - matcher::memory_resource* resource -) { - std::size_t len{}; - std::string msg{}; - if(auto res = details::encode_key(std::addressof(ctx), key_fields_, input_variables, *resource, buf_, len, msg); - res != status::ok) { - status_ = res; - // TODO handle msg - if(res == status::err_integrity_constraint_violation) { - // null is assigned for find condition. Nothing should match. - status_ = status::not_found; - } - return false; - } - std::string_view key{static_cast(buf_.data()), len}; - std::string_view value{}; - - if (! use_secondary_) { - auto res = primary_stg.content_get(*ctx.transaction(), key, value); - status_ = res; - if (res != status::ok) { - utils::modify_concurrent_operation_status(*ctx.transaction(), res, false); - status_ = res; - return false; - } - return field_mapper_(key, value, output_variables.store().ref(), primary_stg, *ctx.transaction(), resource) == - status::ok; - } - auto& stg = use_secondary_ ? *secondary_stg : primary_stg; - if(auto res = stg.content_scan(*ctx.transaction(), - key, kvs::end_point_kind::prefixed_inclusive, - key, kvs::end_point_kind::prefixed_inclusive, - it_ - ); res != status::ok) { - status_ = res; - return false; - } - - // remember parameters for current scan - output_variables_ = std::addressof(output_variables); - primary_storage_ = std::addressof(primary_stg); - tx_ = std::addressof(*ctx.transaction()); - resource_ = resource; - return next(); -} - -bool matcher::process_scan( - request_context& ctx, - variable_table& input_variables, - variable_table& output_variables, - kvs::storage& primary_stg, - kvs::storage* secondary_stg, - matcher::memory_resource* resource -) { - std::size_t begin_len{}; - std::size_t end_len{}; - std::string msg{}; - if(auto res = details::encode_key(std::addressof(ctx), begin_fields_, input_variables, *resource, buf_, begin_len, msg); - res != status::ok) { - status_ = res; - // TODO handle msg - if (res == status::err_integrity_constraint_violation) { - // null is assigned for find condition. Nothing should match. - status_ = status::not_found; - } - return false; - } - if(auto res = details::encode_key(std::addressof(ctx), end_fields_, input_variables, *resource, buf2_, end_len, msg); - res != status::ok) { - status_ = res; - // TODO handle msg - if (res == status::err_integrity_constraint_violation) { - // null is assigned for find condition. Nothing should match. - status_ = status::not_found; - } - return false; - } - std::string_view b{static_cast(buf_.data()), begin_len}; - std::string_view e{static_cast(buf2_.data()), end_len}; - - auto& stg = use_secondary_ ? *secondary_stg : primary_stg; - if(auto res = stg.content_scan(*ctx.transaction(), - b, begin_endpoint_, - e, end_endpoint_, - it_ - ); res != status::ok) { - status_ = res; - return false; - } - - // remember parameters for current scan - output_variables_ = std::addressof(output_variables); - primary_storage_ = std::addressof(primary_stg); - tx_ = std::addressof(*ctx.transaction()); - resource_ = resource; - return next(); -} - - -bool matcher::next() { - if (it_ == nullptr) { - status_ = status::not_found; - return false; - } - while(true) { // loop to skip not_found with key()/value() - auto res = it_->next(); - if(res != status::ok) { - status_ = res; - it_.reset(); - return false; - } - std::string_view key{}; - std::string_view value{}; - if(auto r = it_->read_key(key); r != status::ok) { - utils::modify_concurrent_operation_status(*tx_, r, true); - if(r == status::not_found) { - continue; - } - status_ = r; - it_.reset(); - return false; - } - if(auto r = it_->read_value(value); r != status::ok) { - utils::modify_concurrent_operation_status(*tx_, r, true); - if(r == status::not_found) { - continue; - } - status_ = r; - it_.reset(); - return false; - } - return field_mapper_(key, value, output_variables_->store().ref(), *primary_storage_, *tx_, resource_) == - status::ok; - } -} - -status matcher::result() const noexcept { - return status_; -} - -} // namespace details - -operation_status index_join::process_record(abstract::task_context* context) { - BOOST_ASSERT(context != nullptr); //NOLINT - context_helper ctx{*context}; - auto* p = find_context(index(), ctx.contexts()); - if (! p) { - p = ctx.make_context( - index(), - ctx.variable_table(block_index()), - ctx.variable_table(block_index()), - ctx.database()->get_storage(primary_storage_name_), - use_secondary_ ? ctx.database()->get_storage(secondary_storage_name_) : nullptr, - ctx.transaction(), - for_join_scan_ ? std::make_unique( - use_secondary_, - begin_for_scan_, - begin_endpoint_, - end_for_scan_, - end_endpoint_, - key_columns_, - value_columns_, - secondary_key_fields_ - ) - : std::make_unique( - use_secondary_, - search_key_fields_, - key_columns_, - value_columns_, - secondary_key_fields_ - ), - ctx.resource(), - ctx.varlen_resource() - ); - } - return (*this)(*p, context); -} - -void index_join::nullify_output_variables(accessor::record_ref target) { - for(auto&& f : key_columns_) { - if(f.exists_) { - target.set_null(f.nullity_offset_, true); - } - } - for(auto&& f : value_columns_) { - if(f.exists_) { - target.set_null(f.nullity_offset_, true); - } - } -} -operation_status index_join::operator()(index_join_context& ctx, abstract::task_context* context) { //NOLINT(readability-function-cognitive-complexity) - if (ctx.inactive()) { - return {operation_status_kind::aborted}; - } - auto resource = ctx.varlen_resource(); - nullify_output_variables(ctx.output_variables().store().ref()); - bool matched = for_join_scan_ ? ctx.matcher_->process_scan( - *ctx.req_context(), - ctx.input_variables(), - ctx.output_variables(), - *ctx.primary_stg_, - ctx.secondary_stg_.get(), - resource - ) - : ctx.matcher_->process_find( - *ctx.req_context(), - ctx.input_variables(), - ctx.output_variables(), - *ctx.primary_stg_, - ctx.secondary_stg_.get(), - resource - ); - if(matched || join_kind_ == join_kind::left_outer) { - do { - if (condition_) { - expr::evaluator_context c{ - resource, - ctx.req_context() ? utils::make_function_context(*ctx.req_context()->transaction()) : nullptr - }; - auto r = evaluate_bool(c, evaluator_, ctx.input_variables(), resource); - if (r.error()) { - return handle_expression_error(ctx, r, c); - } - if(! r.to()) { - if(join_kind_ != join_kind::left_outer) { - // inner join: skip record - continue; - } - // left outer join: nullify output variables and send record downstream - nullify_output_variables(ctx.output_variables().store().ref()); - } - } - if (downstream_) { - if(auto st = unsafe_downcast(downstream_.get())->process_record(context); !st) { - ctx.abort(); - return {operation_status_kind::aborted}; - } - } - // clean output variables for next record just in case - nullify_output_variables(ctx.output_variables().store().ref()); - } while(matched && ctx.matcher_->next()); - } - if(auto res = ctx.matcher_->result(); res != status::not_found) { - if(res == status::err_integrity_constraint_violation) { - // match condition saw null. No record should match. - return {}; - } - handle_kvs_errors(*ctx.req_context(), res); - return error_abort(ctx, res); - } - return {}; -} - -operator_kind index_join::kind() const noexcept { - return operator_kind::join_find; -} - -std::string_view index_join::storage_name() const noexcept { - return primary_storage_name_; -} - -void index_join::finish(abstract::task_context* context) { - if (! context) return; - context_helper ctx{*context}; - if (auto* p = find_context(index(), ctx.contexts())) { - p->release(); - } - if (downstream_) { - unsafe_downcast(downstream_.get())->finish(context); - } -} - -index_join::index_join( - join_kind kind, - bool for_join_scan, - operator_base::operator_index_type index, - processor_info const& info, - operator_base::block_index_type block_index, - std::string_view primary_storage_name, - std::string_view secondary_storage_name, - std::vector key_columns, - std::vector value_columns, - std::vector search_key_fields, - std::vector begin_for_scan, - kvs::end_point_kind begin_endpoint, - std::vector end_for_scan, - kvs::end_point_kind end_endpoint, - takatori::util::optional_ptr condition, - std::vector secondary_key_fields, - std::unique_ptr downstream, - variable_table_info const* input_variable_info, - variable_table_info const* output_variable_info -) noexcept: - record_operator(index, info, block_index, input_variable_info, output_variable_info), - join_kind_(kind), - for_join_scan_(for_join_scan), - use_secondary_(! secondary_storage_name.empty()), - primary_storage_name_(primary_storage_name), - secondary_storage_name_(secondary_storage_name), - key_columns_(std::move(key_columns)), - value_columns_(std::move(value_columns)), - search_key_fields_(std::move(search_key_fields)), - begin_for_scan_(std::move(begin_for_scan)), - begin_endpoint_(begin_endpoint), - end_for_scan_(std::move(end_for_scan)), - end_endpoint_(end_endpoint), - condition_(std::move(condition)), - downstream_(std::move(downstream)), - evaluator_(condition_ ? - expr::evaluator{*condition_, info.compiled_info(), info.host_variables()} : - expr::evaluator{} - ), - secondary_key_fields_(std::move(secondary_key_fields)) -{} - -index_join::index_join( - join_kind kind, - operator_base::operator_index_type index, - processor_info const& info, - operator_base::block_index_type block_index, - yugawara::storage::index const& primary_idx, - sequence_view columns, - takatori::tree::tree_fragment_vector const& keys, - takatori::util::optional_ptr condition, - yugawara::storage::index const* secondary_idx, - std::unique_ptr downstream, - variable_table_info const* input_variable_info, - variable_table_info const* output_variable_info -) : - index_join( - kind, - false, // for_join_scan - index, - info, - block_index, - primary_idx.simple_name(), - secondary_idx != nullptr ? secondary_idx->simple_name() : "", - index::create_fields( - primary_idx, - columns, - (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), - true, - true - ), - index::create_fields( - primary_idx, - columns, - (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), - false, - true - ), - details::create_search_key_fields( - secondary_idx != nullptr ? *secondary_idx : primary_idx, - keys, - info - ), - {}, - {}, - {}, - {}, - condition, - details::create_secondary_key_fields(secondary_idx), - std::move(downstream), - input_variable_info, - output_variable_info - ) -{} - -index_join::index_join( - join_kind kind, - operator_base::operator_index_type index, - processor_info const& info, - operator_base::block_index_type block_index, - yugawara::storage::index const& primary_idx, - sequence_view columns, - takatori::tree::tree_fragment_vector const& begin_for_scan, - kvs::end_point_kind begin_endpoint, - takatori::tree::tree_fragment_vector const& end_for_scan, - kvs::end_point_kind end_endpoint, - takatori::util::optional_ptr condition, - yugawara::storage::index const* secondary_idx, - std::unique_ptr downstream, - variable_table_info const* input_variable_info, - variable_table_info const* output_variable_info -) : - index_join( - kind, - true, // for_join_scan - index, - info, - block_index, - primary_idx.simple_name(), - secondary_idx != nullptr ? secondary_idx->simple_name() : "", - index::create_fields( - primary_idx, - columns, - (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), - true, - true - ), - index::create_fields( - primary_idx, - columns, - (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), - false, - true - ), - {}, - details::create_search_key_fields( - secondary_idx != nullptr ? *secondary_idx : primary_idx, - begin_for_scan, - info - ), - begin_endpoint, - details::create_search_key_fields( - secondary_idx != nullptr ? *secondary_idx : primary_idx, - end_for_scan, - info - ), - end_endpoint, - condition, - details::create_secondary_key_fields(secondary_idx), - std::move(downstream), - input_variable_info, - output_variable_info - ) -{} - -std::vector const& index_join::key_columns() const noexcept { - return key_columns_; -} - -std::vector const& index_join::value_columns() const noexcept { - return value_columns_; -} - -std::vector const& index_join::search_key_fields() const noexcept { - return search_key_fields_; -} - -} // namespace jogasaki::executor::process::impl::ops diff --git a/src/jogasaki/executor/process/impl/ops/index_join.h b/src/jogasaki/executor/process/impl/ops/index_join.h index 0dfd5851..192322f6 100644 --- a/src/jogasaki/executor/process/impl/ops/index_join.h +++ b/src/jogasaki/executor/process/impl/ops/index_join.h @@ -31,6 +31,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -44,7 +47,11 @@ #include #include #include +#include +#include +#include +#include "details/encode_key.h" #include "details/search_key_field_info.h" #include "index_field_mapper.h" #include "index_join_context.h" @@ -52,53 +59,94 @@ namespace jogasaki::executor::process::impl::ops { +template +class index_join_context; + namespace details { /** * @brief create secondary index key field info. Kept public for testing */ -std::vector create_secondary_key_fields( +inline std::vector create_secondary_key_fields( yugawara::storage::index const* secondary_idx -); - -/** - * @brief matcher object to encapsulate difference between single result find and multiple result - */ -class matcher { +) { + if(secondary_idx == nullptr) { + return {}; + } + std::vector ret{}; + ret.reserve(secondary_idx->keys().size()); + for(auto&& f : secondary_idx->keys()) { + ret.emplace_back( + utils::type_for(f.column().type()), + f.column().criteria().nullity().nullable(), + f.direction() == takatori::relation::sort_direction::ascendant ? kvs::spec_key_ascending : kvs::spec_key_descending + ); + } + return ret; +} + +class match_info_scan { public: - using memory_resource = memory::lifo_paged_memory_resource; - matcher( - bool use_secondary, - bool for_join_scan, - std::vector const& key_fields, - std::vector const& begin_fields, + match_info_scan( + std::vector begin_fields, kvs::end_point_kind begin_endpoint, - std::vector const& end_fields, + std::vector end_fields, kvs::end_point_kind end_endpoint, - std::vector key_columns, - std::vector value_columns, std::vector secondary_key_fields - ); + ) : + begin_fields_(std::move(begin_fields)), + begin_endpoint_(begin_endpoint), + end_fields_(std::move(end_fields)), + end_endpoint_(end_endpoint), + secondary_key_fields_(std::move(secondary_key_fields)) {} + + std::vector begin_fields_; + kvs::end_point_kind begin_endpoint_{}; + std::vector end_fields_; + kvs::end_point_kind end_endpoint_{}; + std::vector secondary_key_fields_{}; +}; - matcher( - bool use_secondary, - std::vector const& key_fields, - std::vector key_columns, - std::vector value_columns, +class match_info_find { +public: + + match_info_find( + std::vector key_fields, std::vector secondary_key_fields - ); + ) : + key_fields_(std::move(key_fields)), + secondary_key_fields_(std::move(secondary_key_fields)) + {} + + std::vector key_fields_; + std::vector secondary_key_fields_{}; +}; + +/** + * @brief matcher object to encapsulate difference between single result find and multiple result + */ +template +class matcher { +public: + using memory_resource = memory::lifo_paged_memory_resource; matcher( bool use_secondary, - std::vector const& begin_fields, - kvs::end_point_kind begin_endpoint, - std::vector const& end_fields, - kvs::end_point_kind end_endpoint, + Info const& info, std::vector key_columns, - std::vector value_columns, - std::vector secondary_key_fields - ); + std::vector value_columns + ) : + use_secondary_(use_secondary), + info_(info), + for_join_scan_(std::is_same_v), + field_mapper_( + use_secondary_, + std::move(key_columns), + std::move(value_columns), + info.secondary_key_fields_ + ) + {} /** * @brief execute the matching for join_find @@ -106,14 +154,58 @@ class matcher { * @return false if match is not successful, check status with result() function to see if the result is * simply not-found or other error happened. */ - [[nodiscard]] bool process_find( + template + [[nodiscard]] std::enable_if_t, bool> process( request_context& ctx, variable_table& input_variables, variable_table& output_variables, kvs::storage& primary_stg, kvs::storage* secondary_stg, memory_resource* resource = nullptr - ); + ) { + std::size_t len{}; + std::string msg{}; + if(auto res = details::encode_key(std::addressof(ctx), info_.key_fields_, input_variables, *resource, buf_, len, msg); + res != status::ok) { + status_ = res; + // TODO handle msg + if(res == status::err_integrity_constraint_violation) { + // null is assigned for find condition. Nothing should match. + status_ = status::not_found; + } + return false; + } + std::string_view key{static_cast(buf_.data()), len}; + std::string_view value{}; + + if (! use_secondary_) { + auto res = primary_stg.content_get(*ctx.transaction(), key, value); + status_ = res; + if (res != status::ok) { + utils::modify_concurrent_operation_status(*ctx.transaction(), res, false); + status_ = res; + return false; + } + return field_mapper_(key, value, output_variables.store().ref(), primary_stg, *ctx.transaction(), resource) == + status::ok; + } + auto& stg = use_secondary_ ? *secondary_stg : primary_stg; + if(auto res = stg.content_scan(*ctx.transaction(), + key, kvs::end_point_kind::prefixed_inclusive, + key, kvs::end_point_kind::prefixed_inclusive, + it_ + ); res != status::ok) { + status_ = res; + return false; + } + + // remember parameters for current scan + output_variables_ = std::addressof(output_variables); + primary_storage_ = std::addressof(primary_stg); + tx_ = std::addressof(*ctx.transaction()); + resource_ = resource; + return next(); + } /** * @brief execute the matching for join_scan @@ -121,14 +213,59 @@ class matcher { * @return false if match is not successful, check status with result() function to see if the result is * simply not-found or other error happened. */ - [[nodiscard]] bool process_scan( + template + [[nodiscard]] std::enable_if_t, bool> process( request_context& ctx, variable_table& input_variables, variable_table& output_variables, kvs::storage& primary_stg, kvs::storage* secondary_stg, memory_resource* resource = nullptr - ); + ) { + std::size_t begin_len{}; + std::size_t end_len{}; + std::string msg{}; + if(auto res = details::encode_key(std::addressof(ctx), info_.begin_fields_, input_variables, *resource, buf_, begin_len, msg); + res != status::ok) { + status_ = res; + // TODO handle msg + if (res == status::err_integrity_constraint_violation) { + // null is assigned for find condition. Nothing should match. + status_ = status::not_found; + } + return false; + } + if(auto res = details::encode_key(std::addressof(ctx), info_.end_fields_, input_variables, *resource, buf2_, end_len, msg); + res != status::ok) { + status_ = res; + // TODO handle msg + if (res == status::err_integrity_constraint_violation) { + // null is assigned for find condition. Nothing should match. + status_ = status::not_found; + } + return false; + } + std::string_view b{static_cast(buf_.data()), begin_len}; + std::string_view e{static_cast(buf2_.data()), end_len}; + + auto& stg = use_secondary_ ? *secondary_stg : primary_stg; + if(auto res = stg.content_scan(*ctx.transaction(), + b, info_.begin_endpoint_, + e, info_.end_endpoint_, + it_ + ); res != status::ok) { + status_ = res; + return false; + } + + // remember parameters for current scan + output_variables_ = std::addressof(output_variables); + primary_storage_ = std::addressof(primary_stg); + tx_ = std::addressof(*ctx.transaction()); + resource_ = resource; + return next(); + } + /** * @brief retrieve next match @@ -136,7 +273,43 @@ class matcher { * @return false if match is not successful, check status with result() function to see if the result is * simply not-found or other error happened. */ - bool next(); + bool next() { + if (it_ == nullptr) { + status_ = status::not_found; + return false; + } + while(true) { // loop to skip not_found with key()/value() + auto res = it_->next(); + if(res != status::ok) { + status_ = res; + it_.reset(); + return false; + } + std::string_view key{}; + std::string_view value{}; + if(auto r = it_->read_key(key); r != status::ok) { + utils::modify_concurrent_operation_status(*tx_, r, true); + if(r == status::not_found) { + continue; + } + status_ = r; + it_.reset(); + return false; + } + if(auto r = it_->read_value(value); r != status::ok) { + utils::modify_concurrent_operation_status(*tx_, r, true); + if(r == status::not_found) { + continue; + } + status_ = r; + it_.reset(); + return false; + } + return field_mapper_(key, value, output_variables_->store().ref(), *primary_storage_, *tx_, resource_) == + status::ok; + } + } + /** * @brief retrieve the status code of the last match execution @@ -145,16 +318,14 @@ class matcher { * @return status::not_found if match was not successful due to missing target record * @return other error (e.g. status::err_aborted_retryable) occurred when accessing kvs */ - [[nodiscard]] status result() const noexcept; + [[nodiscard]] status result() const noexcept { + return status_; + } private: bool use_secondary_{}; + Info const& info_; bool for_join_scan_{}; - std::vector const& key_fields_; - std::vector const& begin_fields_; - kvs::end_point_kind begin_endpoint_{}; - std::vector const& end_fields_; - kvs::end_point_kind end_endpoint_{}; data::aligned_buffer buf_{}; data::aligned_buffer buf2_{}; status status_{status::ok}; @@ -172,9 +343,10 @@ class matcher { /** * @brief index_join class common for join_find/join_scan operators */ +template class index_join : public record_operator { public: - friend class index_join_context; + friend class index_join_context; using join_kind = takatori::relation::join_kind; @@ -184,59 +356,130 @@ class index_join : public record_operator { */ index_join() = default; - /** - * @brief common constructor for join_scan and join_find - * @param kind the kind of the join - * @param for_join_scan whether this object is used for join_scan (otherwise for join_find) - * @param index the index to identify the operator in the process - * @param info processor's information where this operation is contained - * @param block_index the index of the block that this operation belongs to - * @param primary_storage_name the storage name to find - * @param key_columns column information for key fields - * @param value_columns column information for value fields - * @param search_key_fields key_field information - * @param condition additional join condition - * @param downstream downstream operator invoked after this operation. Pass nullptr if such dispatch is not needed. - */ + template , void>> index_join( join_kind kind, - bool for_join_scan, - operator_index_type index, + operator_base::operator_index_type index, processor_info const& info, - block_index_type block_index, + operator_base::block_index_type block_index, std::string_view primary_storage_name, std::string_view secondary_storage_name, std::vector key_columns, std::vector value_columns, std::vector search_key_fields, + takatori::util::optional_ptr condition, + std::vector secondary_key_fields, + std::unique_ptr downstream, + variable_table_info const* input_variable_info = nullptr, + variable_table_info const* output_variable_info = nullptr + ) noexcept: + record_operator(index, info, block_index, input_variable_info, output_variable_info), + join_kind_(kind), + for_join_scan_(false), + use_secondary_(! secondary_storage_name.empty()), + primary_storage_name_(primary_storage_name), + secondary_storage_name_(secondary_storage_name), + key_columns_(std::move(key_columns)), + value_columns_(std::move(value_columns)), + match_info_( + std::move(search_key_fields), + std::move(secondary_key_fields) + ), + condition_(std::move(condition)), + downstream_(std::move(downstream)), + evaluator_(condition_ ? + expr::evaluator{*condition_, info.compiled_info(), info.host_variables()} : + expr::evaluator{} + ) + {} + + template , void>> + index_join( + join_kind kind, + operator_base::operator_index_type index, + processor_info const& info, + operator_base::block_index_type block_index, + std::string_view primary_storage_name, + std::string_view secondary_storage_name, + std::vector key_columns, + std::vector value_columns, std::vector begin_for_scan, kvs::end_point_kind begin_endpoint, std::vector end_for_scan, kvs::end_point_kind end_endpoint, takatori::util::optional_ptr condition, std::vector secondary_key_fields, - std::unique_ptr downstream = nullptr, + std::unique_ptr downstream, variable_table_info const* input_variable_info = nullptr, variable_table_info const* output_variable_info = nullptr - ) noexcept; + ) noexcept: + record_operator(index, info, block_index, input_variable_info, output_variable_info), + join_kind_(kind), + for_join_scan_(true), + use_secondary_(! secondary_storage_name.empty()), + primary_storage_name_(primary_storage_name), + secondary_storage_name_(secondary_storage_name), + key_columns_(std::move(key_columns)), + value_columns_(std::move(value_columns)), + match_info_( + std::move(begin_for_scan), + begin_endpoint, + std::move(end_for_scan), + end_endpoint, + std::move(secondary_key_fields) + ), + condition_(std::move(condition)), + downstream_(std::move(downstream)), + evaluator_(condition_ ? + expr::evaluator{*condition_, info.compiled_info(), info.host_variables()} : + expr::evaluator{} + ) + {} - /** - * @brief create new object for join_find from takatori objects - * @param kind the kind of the join - * @param index the index to identify the operator in the process - * @param info processor's information where this operation is contained - * @param block_index the index of the block that this operation belongs to - * @param storage_name the storage name to find - * @param columns takatori join_find column information - * @param keys takatori join_find key information - * @param condition additional join condition - * @param downstream downstream operator invoked after this operation. Pass nullptr if such dispatch is not needed. - */ +/* index_join( join_kind kind, + bool for_join_scan, operator_index_type index, processor_info const& info, block_index_type block_index, + std::string_view primary_storage_name, + std::string_view secondary_storage_name, + std::vector key_columns, + std::vector value_columns, + Info info, + takatori::util::optional_ptr condition, + std::unique_ptr downstream = nullptr, + variable_table_info const* input_variable_info = nullptr, + variable_table_info const* output_variable_info = nullptr + ) noexcept : + index_join( + kind, + for_join_scan, + index, + info, + block_index, + primary_storage_name, + secondary_storage_name, + std::move(key_columns), + std::move(value_columns), + info_( + + ), + std::move(condition), + std::move(downstream), + input_variable_info, + output_variable_info + ) + {} + */ + + template , void>> + index_join( + join_kind kind, + operator_base::operator_index_type index, + processor_info const& info, + operator_base::block_index_type block_index, yugawara::storage::index const& primary_idx, sequence_view columns, takatori::tree::tree_fragment_vector const& keys, @@ -245,27 +488,49 @@ class index_join : public record_operator { std::unique_ptr downstream, variable_table_info const* input_variable_info = nullptr, variable_table_info const* output_variable_info = nullptr - ); - - /** - * @brief create new object for join_scan from takatori objects - * @param kind the kind of the join - * @param index the index to identify the operator in the process - * @param info processor's information where this operation is contained - * @param block_index the index of the block that this operation belongs to - * @param storage_name the storage name to find - * @param columns takatori join_find column information - * @param keys takatori join_find key information - * @param condition additional join condition - * @param downstream downstream operator invoked after this operation. Pass nullptr if such dispatch is not needed. - */ + ) : + index_join( + kind, + index, + info, + block_index, + primary_idx.simple_name(), + secondary_idx != nullptr ? secondary_idx->simple_name() : "", + index::create_fields( + primary_idx, + columns, + (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), + true, + true + ), + index::create_fields( + primary_idx, + columns, + (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), + false, + true + ), + details::create_search_key_fields( + secondary_idx != nullptr ? *secondary_idx : primary_idx, + keys, + info + ), + condition, + details::create_secondary_key_fields(secondary_idx), + std::move(downstream), + input_variable_info, + output_variable_info + ) + {} + + template , void>> index_join( join_kind kind, - operator_index_type index, + operator_base::operator_index_type index, processor_info const& info, - block_index_type block_index, + operator_base::block_index_type block_index, yugawara::storage::index const& primary_idx, - sequence_view columns, + sequence_view columns, takatori::tree::tree_fragment_vector const& begin_for_scan, kvs::end_point_kind begin_endpoint, takatori::tree::tree_fragment_vector const& end_for_scan, @@ -275,14 +540,72 @@ class index_join : public record_operator { std::unique_ptr downstream, variable_table_info const* input_variable_info = nullptr, variable_table_info const* output_variable_info = nullptr - ); + ) : + index_join( + kind, + index, + info, + block_index, + primary_idx.simple_name(), + secondary_idx != nullptr ? secondary_idx->simple_name() : "", + index::create_fields( + primary_idx, + columns, + (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), + true, + true + ), + index::create_fields( + primary_idx, + columns, + (output_variable_info != nullptr ? *output_variable_info : info.vars_info_list()[block_index]), + false, + true + ), + details::create_search_key_fields( + secondary_idx != nullptr ? *secondary_idx : primary_idx, + begin_for_scan, + info + ), + begin_endpoint, + details::create_search_key_fields( + secondary_idx != nullptr ? *secondary_idx : primary_idx, + end_for_scan, + info + ), + end_endpoint, + condition, + details::create_secondary_key_fields(secondary_idx), + std::move(downstream), + input_variable_info, + output_variable_info + ) + {} /** * @brief create context (if needed) and process record * @param context task-wide context used to create operator context * @return status of the operation */ - operation_status process_record(abstract::task_context* context) override; + operation_status process_record(abstract::task_context* context) override { + BOOST_ASSERT(context != nullptr); //NOLINT + context_helper ctx{*context}; + auto* p = find_context>(index(), ctx.contexts()); + if (! p) { + p = ctx.make_context>( + index(), + ctx.variable_table(block_index()), + ctx.variable_table(block_index()), + ctx.database()->get_storage(primary_storage_name_), + use_secondary_ ? ctx.database()->get_storage(secondary_storage_name_) : nullptr, + ctx.transaction(), + std::make_unique>(use_secondary_, match_info_, key_columns_, value_columns_), + ctx.resource(), + ctx.varlen_resource() + ); + } + return (*this)(*p, context); + } /** * @brief process record with context object @@ -291,38 +614,109 @@ class index_join : public record_operator { * @param context task context for the downstream, can be nullptr if downstream doesn't require. * @return status of the operation */ - operation_status operator()(index_join_context& ctx, abstract::task_context* context = nullptr); + operation_status operator()(index_join_context& ctx, abstract::task_context* context = nullptr) { //NOLINT(readability-function-cognitive-complexity) + if (ctx.inactive()) { + return {operation_status_kind::aborted}; + } + auto resource = ctx.varlen_resource(); + nullify_output_variables(ctx.output_variables().store().ref()); + bool matched = ctx.matcher_->template process( + *ctx.req_context(), + ctx.input_variables(), + ctx.output_variables(), + *ctx.primary_stg_, + ctx.secondary_stg_.get(), + resource + ); + if(matched || join_kind_ == join_kind::left_outer) { + do { + if (condition_) { + expr::evaluator_context c{ + resource, + ctx.req_context() ? utils::make_function_context(*ctx.req_context()->transaction()) : nullptr + }; + auto r = evaluate_bool(c, evaluator_, ctx.input_variables(), resource); + if (r.error()) { + return handle_expression_error(ctx, r, c); + } + if(! r.template to()) { + if(join_kind_ != join_kind::left_outer) { + // inner join: skip record + continue; + } + // left outer join: nullify output variables and send record downstream + nullify_output_variables(ctx.output_variables().store().ref()); + } + } + if (downstream_) { + if(auto st = unsafe_downcast(downstream_.get())->process_record(context); !st) { + ctx.abort(); + return {operation_status_kind::aborted}; + } + } + // clean output variables for next record just in case + nullify_output_variables(ctx.output_variables().store().ref()); + } while(matched && ctx.matcher_->next()); + } + if(auto res = ctx.matcher_->result(); res != status::not_found) { + if(res == status::err_integrity_constraint_violation) { + // match condition saw null. No record should match. + return {}; + } + handle_kvs_errors(*ctx.req_context(), res); + return error_abort(ctx, res); + } + return {}; + } /** * @see operator_base::kind() */ - [[nodiscard]] operator_kind kind() const noexcept override; + [[nodiscard]] operator_kind kind() const noexcept override { + return operator_kind::join_find; + } /** * @brief return storage name * @return the storage name of the find target */ - [[nodiscard]] std::string_view storage_name() const noexcept; + [[nodiscard]] std::string_view storage_name() const noexcept { + return primary_storage_name_; + } /** * @see operator_base::finish() */ - void finish(abstract::task_context*) override; + void finish(abstract::task_context* context) override { + if (! context) return; + context_helper ctx{*context}; + if (auto* p = find_context>(index(), ctx.contexts())) { + p->release(); + } + if (downstream_) { + unsafe_downcast(downstream_.get())->finish(context); + } + } /** * @brief accessor to key columns */ - [[nodiscard]] std::vector const& key_columns() const noexcept; + [[nodiscard]] std::vector const& key_columns() const noexcept { + return key_columns_; + } /** * @brief accessor to value columns */ - [[nodiscard]] std::vector const& value_columns() const noexcept; + [[nodiscard]] std::vector const& value_columns() const noexcept { + return value_columns_; + } - /** - * @brief accessor to key fields - */ - [[nodiscard]] std::vector const& search_key_fields() const noexcept; +/* + [[nodiscard]] std::vector const& search_key_fields() const noexcept { + return search_key_fields_; + } + */ private: join_kind join_kind_{}; @@ -332,17 +726,23 @@ class index_join : public record_operator { std::string secondary_storage_name_{}; std::vector key_columns_{}; std::vector value_columns_{}; - std::vector search_key_fields_{}; - std::vector begin_for_scan_{}; - kvs::end_point_kind begin_endpoint_ = kvs::end_point_kind::unbound; - std::vector end_for_scan_{}; - kvs::end_point_kind end_endpoint_ = kvs::end_point_kind::unbound; + Info match_info_{}; takatori::util::optional_ptr condition_{}; std::unique_ptr downstream_{}; expr::evaluator evaluator_{}; - std::vector secondary_key_fields_{}; - void nullify_output_variables(accessor::record_ref target); + void nullify_output_variables(accessor::record_ref target) { + for(auto&& f : key_columns_) { + if(f.exists_) { + target.set_null(f.nullity_offset_, true); + } + } + for(auto&& f : value_columns_) { + if(f.exists_) { + target.set_null(f.nullity_offset_, true); + } + } + } }; } // namespace jogasaki::executor::process::impl::ops diff --git a/src/jogasaki/executor/process/impl/ops/index_join_context.cpp b/src/jogasaki/executor/process/impl/ops/index_join_context.cpp index 40b031d0..7b12ba77 100644 --- a/src/jogasaki/executor/process/impl/ops/index_join_context.cpp +++ b/src/jogasaki/executor/process/impl/ops/index_join_context.cpp @@ -15,43 +15,3 @@ */ #include "index_join_context.h" -#include - -#include "context_base.h" -#include "index_join.h" - -namespace jogasaki::executor::process::impl::ops { - -index_join_context::index_join_context( - class abstract::task_context* ctx, - variable_table& input_variables, - variable_table& output_variables, - std::unique_ptr primary_stg, - std::unique_ptr secondary_stg, - transaction_context* tx, - std::unique_ptr matcher, - context_base::memory_resource* resource, - context_base::memory_resource* varlen_resource -) : - context_base(ctx, input_variables, output_variables, resource, varlen_resource), - primary_stg_(std::move(primary_stg)), - secondary_stg_(std::move(secondary_stg)), - tx_(tx), - matcher_(std::move(matcher)) -{} - -operator_kind index_join_context::kind() const noexcept { - return operator_kind::join_find; -} - -void index_join_context::release() { - //TODO -} - -transaction_context* index_join_context::transaction() const noexcept { - return tx_; -} - -} - - diff --git a/src/jogasaki/executor/process/impl/ops/index_join_context.h b/src/jogasaki/executor/process/impl/ops/index_join_context.h index be8193d3..ce241594 100644 --- a/src/jogasaki/executor/process/impl/ops/index_join_context.h +++ b/src/jogasaki/executor/process/impl/ops/index_join_context.h @@ -33,15 +33,20 @@ namespace jogasaki::executor::process::impl::ops { namespace details { +template class matcher; } +template +class index_join; + /** * @brief index_join_context context */ +template class index_join_context : public context_base { public: - friend class index_join; + friend class index_join; /** * @brief create empty object */ @@ -57,22 +62,34 @@ class index_join_context : public context_base { std::unique_ptr primary_stg, std::unique_ptr secondary_stg, transaction_context* tx, - std::unique_ptr matcher, + std::unique_ptr> matcher, memory_resource* resource, memory_resource* varlen_resource - ); + ) : + context_base(ctx, input_variables, output_variables, resource, varlen_resource), + primary_stg_(std::move(primary_stg)), + secondary_stg_(std::move(secondary_stg)), + tx_(tx), + matcher_(std::move(matcher)) + {} - [[nodiscard]] operator_kind kind() const noexcept override; + [[nodiscard]] operator_kind kind() const noexcept override { + return operator_kind::join_find; + } - void release() override; + void release() override { + //TODO + } - [[nodiscard]] transaction_context* transaction() const noexcept; + [[nodiscard]] transaction_context* transaction() const noexcept { + return tx_; + } private: std::unique_ptr primary_stg_{}; std::unique_ptr secondary_stg_{}; transaction_context* tx_{}; - std::unique_ptr matcher_{}; + std::unique_ptr> matcher_{}; }; } diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index 7ad6244c..35541c2f 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -149,7 +149,7 @@ std::unique_ptr operator_builder::operator()(const relation::join auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); auto& table = secondary_or_primary_index.table(); auto primary = table.owner()->find_primary_index(table); - return std::make_unique( + return std::make_unique>( node.operator_kind(), index_++, *info_, @@ -169,7 +169,7 @@ std::unique_ptr operator_builder::operator()(const relation::join auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); auto& table = secondary_or_primary_index.table(); auto primary = table.owner()->find_primary_index(table); - return std::make_unique( + return std::make_unique>( node.operator_kind(), index_++, *info_,