From b4f952f4b64448f1741752533b09dbe1422023c0 Mon Sep 17 00:00:00 2001 From: Albert Skalt Date: Thu, 19 Dec 2024 17:59:37 +0300 Subject: [PATCH] default physical planner: add an ability to use custom physical expression creator This patch adds an ability to construct default physical planner with custom physical expression creator. It could be convinient if we use custom physical expressions and want to create it during default planning. --- datafusion/core/src/physical_planner.rs | 67 +++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index afa329ec7ef0b..87781703878f6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -143,6 +143,36 @@ pub trait ExtensionPlanner { ) -> Result>>; } +/// This trait exposes the ability to create physical expressions. +pub trait PhysicalExpressionCreator { + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `expr`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + fn create_physical_expr( + &self, + expr: &Expr, + input_dfschema: &DFSchema, + session_state: &SessionState, + ) -> Result>; +} + +/// Default physical expression creator. +pub struct DefaultPhysicalExpressionCreator; + +impl PhysicalExpressionCreator for DefaultPhysicalExpressionCreator { + fn create_physical_expr( + &self, + expr: &Expr, + input_dfschema: &DFSchema, + session_state: &SessionState, + ) -> Result> { + create_physical_expr(expr, input_dfschema, session_state.execution_props()) + } +} + /// Default single node physical query planner that converts a /// `LogicalPlan` to an `ExecutionPlan` suitable for execution. /// @@ -161,9 +191,18 @@ pub trait ExtensionPlanner { /// execute concurrently. /// /// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency -#[derive(Default)] pub struct DefaultPhysicalPlanner { extension_planners: Vec>, + physical_expr_creator: Arc, +} + +impl Default for DefaultPhysicalPlanner { + fn default() -> Self { + Self { + extension_planners: Default::default(), + physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator), + } + } } #[async_trait] @@ -189,16 +228,20 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { /// Create a physical expression from a logical expression /// suitable for evaluation /// - /// `e`: the expression to convert + /// `expr`: the expression to convert /// - /// `input_dfschema`: the logical plan schema for evaluating `e` + /// `input_dfschema`: the logical plan schema for evaluating `expr`. fn create_physical_expr( &self, expr: &Expr, input_dfschema: &DFSchema, session_state: &SessionState, ) -> Result> { - create_physical_expr(expr, input_dfschema, session_state.execution_props()) + self.physical_expr_creator.create_physical_expr( + expr, + input_dfschema, + session_state, + ) } } @@ -260,6 +303,17 @@ struct LogicalNode<'a> { } impl DefaultPhysicalPlanner { + /// Make a new [`DefaultPhysicalPlanner`]. + pub fn new( + extension_planners: Vec>, + physical_expr_creator: Arc, + ) -> Self { + Self { + extension_planners, + physical_expr_creator, + } + } + /// Create a physical planner that uses `extension_planners` to /// plan user-defined logical nodes [`LogicalPlan::Extension`]. /// The planner uses the first [`ExtensionPlanner`] to return a non-`None` @@ -267,7 +321,10 @@ impl DefaultPhysicalPlanner { pub fn with_extension_planners( extension_planners: Vec>, ) -> Self { - Self { extension_planners } + Self { + extension_planners, + physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator), + } } /// Create a physical plan from a logical plan