diff --git a/cpp-ch/local-engine/Common/AggregateUtil.cpp b/cpp-ch/local-engine/Common/AggregateUtil.cpp index 851dd2e7fe3d1..a6d4e1a148592 100644 --- a/cpp-ch/local-engine/Common/AggregateUtil.cpp +++ b/cpp-ch/local-engine/Common/AggregateUtil.cpp @@ -15,8 +15,11 @@ * limitations under the License. */ +#include "AggregateUtil.h" +#include #include #include +#include #include #include #include @@ -26,8 +29,26 @@ namespace DB { namespace ErrorCodes { - extern const int LOGICAL_ERROR; - extern const int UNKNOWN_AGGREGATED_DATA_VARIANT; +extern const int LOGICAL_ERROR; +extern const int UNKNOWN_AGGREGATED_DATA_VARIANT; +} + +namespace Setting +{ +extern const SettingsUInt64 max_bytes_before_external_group_by; +extern const SettingsBool optimize_group_by_constant_keys; +extern const SettingsUInt64 min_free_disk_space_for_temporary_data; +extern const SettingsMaxThreads max_threads; +extern const SettingsBool empty_result_for_aggregation_by_empty_set; +extern const SettingsUInt64 group_by_two_level_threshold_bytes; +extern const SettingsOverflowModeGroupBy group_by_overflow_mode; +extern const SettingsUInt64 max_rows_to_group_by; +extern const SettingsBool enable_memory_bound_merging_of_aggregation_results; +extern const SettingsUInt64 aggregation_in_order_max_block_bytes; +extern const SettingsUInt64 group_by_two_level_threshold; +extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization; +extern const SettingsMaxThreads max_threads; +extern const SettingsUInt64 max_block_size; } template @@ -39,24 +60,23 @@ static Int32 extractMethodBucketsNum(Method & /*method*/) Int32 GlutenAggregatorUtil::getBucketsNum(AggregatedDataVariants & data_variants) { if (!data_variants.isTwoLevel()) - { return 0; - } - + Int32 buckets_num = 0; #define M(NAME) \ - else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ - buckets_num = extractMethodBucketsNum(*data_variants.NAME); + else if (data_variants.type == AggregatedDataVariants::Type::NAME) 
buckets_num = extractMethodBucketsNum(*data_variants.NAME); - if (false) {} // NOLINT + if (false) + { + } // NOLINT APPLY_FOR_VARIANTS_TWO_LEVEL(M) #undef M - else - throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant"); + else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant"); return buckets_num; } -std::optional GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket) +std::optional GlutenAggregatorUtil::safeConvertOneBucketToBlock( + Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket) { if (!variants.isTwoLevel()) return {}; @@ -65,7 +85,7 @@ std::optional GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregato return aggregator.convertOneBucketToBlock(variants, arena, final, bucket); } -template +template static void releaseOneBucket(Method & method, Int32 bucket) { method.data.impls[bucket].clearAndShrink(); @@ -77,29 +97,26 @@ void GlutenAggregatorUtil::safeReleaseOneBucket(AggregatedDataVariants & variant return; if (bucket >= getBucketsNum(variants)) return; -#define M(NAME) \ - else if (variants.type == AggregatedDataVariants::Type::NAME) \ - releaseOneBucket(*variants.NAME, bucket); +#define M(NAME) else if (variants.type == AggregatedDataVariants::Type::NAME) releaseOneBucket(*variants.NAME, bucket); - if (false) {} // NOLINT + if (false) + { + } // NOLINT APPLY_FOR_VARIANTS_TWO_LEVEL(M) #undef M - else - throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant"); - + else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant"); } } namespace local_engine { -AggregateDataBlockConverter::AggregateDataBlockConverter(DB::Aggregator & aggregator_, DB::AggregatedDataVariantsPtr data_variants_, bool final_) 
+AggregateDataBlockConverter::AggregateDataBlockConverter(
+    DB::Aggregator & aggregator_, DB::AggregatedDataVariantsPtr data_variants_, bool final_)
     : aggregator(aggregator_), data_variants(std::move(data_variants_)), final(final_)
 {
     if (data_variants->isTwoLevel())
-    {
         buckets_num = DB::GlutenAggregatorUtil::getBucketsNum(*data_variants);
-    }
     else if (data_variants->size())
         buckets_num = 1;
     else
@@ -168,4 +185,116 @@ DB::Block AggregateDataBlockConverter::next()
     output_blocks.pop_front();
     return block;
 }
+
+/// Assembles the DB::Aggregator::Params for an aggregating / merging step from the context settings.
+///
+/// - Algorithm::GlutenGraceAggregate: both byte thresholds and the free-disk-space value are 0,
+///   empty_result_for_aggregation_by_empty_set is false, no temporary-data scope is passed, and
+///   `group_by_two_level_threshold` is read from the settings.
+/// - Algorithm::CHTwoStageAggregate: `group_by_two_level_threshold` is 0; the rest follows the settings.
+/// - Mode::PARTIAL_TO_FINISHED: merge-only params. Row limit and byte thresholds are 0; expression
+///   compilation, prefetch and the constant-key optimization are off, whatever the algorithm.
+DB::Aggregator::Params AggregatorParamsHelper::buildParams(
+    DB::ContextPtr context,
+    const DB::Names & grouping_keys,
+    const DB::AggregateDescriptions & agg_descriptions,
+    Mode mode,
+    Algorithm algorithm)
+{
+    const auto & settings = context->getSettingsRef();
+    const bool is_grace = algorithm == Algorithm::GlutenGraceAggregate;
+    const bool is_merge_only = mode == Mode::PARTIAL_TO_FINISHED;
+    /// Values that are taken from the settings only for the CH two-stage algorithm outside the final merge.
+    const bool use_ch_limits = !is_grace && !is_merge_only;
+
+    /// Every UInt64 setting is cast to size_t before it meets a literal in a conditional expression;
+    /// otherwise the result type is left to implicit conversions and a 64-bit value may be narrowed.
+    size_t max_rows_to_group_by = is_merge_only ? 0 : static_cast<size_t>(settings[DB::Setting::max_rows_to_group_by]);
+    DB::OverflowMode group_by_overflow_mode = settings[DB::Setting::group_by_overflow_mode];
+    size_t group_by_two_level_threshold
+        = is_grace ? static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold]) : 0;
+    size_t group_by_two_level_threshold_bytes
+        = use_ch_limits ? static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold_bytes]) : 0;
+    size_t max_bytes_before_external_group_by
+        = use_ch_limits ? static_cast<size_t>(settings[DB::Setting::max_bytes_before_external_group_by]) : 0;
+    bool empty_result_for_aggregation_by_empty_set
+        = use_ch_limits && static_cast<bool>(settings[DB::Setting::empty_result_for_aggregation_by_empty_set]);
+    DB::TemporaryDataOnDiskScopePtr tmp_data_scope = is_grace ? nullptr : context->getTempDataOnDisk();
+    size_t max_threads = settings[DB::Setting::max_threads];
+    size_t min_free_disk_space
+        = is_grace ? 0 : static_cast<size_t>(settings[DB::Setting::min_free_disk_space_for_temporary_data]);
+    bool compile_aggregate_expressions = !is_merge_only;
+    size_t min_count_to_compile_aggregate_expression = is_merge_only ? 0 : 3;
+    size_t max_block_size = PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]);
+    bool enable_prefetch = !is_merge_only;
+    bool only_merge = is_merge_only;
+    bool optimize_group_by_constant_keys
+        = !is_merge_only && static_cast<bool>(settings[DB::Setting::optimize_group_by_constant_keys]);
+    double min_hit_rate_to_use_consecutive_keys_optimization = settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization];
+    DB::Aggregator::Params params(
+        grouping_keys,
+        agg_descriptions,
+        false,
+        max_rows_to_group_by,
+        group_by_overflow_mode,
+        group_by_two_level_threshold,
+        group_by_two_level_threshold_bytes,
+        max_bytes_before_external_group_by,
+        empty_result_for_aggregation_by_empty_set,
+        tmp_data_scope,
+        max_threads,
+        min_free_disk_space,
+        compile_aggregate_expressions,
+        min_count_to_compile_aggregate_expression,
+        max_block_size,
+        enable_prefetch,
+        only_merge,
+        optimize_group_by_constant_keys,
+        min_hit_rate_to_use_consecutive_keys_optimization,
+        {});
+    return params;
+}
+
+/// Returns false from the enclosing function, after logging both values, as soon as `field` differs
+/// between `lhs` and `rhs`. The do/while wrapper makes `COMPARE_FIELD(x);` exactly one statement.
+#define COMPARE_FIELD(field) \
+    do \
+    { \
+        if (lhs.field != rhs.field) \
+        { \
+            LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params field " #field " is not equal. {}/{}", lhs.field, rhs.field); \
+            return false; \
+        } \
+    } while (false)
+
+/// Field-by-field comparison of two DB::Aggregator::Params objects. Logs the first mismatching field
+/// and returns false; returns true when every checked field matches.
+/// NOTE(review): the grouping keys, the aggregate descriptions and the max_block_size argument passed
+/// by buildParams are not compared here -- confirm that this is intended.
+bool AggregatorParamsHelper::compare(const DB::Aggregator::Params & lhs, const DB::Aggregator::Params & rhs)
+{
+    COMPARE_FIELD(overflow_row);
+    COMPARE_FIELD(max_rows_to_group_by);
+    COMPARE_FIELD(group_by_overflow_mode);
+    COMPARE_FIELD(group_by_two_level_threshold);
+    COMPARE_FIELD(group_by_two_level_threshold_bytes);
+    COMPARE_FIELD(max_bytes_before_external_group_by);
+    COMPARE_FIELD(empty_result_for_aggregation_by_empty_set);
+    COMPARE_FIELD(max_threads);
+    COMPARE_FIELD(min_free_disk_space);
+    COMPARE_FIELD(compile_aggregate_expressions);
+    /// The temporary-data scopes are compared by presence only (null vs. non-null), not by identity.
+    if ((lhs.tmp_data_scope == nullptr) != (rhs.tmp_data_scope == nullptr))
+    {
+        LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params field tmp_data_scope is not equal.");
+        return false;
+    }
+    COMPARE_FIELD(min_count_to_compile_aggregate_expression);
+    COMPARE_FIELD(enable_prefetch);
+    COMPARE_FIELD(only_merge);
+    COMPARE_FIELD(optimize_group_by_constant_keys);
+    COMPARE_FIELD(min_hit_rate_to_use_consecutive_keys_optimization);
+    return true;
+}
+#undef COMPARE_FIELD
 }
diff --git a/cpp-ch/local-engine/Common/AggregateUtil.h b/cpp-ch/local-engine/Common/AggregateUtil.h
index b14cd59c54901..380e1ea355398 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.h
+++ b/cpp-ch/local-engine/Common/AggregateUtil.h
@@ -25,7 +25,8 @@ class GlutenAggregatorUtil
 {
 public:
     static Int32 getBucketsNum(AggregatedDataVariants & data_variants);
-    static std::optional safeConvertOneBucketToBlock(Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket);
+    static std::optional
+    safeConvertOneBucketToBlock(Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket);
     static void safeReleaseOneBucket(AggregatedDataVariants & variants, Int32 bucket);
 };
 }
@@ -41,6 +42,7 @@ class AggregateDataBlockConverter
     ~AggregateDataBlockConverter() = default;
     bool hasNext();
     DB::Block next();
+
 private:
     DB::Aggregator & 
aggregator; DB::AggregatedDataVariantsPtr data_variants; @@ -50,4 +52,31 @@ class AggregateDataBlockConverter Int32 current_bucket = 0; DB::BlocksList output_blocks; }; + +class AggregatorParamsHelper +{ +public: + enum class Algorithm + { + GlutenGraceAggregate, + CHTwoStageAggregate + }; + enum class Mode + { + INIT_TO_PARTIAL, + INIT_TO_COMPLETED, + PARTIAL_TO_PARTIAL, + PARTIAL_TO_FINISHED, + }; + + // When grace aggregation is used, never enable ClickHouse's own spilling; otherwise data will be lost. + static DB::Aggregator::Params buildParams( + DB::ContextPtr context, + const DB::Names & grouping_keys, + const DB::AggregateDescriptions & agg_descriptions, + Mode mode, + Algorithm algorithm = Algorithm::GlutenGraceAggregate); + static bool compare(const DB::Aggregator::Params & lhs, const DB::Aggregator::Params & rhs); +}; + } diff --git a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp index 6bc8c7e6e14ad..3d6c978f0e543 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp @@ -184,9 +184,7 @@ void AggregateRelParser::setup(DB::QueryPlanPtr query_plan, const substrait::Rel agg_info.signature_function_name = *parseSignatureFunctionName(measure.measure().function_reference()); auto function_parser = AggregateFunctionParserFactory::instance().get(agg_info.signature_function_name, parser_context); if (!function_parser) - { throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported aggregate function: {}", agg_info.signature_function_name); - } /// Put function_parser, parser_func_info and function_name into agg_info for reducing repeated builds. 
agg_info.function_parser = function_parser; agg_info.parser_func_info = AggregateFunctionParser::CommonFunctionInfo(measure); @@ -198,16 +196,10 @@ void AggregateRelParser::setup(DB::QueryPlanPtr query_plan, const substrait::Rel if (aggregate_rel->groupings_size() == 1) { for (const auto & expr : aggregate_rel->groupings(0).grouping_expressions()) - { if (expr.has_selection() && expr.selection().has_direct_reference()) - { grouping_keys.push_back(input_header.getByPosition(expr.selection().direct_reference().struct_field().field()).name); - } else - { throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported group expression: {}", expr.DebugString()); - } - } } else if (aggregate_rel->groupings_size() != 0) { @@ -345,23 +337,23 @@ void AggregateRelParser::addMergingAggregatedStep() AggregateDescriptions aggregate_descriptions; buildAggregateDescriptions(aggregate_descriptions); const auto & settings = getContext()->getSettingsRef(); - Aggregator::Params params( - grouping_keys, - aggregate_descriptions, - false, - settings[Setting::max_threads], - PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]), - settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization]); auto config = StreamingAggregateConfig::loadFromContext(getContext()); if (config.enable_streaming_aggregating) { - params.group_by_two_level_threshold = settings[Setting::group_by_two_level_threshold]; + auto params = AggregatorParamsHelper::buildParams( + getContext(), grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED); auto merging_step = std::make_unique(getContext(), plan->getCurrentHeader(), params, false); steps.emplace_back(merging_step.get()); plan->addStep(std::move(merging_step)); } else { + auto params = AggregatorParamsHelper::buildParams( + getContext(), + grouping_keys, + aggregate_descriptions, + AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED, + AggregatorParamsHelper::Algorithm::CHTwoStageAggregate); /// We don't use 
the grouping set feature in CH, so grouping_sets_params_list should always be empty. DB::GroupingSetsParamsList grouping_sets_params_list; auto merging_step = std::make_unique( @@ -389,54 +381,20 @@ void AggregateRelParser::addCompleteModeAggregatedStep() auto config = StreamingAggregateConfig::loadFromContext(getContext()); if (config.enable_streaming_aggregating) { - Aggregator::Params params( - grouping_keys, - aggregate_descriptions, - false, - settings[Setting::max_rows_to_group_by], - settings[Setting::group_by_overflow_mode], - settings[Setting::group_by_two_level_threshold], - settings[Setting::group_by_two_level_threshold_bytes], - 0, /*settings[Setting::max_bytes_before_external_group_by]*/ - settings[Setting::empty_result_for_aggregation_by_empty_set], - getContext()->getTempDataOnDisk(), - settings[Setting::max_threads], - settings[Setting::min_free_disk_space_for_temporary_data], - true, - 3, - PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]), - /*enable_prefetch*/ true, - /*only_merge*/ false, - settings[Setting::optimize_group_by_constant_keys], - settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization], - /*StatsCollectingParams*/ {}); + auto params = AggregatorParamsHelper::buildParams( + getContext(), grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::INIT_TO_COMPLETED); auto merging_step = std::make_unique(getContext(), plan->getCurrentHeader(), params, true); steps.emplace_back(merging_step.get()); plan->addStep(std::move(merging_step)); } else { - Aggregator::Params params( + auto params = AggregatorParamsHelper::buildParams( + getContext(), grouping_keys, aggregate_descriptions, - false, - settings[Setting::max_rows_to_group_by], - settings[Setting::group_by_overflow_mode], - settings[Setting::group_by_two_level_threshold], - settings[Setting::group_by_two_level_threshold_bytes], - settings[Setting::max_bytes_before_external_group_by], - 
settings[Setting::empty_result_for_aggregation_by_empty_set], - getContext()->getTempDataOnDisk(), - settings[Setting::max_threads], - settings[Setting::min_free_disk_space_for_temporary_data], - true, - 3, - PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]), - /*enable_prefetch*/ true, - /*only_merge*/ false, - settings[Setting::optimize_group_by_constant_keys], - settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization], - /*StatsCollectingParams*/ {}); + AggregatorParamsHelper::Mode::INIT_TO_COMPLETED, + AggregatorParamsHelper::Algorithm::CHTwoStageAggregate); auto aggregating_step = std::make_unique( plan->getCurrentHeader(), @@ -471,9 +429,7 @@ void AggregateRelParser::addAggregatingStep() { const auto & next_rel = *(rel_stack->back()); if (next_rel.rel_type_case() == substrait::Rel::RelTypeCase::kAggregate) - { is_distinct_aggreate = true; - } } if (config.enable_streaming_aggregating) @@ -484,27 +440,9 @@ void AggregateRelParser::addAggregatingStep() // unreliable. It will appear that a small hash table is converted into a two level structure, resulting in a // lot of small blocks. So we disable this condition, reamain `group_by_two_level_threshold` as the condition to // convert a single level hash table into a two level one. 
- Aggregator::Params params( - grouping_keys, - aggregate_descriptions, - false, - settings[Setting::max_rows_to_group_by], - settings[Setting::group_by_overflow_mode], - settings[Setting::group_by_two_level_threshold], - 0, // group_by_two_level_threshold_bytes - 0, - settings[Setting::empty_result_for_aggregation_by_empty_set], - nullptr, - settings[Setting::max_threads], - settings[Setting::min_free_disk_space_for_temporary_data], - true, - 3, - PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]), - /*enable_prefetch*/ true, - /*only_merge*/ false, - settings[Setting::optimize_group_by_constant_keys], - settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization], - /*StatsCollectingParams*/ {}); + auto params = AggregatorParamsHelper::buildParams( + getContext(), grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::INIT_TO_PARTIAL); + if (!is_distinct_aggreate) { auto aggregating_step = std::make_unique(getContext(), plan->getCurrentHeader(), params); @@ -532,27 +470,12 @@ void AggregateRelParser::addAggregatingStep() } else { - Aggregator::Params params( + auto params = AggregatorParamsHelper::buildParams( + getContext(), grouping_keys, aggregate_descriptions, - false, - settings[Setting::max_rows_to_group_by], - settings[Setting::group_by_overflow_mode], - settings[Setting::group_by_two_level_threshold], - settings[Setting::group_by_two_level_threshold_bytes], - settings[Setting::max_bytes_before_external_group_by], - settings[Setting::empty_result_for_aggregation_by_empty_set], - getContext()->getTempDataOnDisk(), - settings[Setting::max_threads], - settings[Setting::min_free_disk_space_for_temporary_data], - true, - 3, - PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]), - /*enable_prefetch*/ true, - /*only_merge*/ false, - settings[Setting::optimize_group_by_constant_keys], - settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization], - /*StatsCollectingParams*/ {}); + 
AggregatorParamsHelper::Mode::INIT_TO_PARTIAL, + AggregatorParamsHelper::Algorithm::CHTwoStageAggregate); auto aggregating_step = std::make_unique( plan->getCurrentHeader(), @@ -587,12 +510,8 @@ void AggregateRelParser::addPostProjection() for (const auto & agg_info : aggregates) { for (const auto * input_node : project_actions_dag.getInputs()) - { if (input_node->result_name == agg_info.measure_column_name) - { agg_info.function_parser->convertNodeTypeIfNeeded(agg_info.parser_func_info, input_node, project_actions_dag, false); - } - } } } else if (has_complete_stage) @@ -601,12 +520,8 @@ void AggregateRelParser::addPostProjection() for (const auto & agg_info : aggregates) { for (const auto * output_node : project_actions_dag.getOutputs()) - { if (output_node->result_name == agg_info.measure_column_name) - { agg_info.function_parser->convertNodeTypeIfNeeded(agg_info.parser_func_info, output_node, project_actions_dag, true); - } - } } } if (project_actions_dag.dumpDAG() != dag_footprint)