test(14691): demonstrate EnforceSorting can remove a needed coalesce (#…
wiedld authored Feb 28, 2025
1 parent 8bc889f commit 32224b4
Showing 4 changed files with 139 additions and 4 deletions.
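The new regression test can be run by name with a filtered invocation along these lines (assuming the standard DataFusion workspace layout, where the core crate's package name is `datafusion`):

cargo test -p datafusion test_preserve_needed_coalesce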
2 changes: 1 addition & 1 deletion datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSource
.build()
}

-fn projection_exec_with_alias(
+pub(crate) fn projection_exec_with_alias(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
66 changes: 65 additions & 1 deletion datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,11 +17,16 @@

use std::sync::Arc;

use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
use crate::physical_optimizer::sanity_checker::{
assert_sanity_check, assert_sanity_check_err,
};
use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
-local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
+local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_stats,
+repartition_exec, schema, single_partitioned_aggregate, sort_exec, sort_expr,
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
union_exec, RequirementsTestExec,
@@ -3346,3 +3351,62 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_preserve_needed_coalesce() -> Result<()> {
// Input to EnforceSorting, from our test case.
let plan = projection_exec_with_alias(
union_exec(vec![parquet_exec_with_stats(); 2]),
vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "value".to_string()),
],
);
let plan = Arc::new(CoalescePartitionsExec::new(plan));
let schema = schema();
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}]);
let plan: Arc<dyn ExecutionPlan> =
single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
let plan = sort_exec(sort_key, plan);

// Starting plan: as in our test case.
assert_eq!(
get_plan_string(&plan),
vec![
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
);
// Test: plan is valid.
assert_sanity_check(&plan, true);

// EnforceSorting will remove the coalesce and add a SortPreservingMergeExec further up (above the aggregate).
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(plan, &Default::default())?;
assert_eq!(
get_plan_string(&optimized),
vec![
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
);

// Bug: the optimized plan is now invalid. The SinglePartitioned aggregate
// requires hash-partitioned input, which the coalesce's single partition
// satisfied; its child now reports UnknownPartitioning(2).
let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
assert_sanity_check_err(&optimized, err);

Ok(())
}
10 changes: 9 additions & 1 deletion datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
}

/// Check if sanity checker should accept or reject plans.
-fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
+pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
assert_eq!(
@@ -397,6 +397,14 @@ fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
);
}

/// Assert reason for sanity check failure.
pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
assert!(error.message().contains(err));
}

/// Check if the plan we created is as expected by comparing the plan
/// formatted as a string.
fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {
65 changes: 64 additions & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -30,9 +30,10 @@ use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
-use datafusion_common::{JoinType, Result};
+use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -102,6 +103,44 @@ pub fn schema() -> SchemaRef {
]))
}

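/// Statistics for an Int64 column: exact min/max bounds, all other fields unknown.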
fn int64_stats() -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Absent,
sum_value: Precision::Absent,
max_value: Precision::Exact(1_000_000.into()),
min_value: Precision::Exact(0.into()),
distinct_count: Precision::Absent,
}
}

fn column_stats() -> Vec<ColumnStatistics> {
vec![
int64_stats(), // a
int64_stats(), // b
int64_stats(), // c
ColumnStatistics::default(),
ColumnStatistics::default(),
]
}

/// Create parquet datasource exec using schema from [`schema`].
pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
let mut statistics = Statistics::new_unknown(&schema());
statistics.num_rows = Precision::Inexact(10);
statistics.column_statistics = column_stats();

let config = FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema(),
Arc::new(ParquetSource::new(Default::default())),
)
.with_file(PartitionedFile::new("x".to_string(), 10000))
.with_statistics(statistics);
assert_eq!(config.statistics.num_rows, Precision::Inexact(10)); // confirm the statistics were applied

config.build()
}

pub fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -522,6 +561,30 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> Physica
PhysicalGroupBy::new_single(group_by_expr.clone())
}

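/// Create an AggregateExec in SinglePartitioned mode, grouping on the given
/// (column, alias) pairs with no aggregate expressions.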
pub(crate) fn single_partitioned_aggregate(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
let schema = schema();
let group_by = alias_pairs
.iter()
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(group_by);

Arc::new(
AggregateExec::try_new(
AggregateMode::SinglePartitioned,
group_by,
vec![],
vec![],
input,
schema,
)
.unwrap(),
)
}

pub fn assert_plan_matches_expected(
plan: &Arc<dyn ExecutionPlan>,
expected: &[&str],