Skip to content

Commit

Permalink
Improve lazy expand for high cardinality aggregation
Browse files — browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Nov 19, 2024
1 parent 9d0b9c5 commit 81feb50
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 70 deletions.
91 changes: 28 additions & 63 deletions cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
#include <Operator/GraceAggregatingTransform.h>
#include <Operator/StreamingAggregatingStep.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/Pipe.h>
Expand Down Expand Up @@ -110,8 +110,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
auto expand_output_header = expand_processor->getOutputs().front().getHeader();

auto transform_params = std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params, false);
auto aggregate_processor
= std::make_shared<GraceAggregatingTransform>(expand_output_header, transform_params, context, false, false);
auto aggregate_processor = std::make_shared<StreamingAggregatingTransform>(context, expand_output_header, transform_params);
DB::connect(expand_processor->getOutputs().back(), aggregate_processor->getInputs().front());
new_processors.push_back(aggregate_processor);
auto aggregate_output_header = aggregate_processor->getOutputs().front().getHeader();
Expand Down Expand Up @@ -146,8 +145,15 @@ AdvancedExpandTransform::AdvancedExpandTransform(
, project_set_exprs(project_set_exprs_)
, input_header(input_header_)
{
for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
is_low_cardinality_expand.push_back(true);
for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
{
const auto & kinds = project_set_exprs.getKinds()[i];
size_t n = 0;
for (size_t k = 0; k < grouping_keys; ++k)
if (kinds[k] == EXPAND_FIELD_KIND_SELECTION)
n += 1;
need_to_aggregate.push_back((n != grouping_keys));
}

for (auto & port : outputs)
output_ports.push_back(&port);
Expand All @@ -167,9 +173,11 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()

if (has_output)
{
auto & output_port = *output_ports[is_low_cardinality_expand[expand_expr_iterator - 1]];
auto & output_port = *output_ports[need_to_aggregate[expand_expr_iterator - 1]];
if (output_port.canPush())
{
output_blocks[need_to_aggregate[expand_expr_iterator - 1]] += 1;
output_rows[need_to_aggregate[expand_expr_iterator - 1]] += output_chunk.getNumRows();
output_port.push(std::move(output_chunk));
has_output = false;
auto status = expand_expr_iterator >= project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
Expand All @@ -185,17 +193,18 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
{
if (input.isFinished())
{
if (!cardinality_detect_blocks.empty())
{
input_finished = true;
return Status::Ready;
}
else
{
output_ports[0]->finish();
output_ports[1]->finish();
return Status::Finished;
}
LOG_DEBUG(
getLogger("AdvancedExpandTransform"),
"Input rows/blocks={}/{}. output rows/blocks=[{}/{}, {}/{}]",
input_rows,
input_blocks,
output_rows[0],
output_blocks[0],
output_rows[1],
output_blocks[1]);
output_ports[0]->finish();
output_ports[1]->finish();
return Status::Finished;
}

input.setNeeded();
Expand All @@ -204,62 +213,18 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
input_chunk = input.pull(true);
has_input = true;
expand_expr_iterator = 0;
input_blocks += 1;
input_rows += input_chunk.getNumRows();
}

return Status::Ready;
}

/// Buffers up to rows_for_detect_cardinality input rows as a sample of the grouping
/// keys' cardinality before any expansion happens. Once the sample is large enough
/// (or the input is exhausted with a pending sample), cardinality detection runs and
/// the buffered rows are expanded.
void AdvancedExpandTransform::work()
{
    /// Still collecting sample rows: stash the current chunk into the sample buffer
    /// and clear has_input so prepare() pulls more input.
    if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
    {
        cardinality_detect_blocks.push_back(input_header.cloneWithColumns(input_chunk.detachColumns()));
        cardinality_detect_rows += cardinality_detect_blocks.back().rows();
        has_input = false;
    }
    /// Sample complete (threshold reached, or input finished with buffered rows):
    /// classify each expand row as low/high cardinality. detectCardinality() also
    /// restores the concatenated sample into input_chunk for expansion below.
    if ((input_finished || cardinality_detect_rows >= rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
        detectCardinality();
    /// Otherwise we are mid-sampling: nothing to emit yet.
    else if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
        return;

    /// The phase of detecting grouping keys' cardinality is finished here.
    expandInputChunk();
}

/// Decides, from the buffered sample rows, which expand rows operate on
/// low-cardinality grouping keys. Fills is_low_cardinality_expand[i] with false
/// for every expand row that selects at least one high-cardinality column, then
/// hands the concatenated sample back as input_chunk so it can be expanded.
void AdvancedExpandTransform::detectCardinality()
{
    /// Concatenate the buffered sample blocks so each grouping column can be
    /// scanned contiguously; this consumes cardinality_detect_blocks.
    DB::Block block = BlockUtil::concatenateBlocksMemoryEfficiently(std::move(cardinality_detect_blocks));
    std::vector<bool> is_col_low_cardinality;
    for (size_t i = 0; i < grouping_keys; ++i)
    {
        /// Approximate the distinct count of this grouping key by hashing the
        /// sampled rows and counting distinct 32-bit hash values.
        DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
        std::unordered_set<UInt32> distinct_ids;
        const auto & data = hash.getData();
        for (size_t j = 0; j < cardinality_detect_rows; ++j)
            distinct_ids.insert(data[j]);
        /// Fewer than 1000 distinct hashes over the sample marks the column as
        /// low cardinality. (Removed an unused local that duplicated this size.)
        is_col_low_cardinality.push_back(distinct_ids.size() < 1000);
    }

    /// An expand row stays low-cardinality only if every grouping key it selects
    /// is itself low-cardinality.
    for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
    {
        const auto & kinds = project_set_exprs.getKinds()[i];
        for (size_t k = 0; k < grouping_keys; ++k)
        {
            const auto & kind = kinds[k];
            if (kind == EXPAND_FIELD_KIND_SELECTION && !is_col_low_cardinality[k])
            {
                is_low_cardinality_expand[i] = false;
                break;
            }
        }
    }
    LOG_DEBUG(getLogger("AdvancedExpandTransform"), "Low cardinality expand: {}", fmt::join(is_low_cardinality_expand, ","));

    /// Re-emit the concatenated sample as the current input chunk; the buffer is
    /// already moved-from, clear() restores it to a known-empty state.
    input_chunk = DB::Chunk(block.getColumns(), block.rows());
    cardinality_detect_blocks.clear();
}

void AdvancedExpandTransform::expandInputChunk()
{
const auto & input_columns = input_chunk.getColumns();
Expand Down
13 changes: 6 additions & 7 deletions cpp-ch/local-engine/Operator/AdvancedExpandStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,18 @@ class AdvancedExpandTransform : public DB::IProcessor
bool has_input = false;
bool has_output = false;
size_t expand_expr_iterator = 0;
std::vector<bool> is_low_cardinality_expand;
std::vector<size_t> approximate_grouping_keys;
size_t cardinality_detect_rows = 0;
std::vector<DB::Block> cardinality_detect_blocks;
static constexpr size_t rows_for_detect_cardinality = 10000;
bool input_finished = false;
std::vector<bool> need_to_aggregate;

std::vector<DB::OutputPort *> output_ports;

DB::Chunk input_chunk;
DB::Chunk output_chunk;

void detectCardinality();
size_t input_blocks = 0;
size_t input_rows = 0;
std::vector<size_t> output_blocks = {0, 0};
std::vector<size_t> output_rows = {0, 0};

void expandInputChunk();
};

Expand Down

0 comments on commit 81feb50

Please sign in to comment.