Skip to content

Commit

Permalink
[GLUTEN-7959][CH] AdvancedExpandStep generates less row than expected (#7960)
Browse files Browse the repository at this point in the history

* detect cardinality

* fix aggregate params
  • Loading branch information
lgbo-ustc authored Nov 18, 2024
1 parent c144443 commit 17d9cd8
Showing 1 changed file with 6 additions and 13 deletions.
19 changes: 6 additions & 13 deletions cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,18 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
break;
aggregate_grouping_keys.push_back(col.name);
}
// partial to partial aggregate
DB::Aggregator::Params params(
aggregate_grouping_keys,
aggregate_descriptions,
false,
settings[DB::Setting::max_rows_to_group_by],
settings[DB::Setting::group_by_overflow_mode],
settings[DB::Setting::group_by_two_level_threshold],
settings[DB::Setting::group_by_two_level_threshold_bytes],
settings[DB::Setting::max_bytes_before_external_group_by],
0,
0,
settings[DB::Setting::empty_result_for_aggregation_by_empty_set],
context->getTempDataOnDisk(),
nullptr,
settings[DB::Setting::max_threads],
settings[DB::Setting::min_free_disk_space_for_temporary_data],
true,
Expand All @@ -149,7 +150,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
new_processors.push_back(expand_processor);

auto expand_output_header = expand_processor->getOutputs().front().getHeader();

auto transform_params = std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params, false);
auto aggregate_processor
= std::make_shared<GraceAggregatingTransform>(expand_output_header, transform_params, context, false, false);
Expand Down Expand Up @@ -188,14 +189,10 @@ AdvancedExpandTransform::AdvancedExpandTransform(
, input_header(input_header_)
{
for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
{
is_low_cardinality_expand.push_back(true);
}

for (auto & port : outputs)
{
output_ports.push_back(&port);
}
}

DB::IProcessor::Status AdvancedExpandTransform::prepare()
Expand Down Expand Up @@ -245,9 +242,7 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()

input.setNeeded();
if (!input.hasData())
{
return Status::NeedData;
}
input_chunk = input.pull(true);
has_input = true;
expand_expr_iterator = 0;
Expand All @@ -265,9 +260,7 @@ void AdvancedExpandTransform::work()
has_input = false;
}
if ((input_finished || cardinality_detect_rows >= rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
{
detectCardinality();
}
else if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
return;

Expand All @@ -281,7 +274,7 @@ void AdvancedExpandTransform::detectCardinality()
std::vector<bool> is_col_low_cardinality;
for (size_t i = 0; i < grouping_keys; ++i)
{
DB::WeakHash32 hash(cardinality_detect_rows);
DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
std::unordered_set<UInt32> distinct_ids;
const auto & data = hash.getData();
for (size_t j = 0; j < cardinality_detect_rows; ++j)
Expand Down

0 comments on commit 17d9cd8

Please sign in to comment.