
Avoid pushdown of volatile functions to tablescan #13475

Open · wants to merge 2 commits into main
Conversation

@theirix (Contributor) commented Nov 18, 2024

Which issue does this PR close?

When pushdown filters are enabled, the planner pushes the volatile `random()` predicate down to the table source, so the filter is evaluated both in the scan (for example, in Parquet) and again in the query engine. Because `random()` returns a different value on every evaluation, rows are effectively filtered twice, which leads to incorrect results.

Closes #13268.

Rationale for this change

A volatile filter must be evaluated exactly once per row; evaluating it in more than one layer (in the scan and again in the filter above it) produces incorrect results.

What changes are included in this PR?

  • Improvement to the optimizer rule so that volatile filters are not pushed down to the scan
  • Unit test
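The core idea of the rule change can be sketched as follows. This is a hypothetical, std-only illustration, not the actual DataFusion API: a conjunctive predicate is split into deterministic conjuncts, which may be pushed down to the scan, and volatile conjuncts, which must stay in the filter above it. The `Expr` enum and `split_pushdown` helper here are invented for the example.

```rust
// Hypothetical mini expression tree (NOT DataFusion's real `Expr` type).
#[derive(Debug)]
enum Expr {
    Column(&'static str),
    Literal(f64),
    Random,                       // volatile: new value on every evaluation
    Lt(Box<Expr>, Box<Expr>),
}

impl Expr {
    // An expression is volatile if any sub-expression is volatile.
    fn is_volatile(&self) -> bool {
        match self {
            Expr::Random => true,
            Expr::Lt(l, r) => l.is_volatile() || r.is_volatile(),
            _ => false,
        }
    }
}

// Split the AND-ed filter conjuncts: only deterministic ones may be pushed
// down to the table scan; volatile ones are kept in the filter node.
fn split_pushdown(conjuncts: Vec<Expr>) -> (Vec<Expr>, Vec<Expr>) {
    conjuncts.into_iter().partition(|e| !e.is_volatile())
}

fn main() {
    // Models: WHERE id < 100 AND random() < 0.1
    let conjuncts = vec![
        Expr::Lt(Box::new(Expr::Column("id")), Box::new(Expr::Literal(100.0))),
        Expr::Lt(Box::new(Expr::Random), Box::new(Expr::Literal(0.1))),
    ];
    let (pushed, kept) = split_pushdown(conjuncts);
    println!("pushed to scan: {}, kept in filter: {}", pushed.len(), kept.len());
    // prints "pushed to scan: 1, kept in filter: 1"
}
```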

Are these changes tested?

  • Unit tests
  • Regression tests
  • Manual test

As proposed in the original issue, I used the alltypes_tiny_pages_plain.parquet sample file, which contains 7300 rows:

set datafusion.execution.parquet.pushdown_filters=true;
create external table data stored as parquet location 'alltypes_tiny_pages_plain.parquet';

Running the query

select COUNT(*) from data WHERE RANDOM() < 0.1;

with datafusion-cli gives an answer of 726, which is pretty close to the expected 730.
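Why the buggy plan undercounts can be shown with a small simulation. This is a std-only sketch with a hand-rolled LCG standing in for SQL `random()` (an assumption for illustration, not DataFusion code): when the volatile predicate runs both in the scan and again in the filter, each layer draws a fresh random number, so a row survives only if two independent draws pass, giving roughly p² selectivity (~73 rows) instead of the intended p (~730 rows).

```rust
// Tiny linear congruential generator so the example needs no external crates.
struct Lcg(u64);

impl Lcg {
    // Returns a pseudo-random f64 in [0, 1).
    fn next_f64(&mut self) -> f64 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

fn main() {
    let (rows, p) = (7300, 0.1);
    let mut rng = Lcg(42);
    let mut once = 0; // predicate evaluated once per row (fixed plan)
    let mut twice = 0; // predicate evaluated in scan AND filter (old buggy plan)
    for _ in 0..rows {
        if rng.next_f64() < p {
            once += 1;
        }
        // Two independent draws model the two evaluation layers.
        if rng.next_f64() < p && rng.next_f64() < p {
            twice += 1;
        }
    }
    println!("once: {once} (~730 expected), twice: {twice} (~73 expected)");
}
```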

New plan

+---------------+---------------------------------------------------------------------------------+
| plan_type     | plan                                                                            |
+---------------+---------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]                   |
|               |   Filter: random() < Float64(0.1)                                               |
|               |     TableScan: data projection=[]                                               |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(*)]                              |
|               |   CoalescePartitionsExec                                                        |
|               |     AggregateExec: mode=Partial, gby=[], aggr=[count(*)]                        |
|               |       CoalesceBatchesExec: target_batch_size=8192                               |
|               |         FilterExec: random() < 0.1                                              |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1  |
|               |             ParquetExec: file_groups={1 group: [[sample.parquet]]}              |
|               |                                                                                 |
+---------------+---------------------------------------------------------------------------------+

Before this change, the plan contained

| ParquetExec: file_groups={1 group: [[alltypes_tiny_pages_plain.parquet]]}, predicate=random() < 0.1 |

Are there any user-facing changes?

No breaking changes.

@github-actions github-actions bot added the optimizer Optimizer rules label Nov 18, 2024
@theirix theirix marked this pull request as ready for review November 18, 2024 22:33
Successfully merging this pull request may close these issues.

Filters on RANDOM() are applied incorrectly when pushdown_filters is enabled.