Skip to content

Commit

Permalink
default physical planner: add an ability to use custom physical expre…
Browse files Browse the repository at this point in the history
…ssion planner

This patch adds an ability to construct default physical planner with
custom physical expression planner. It could be convinient if we use
custom physical expressions and want to create it during default planning.

WARNING: is dirty, unfinished patch only for POC.
  • Loading branch information
askalt committed Dec 20, 2024
1 parent e64f889 commit eefcd12
Show file tree
Hide file tree
Showing 5 changed files with 913 additions and 632 deletions.
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::physical_plan::{
common, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
Partitioning, SendableRecordBatchStream,
};
use crate::physical_planner::create_physical_sort_exprs;
use crate::physical_planner::DefaultPhysicalPlanner;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -224,6 +224,7 @@ impl TableProvider for MemTable {

let show_sizes = state.config_options().explain.show_sizes;
exec = exec.with_show_sizes(show_sizes);
let phys_planner = DefaultPhysicalPlanner::default();

// add sort information if present
let sort_order = self.sort_order.lock();
Expand All @@ -233,7 +234,7 @@ impl TableProvider for MemTable {
let file_sort_order = sort_order
.iter()
.map(|sort_exprs| {
create_physical_sort_exprs(
phys_planner.create_physical_sort_exprs(
sort_exprs,
&df_schema,
state.execution_props(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,10 @@ mod tests {
use arrow_array::Int32Array;

use super::*;
use crate::{test::columns, test_util::aggr_test_schema};
use crate::{
physical_planner::DefaultPhysicalPlanner, test::columns,
test_util::aggr_test_schema,
};

#[test]
fn physical_plan_config_no_projection() {
Expand Down Expand Up @@ -1091,6 +1094,7 @@ mod tests {
},
];

let planner = DefaultPhysicalPlanner::default();
for case in cases {
let table_schema = Arc::new(Schema::new(
case.file_schema
Expand All @@ -1109,7 +1113,7 @@ mod tests {
.sort
.into_iter()
.map(|expr| {
crate::physical_planner::create_physical_sort_expr(
planner.create_physical_sort_expr(
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&ExecutionProps::default(),
Expand Down
Loading

0 comments on commit eefcd12

Please sign in to comment.