Skip to content

Commit

Permalink
default physical planner: add an ability to use custom physical expre…
Browse files Browse the repository at this point in the history
…ssion creator

This patch adds an ability to construct default physical planner with
custom physical expression creator. It could be convinient if we use
custom physical expressions and want to create it during default planning.
  • Loading branch information
askalt committed Dec 19, 2024
1 parent d53f727 commit 9aebcc7
Showing 1 changed file with 64 additions and 5 deletions.
69 changes: 64 additions & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,38 @@ pub trait ExtensionPlanner {
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}

/// This trait exposes the ability to create physical expressions.
pub trait PhysicalExpressionCreator {
/// Create a physical expression from a logical expression
/// suitable for evaluation
///
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `e`
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
session_state: &SessionState,
) -> Result<Arc<dyn PhysicalExpr>> {
create_physical_expr(expr, input_dfschema, session_state.execution_props())
}
}

/// Default physical expression creator.
pub struct DefaultPhysicalExpressionCreator;

impl PhysicalExpressionCreator for DefaultPhysicalExpressionCreator {
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
session_state: &SessionState,
) -> Result<Arc<dyn PhysicalExpr>> {
create_physical_expr(expr, input_dfschema, session_state.execution_props())
}
}

/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
///
Expand All @@ -161,9 +193,18 @@ pub trait ExtensionPlanner {
/// execute concurrently.
///
/// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency
#[derive(Default)]
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
physical_expr_creator: Arc<dyn PhysicalExpressionCreator + Send + Sync>,
}

impl Default for DefaultPhysicalPlanner {
fn default() -> Self {
Self {
extension_planners: Default::default(),
physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator),
}
}
}

#[async_trait]
Expand All @@ -189,16 +230,20 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
/// Create a physical expression from a logical expression
/// suitable for evaluation
///
/// `e`: the expression to convert
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `e`
/// `input_dfschema`: the logical plan schema for evaluating `expr`.
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
session_state: &SessionState,
) -> Result<Arc<dyn PhysicalExpr>> {
create_physical_expr(expr, input_dfschema, session_state.execution_props())
self.physical_expr_creator.create_physical_expr(
expr,
input_dfschema,
session_state,
)
}
}

Expand Down Expand Up @@ -260,14 +305,28 @@ struct LogicalNode<'a> {
}

impl DefaultPhysicalPlanner {
/// Make a new [`DefaultPhysicalPlanner`].
pub fn new(
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
physical_expr_creator: Arc<dyn PhysicalExpressionCreator + Send + Sync>,
) -> Self {
Self {
extension_planners,
physical_expr_creator,
}
}

/// Create a physical planner that uses `extension_planners` to
/// plan user-defined logical nodes [`LogicalPlan::Extension`].
/// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
/// plan.
pub fn with_extension_planners(
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
) -> Self {
Self { extension_planners }
Self {
extension_planners,
physical_expr_creator: Arc::new(DefaultPhysicalExpressionCreator),
}
}

/// Create a physical plan from a logical plan
Expand Down

0 comments on commit 9aebcc7

Please sign in to comment.