From 9aebcc710718fbd374f23d89920d28c3390a2bf3 Mon Sep 17 00:00:00 2001 From: Albert Skalt Date: Thu, 19 Dec 2024 17:59:37 +0300 Subject: [PATCH] default physical planner: add an ability to use custom physical expression creator This patch adds an ability to construct default physical planner with custom physical expression creator. It could be convinient if we use custom physical expressions and want to create it during default planning. --- datafusion/core/src/physical_planner.rs | 69 +++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index afa329ec7ef0b..339945c39f6ac 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -143,6 +143,38 @@ pub trait ExtensionPlanner { ) -> Result>>; } +/// This trait exposes the ability to create physical expressions. +pub trait PhysicalExpressionCreator { + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `expr`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + fn create_physical_expr( + &self, + expr: &Expr, + input_dfschema: &DFSchema, + session_state: &SessionState, + ) -> Result> { + create_physical_expr(expr, input_dfschema, session_state.execution_props()) + } +} + +/// Default physical expression creator. +pub struct DefaultPhysicalExpressionCreator; + +impl PhysicalExpressionCreator for DefaultPhysicalExpressionCreator { + fn create_physical_expr( + &self, + expr: &Expr, + input_dfschema: &DFSchema, + session_state: &SessionState, + ) -> Result> { + create_physical_expr(expr, input_dfschema, session_state.execution_props()) + } +} + /// Default single node physical query planner that converts a /// `LogicalPlan` to an `ExecutionPlan` suitable for execution. /// @@ -161,9 +193,18 @@ pub trait ExtensionPlanner { /// execute concurrently. /// /// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency -#[derive(Default)] pub struct DefaultPhysicalPlanner { extension_planners: Vec>, + physical_expr_creator: Arc, +} + +impl Default for DefaultPhysicalPlanner { + fn default() -> Self { + Self { + extension_planners: Default::default(), + physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator), + } + } } #[async_trait] @@ -189,16 +230,20 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { /// Create a physical expression from a logical expression /// suitable for evaluation /// - /// `e`: the expression to convert + /// `expr`: the expression to convert /// - /// `input_dfschema`: the logical plan schema for evaluating `e` + /// `input_dfschema`: the logical plan schema for evaluating `expr`. fn create_physical_expr( &self, expr: &Expr, input_dfschema: &DFSchema, session_state: &SessionState, ) -> Result> { - create_physical_expr(expr, input_dfschema, session_state.execution_props()) + self.physical_expr_creator.create_physical_expr( + expr, + input_dfschema, + session_state, + ) } } @@ -260,6 +305,17 @@ struct LogicalNode<'a> { } impl DefaultPhysicalPlanner { + /// Make a new [`DefaultPhysicalPlanner`]. + pub fn new( + extension_planners: Vec>, + physical_expr_creator: Arc, + ) -> Self { + Self { + extension_planners, + physical_expr_creator, + } + } + /// Create a physical planner that uses `extension_planners` to /// plan user-defined logical nodes [`LogicalPlan::Extension`]. /// The planner uses the first [`ExtensionPlanner`] to return a non-`None` @@ -267,7 +323,10 @@ impl DefaultPhysicalPlanner { pub fn with_extension_planners( extension_planners: Vec>, ) -> Self { - Self { extension_planners } + Self { + extension_planners, + physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator), + } } /// Create a physical plan from a logical plan