diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
index ee767b31bd21..72721ce85921 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -25,7 +25,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -110,8 +110,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
     auto expand_output_header = expand_processor->getOutputs().front().getHeader();
 
     auto transform_params = std::make_shared(expand_output_header, params, false);
-    auto aggregate_processor
-        = std::make_shared(expand_output_header, transform_params, context, false, false);
+    auto aggregate_processor = std::make_shared(context, expand_output_header, transform_params);
     DB::connect(expand_processor->getOutputs().back(), aggregate_processor->getInputs().front());
     new_processors.push_back(aggregate_processor);
     auto aggregate_output_header = aggregate_processor->getOutputs().front().getHeader();
@@ -146,8 +145,15 @@ AdvancedExpandTransform::AdvancedExpandTransform(
     , project_set_exprs(project_set_exprs_)
     , input_header(input_header_)
 {
-    for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
-        is_low_cardinality_expand.push_back(true);
+    for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
+    {
+        const auto & kinds = project_set_exprs.getKinds()[i];
+        size_t n = 0;
+        for (size_t k = 0; k < grouping_keys; ++k)
+            if (kinds[k] == EXPAND_FIELD_KIND_SELECTION)
+                n += 1;
+        need_to_aggregate.push_back((n != grouping_keys));
+    }
 
     for (auto & port : outputs)
         output_ports.push_back(&port);
@@ -167,9 +173,11 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
 
     if (has_output)
     {
-        auto & output_port = *output_ports[is_low_cardinality_expand[expand_expr_iterator - 1]];
+        auto & output_port = *output_ports[need_to_aggregate[expand_expr_iterator - 1]];
         if (output_port.canPush())
         {
+            output_blocks[need_to_aggregate[expand_expr_iterator - 1]] += 1;
+            output_rows[need_to_aggregate[expand_expr_iterator - 1]] += output_chunk.getNumRows();
             output_port.push(std::move(output_chunk));
             has_output = false;
             auto status = expand_expr_iterator >= project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
@@ -185,17 +193,18 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
     {
         if (input.isFinished())
         {
-            if (!cardinality_detect_blocks.empty())
-            {
-                input_finished = true;
-                return Status::Ready;
-            }
-            else
-            {
-                output_ports[0]->finish();
-                output_ports[1]->finish();
-                return Status::Finished;
-            }
+            LOG_DEBUG(
+                getLogger("AdvancedExpandTransform"),
+                "Input rows/blocks={}/{}. output rows/blocks=[{}/{}, {}/{}]",
+                input_rows,
+                input_blocks,
+                output_rows[0],
+                output_blocks[0],
+                output_rows[1],
+                output_blocks[1]);
+            output_ports[0]->finish();
+            output_ports[1]->finish();
+            return Status::Finished;
         }
 
         input.setNeeded();
@@ -204,6 +213,8 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
         input_chunk = input.pull(true);
         has_input = true;
        expand_expr_iterator = 0;
+        input_blocks += 1;
+        input_rows += input_chunk.getNumRows();
     }
 
     return Status::Ready;
@@ -211,55 +222,9 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
 
 void AdvancedExpandTransform::work()
 {
-    if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
-    {
-        cardinality_detect_blocks.push_back(input_header.cloneWithColumns(input_chunk.detachColumns()));
-        cardinality_detect_rows += cardinality_detect_blocks.back().rows();
-        has_input = false;
-    }
-    if ((input_finished || cardinality_detect_rows >= rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
-        detectCardinality();
-    else if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
-        return;
-
-    /// The phase of detecting grouping keys' cardinality is finished here.
     expandInputChunk();
 }
 
-void AdvancedExpandTransform::detectCardinality()
-{
-    DB::Block block = BlockUtil::concatenateBlocksMemoryEfficiently(std::move(cardinality_detect_blocks));
-    std::vector is_col_low_cardinality;
-    for (size_t i = 0; i < grouping_keys; ++i)
-    {
-        DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
-        std::unordered_set distinct_ids;
-        const auto & data = hash.getData();
-        for (size_t j = 0; j < cardinality_detect_rows; ++j)
-            distinct_ids.insert(data[j]);
-        size_t distinct_ids_cnt = distinct_ids.size();
-        is_col_low_cardinality.push_back(distinct_ids.size() < 1000);
-    }
-
-    for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
-    {
-        const auto & kinds = project_set_exprs.getKinds()[i];
-        for (size_t k = 0; k < grouping_keys; ++k)
-        {
-            const auto & kind = kinds[k];
-            if (kind == EXPAND_FIELD_KIND_SELECTION && !is_col_low_cardinality[k])
-            {
-                is_low_cardinality_expand[i] = false;
-                break;
-            }
-        }
-    }
-    LOG_DEBUG(getLogger("AdvancedExpandTransform"), "Low cardinality expand: {}", fmt::join(is_low_cardinality_expand, ","));
-
-    input_chunk = DB::Chunk(block.getColumns(), block.rows());
-    cardinality_detect_blocks.clear();
-}
-
 void AdvancedExpandTransform::expandInputChunk()
 {
     const auto & input_columns = input_chunk.getColumns();
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
index 295084658531..343a7f5227ee 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
@@ -78,19 +78,18 @@ class AdvancedExpandTransform : public DB::IProcessor
     bool has_input = false;
     bool has_output = false;
     size_t expand_expr_iterator = 0;
-    std::vector is_low_cardinality_expand;
-    std::vector approximate_grouping_keys;
-    size_t cardinality_detect_rows = 0;
-    std::vector cardinality_detect_blocks;
-    static constexpr size_t rows_for_detect_cardinality = 10000;
-    bool input_finished = false;
+    std::vector need_to_aggregate;
 
     std::vector output_ports;
     DB::Chunk input_chunk;
     DB::Chunk output_chunk;
 
-    void detectCardinality();
+    size_t input_blocks = 0;
+    size_t input_rows = 0;
+    std::vector output_blocks = {0, 0};
+    std::vector output_rows = {0, 0};
+
     void expandInputChunk();
 };
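
Note on the constructor hunk above: the sampling-based detectCardinality() pass is replaced by a routing decision that is fixed when the transform is built. An expand row (grouping set) whose grouping keys are all EXPAND_FIELD_KIND_SELECTION is pushed to output port 0, which bypasses the aggregating processor, while a row that drops at least one grouping key goes to the second output port, the one transformPipeline connects to the aggregating transform. The sketch below is a standalone illustration of that rule only, not part of the patch; the FieldKind enum (in particular the non-selection kind name), computeNeedToAggregate, and the sample grouping sets are assumptions made for illustration.

// Standalone sketch of the routing rule added in AdvancedExpandTransform's
// constructor. Not the Gluten/ClickHouse code: FieldKind and the LITERAL kind
// name are assumed; only EXPAND_FIELD_KIND_SELECTION appears in the patch.
#include <cstddef>
#include <iostream>
#include <vector>

enum FieldKind { EXPAND_FIELD_KIND_SELECTION, EXPAND_FIELD_KIND_LITERAL };

// One flag per expand row: true -> route to the output that feeds aggregation.
std::vector<bool> computeNeedToAggregate(
    const std::vector<std::vector<FieldKind>> & kinds_per_row, size_t grouping_keys)
{
    std::vector<bool> need_to_aggregate;
    need_to_aggregate.reserve(kinds_per_row.size());
    for (const auto & kinds : kinds_per_row)
    {
        size_t selected = 0;
        for (size_t k = 0; k < grouping_keys; ++k)
            if (kinds[k] == EXPAND_FIELD_KIND_SELECTION)
                selected += 1;
        // Only a row that keeps every grouping key as-is can bypass aggregation.
        need_to_aggregate.push_back(selected != grouping_keys);
    }
    return need_to_aggregate;
}

int main()
{
    // Two grouping keys; the rows mimic GROUPING SETS ((a, b), (a), ()).
    std::vector<std::vector<FieldKind>> kinds = {
        {EXPAND_FIELD_KIND_SELECTION, EXPAND_FIELD_KIND_SELECTION},
        {EXPAND_FIELD_KIND_SELECTION, EXPAND_FIELD_KIND_LITERAL},
        {EXPAND_FIELD_KIND_LITERAL, EXPAND_FIELD_KIND_LITERAL},
    };
    for (bool flag : computeNeedToAggregate(kinds, 2))
        std::cout << (flag ? "aggregate" : "pass-through") << '\n';
    // Prints, one per line: pass-through, aggregate, aggregate.
}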