From 17d9cd8dfbfa0afc551d71be954553f959ce7f26 Mon Sep 17 00:00:00 2001 From: lgbo Date: Mon, 18 Nov 2024 10:18:43 +0800 Subject: [PATCH] [GLUTEN-7959][CH] `AdvancedExpandStep` generates less row than expected (#7960) * detect cardinality * fix aggregate params --- .../Operator/AdvancedExpandStep.cpp | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp index 6ac5f5fc8f8b..b777731a9103 100644 --- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp +++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp @@ -115,6 +115,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, break; aggregate_grouping_keys.push_back(col.name); } + // partial to partial aggregate DB::Aggregator::Params params( aggregate_grouping_keys, aggregate_descriptions, @@ -122,10 +123,10 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, settings[DB::Setting::max_rows_to_group_by], settings[DB::Setting::group_by_overflow_mode], settings[DB::Setting::group_by_two_level_threshold], - settings[DB::Setting::group_by_two_level_threshold_bytes], - settings[DB::Setting::max_bytes_before_external_group_by], + 0, + 0, settings[DB::Setting::empty_result_for_aggregation_by_empty_set], - context->getTempDataOnDisk(), + nullptr, settings[DB::Setting::max_threads], settings[DB::Setting::min_free_disk_space_for_temporary_data], true, @@ -149,7 +150,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, new_processors.push_back(expand_processor); auto expand_output_header = expand_processor->getOutputs().front().getHeader(); - + auto transform_params = std::make_shared(expand_output_header, params, false); auto aggregate_processor = std::make_shared(expand_output_header, transform_params, context, false, false); @@ -188,14 +189,10 @@ AdvancedExpandTransform::AdvancedExpandTransform( , input_header(input_header_) { for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i) - { is_low_cardinality_expand.push_back(true); - } for (auto & port : outputs) - { output_ports.push_back(&port); - } } DB::IProcessor::Status AdvancedExpandTransform::prepare() @@ -245,9 +242,7 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare() input.setNeeded(); if (!input.hasData()) - { return Status::NeedData; - } input_chunk = input.pull(true); has_input = true; expand_expr_iterator = 0; @@ -265,9 +260,7 @@ void AdvancedExpandTransform::work() has_input = false; } if ((input_finished || cardinality_detect_rows >= rows_for_detect_cardinality) && !cardinality_detect_blocks.empty()) - { detectCardinality(); - } else if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality) return; @@ -281,7 +274,7 @@ void AdvancedExpandTransform::detectCardinality() std::vector is_col_low_cardinality; for (size_t i = 0; i < grouping_keys; ++i) { - DB::WeakHash32 hash(cardinality_detect_rows); + DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32(); std::unordered_set distinct_ids; const auto & data = hash.getData(); for (size_t j = 0; j < cardinality_detect_rows; ++j)