diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index afa329ec7ef0b..339945c39f6ac 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -143,6 +143,38 @@ pub trait ExtensionPlanner { ) -> Result>>; } +/// This trait exposes the ability to create physical expressions. +pub trait PhysicalExpressionCreator { + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `expr`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + fn create_physical_expr( + &self, + expr: &Expr, + input_dfschema: &DFSchema, + session_state: &SessionState, + ) -> Result> { + create_physical_expr(expr, input_dfschema, session_state.execution_props()) + } +} + +/// Default physical expression creator. +pub struct DefaultPhysicalExpressionCreator; + +impl PhysicalExpressionCreator for DefaultPhysicalExpressionCreator { + fn create_physical_expr( + &self, + expr: &Expr, + input_dfschema: &DFSchema, + session_state: &SessionState, + ) -> Result> { + create_physical_expr(expr, input_dfschema, session_state.execution_props()) + } +} + /// Default single node physical query planner that converts a /// `LogicalPlan` to an `ExecutionPlan` suitable for execution. /// @@ -161,9 +193,18 @@ pub trait ExtensionPlanner { /// execute concurrently. /// /// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency -#[derive(Default)] pub struct DefaultPhysicalPlanner { extension_planners: Vec>, + physical_expr_creator: Arc, +} + +impl Default for DefaultPhysicalPlanner { + fn default() -> Self { + Self { + extension_planners: Default::default(), + physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator), + } + } } #[async_trait] @@ -189,16 +230,20 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { /// Create a physical expression from a logical expression /// suitable for evaluation /// - /// `e`: the expression to convert + /// `expr`: the expression to convert /// - /// `input_dfschema`: the logical plan schema for evaluating `e` + /// `input_dfschema`: the logical plan schema for evaluating `expr`. fn create_physical_expr( &self, expr: &Expr, input_dfschema: &DFSchema, session_state: &SessionState, ) -> Result> { - create_physical_expr(expr, input_dfschema, session_state.execution_props()) + self.physical_expr_creator.create_physical_expr( + expr, + input_dfschema, + session_state, + ) } } @@ -260,6 +305,17 @@ struct LogicalNode<'a> { } impl DefaultPhysicalPlanner { + /// Make a new [`DefaultPhysicalPlanner`]. + pub fn new( + extension_planners: Vec>, + physical_expr_creator: Arc, + ) -> Self { + Self { + extension_planners, + physical_expr_creator, + } + } + /// Create a physical planner that uses `extension_planners` to /// plan user-defined logical nodes [`LogicalPlan::Extension`]. /// The planner uses the first [`ExtensionPlanner`] to return a non-`None` @@ -267,7 +323,10 @@ impl DefaultPhysicalPlanner { pub fn with_extension_planners( extension_planners: Vec>, ) -> Self { - Self { extension_planners } + Self { + extension_planners, + physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator), + } } /// Create a physical plan from a logical plan