Skip to content

Commit

Permalink
WIP sql: add ability to extend planner
Browse files Browse the repository at this point in the history
  • Loading branch information
askalt committed Feb 15, 2025
1 parent 986eefc commit aa12c3f
Show file tree
Hide file tree
Showing 20 changed files with 125 additions and 44 deletions.
40 changes: 32 additions & 8 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_sql::parser::{DFParser, Statement};
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
use datafusion_sql::planner::{
ContextProvider, ParserOptions, PlannerContext, SqlPlannerExtension, SqlToRel,
};
use itertools::Itertools;
use log::{debug, info};
use sqlparser::ast::Expr as SQLExpr;
Expand Down Expand Up @@ -104,11 +106,11 @@ use uuid::Uuid;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let state = SessionStateBuilder::new()
/// .with_config(SessionConfig::new())
/// .with_config(SessionConfig::new())
/// .with_runtime_env(Arc::new(RuntimeEnv::default()))
/// .with_default_features()
/// .build();
/// Ok(())
/// Ok(())
/// # }
/// ```
///
Expand Down Expand Up @@ -545,11 +547,10 @@ impl SessionState {
Ok(table_refs)
}

/// Convert an AST Statement into a LogicalPlan
pub async fn statement_to_plan(
async fn create_provider(
&self,
statement: datafusion_sql::parser::Statement,
) -> datafusion_common::Result<LogicalPlan> {
statement: &datafusion_sql::parser::Statement,
) -> datafusion_common::Result<SessionContextProvider> {
let references = self.resolve_table_references(&statement)?;

let mut provider = SessionContextProvider {
Expand All @@ -568,7 +569,30 @@ impl SessionState {
}
}
}
Ok(provider)
}

/// Convert an AST statement into a [`LogicalPlan`] using a planner extension.
///
/// A context provider is built for `statement` via `create_provider`, and the
/// statement is then planned by a [`SqlToRel`] constructed with this session's
/// parser options and the supplied extension `ext`.
///
/// # Errors
///
/// Returns an error if building the context provider fails or if planning
/// the statement fails.
pub async fn statement_to_plan_with_extensions<E: SqlPlannerExtension>(
    &self,
    statement: datafusion_sql::parser::Statement,
    ext: E,
) -> datafusion_common::Result<LogicalPlan> {
    let provider = self.create_provider(&statement).await?;
    let query = SqlToRel::new_with_options_and_extension(
        &provider,
        self.get_parser_options(),
        ext,
    );
    query.statement_to_plan(statement)
}

/// Misspelled alias of `statement_to_plan_with_extensions`.
///
/// Kept only so that callers written against the original (misspelled) name
/// keep compiling; it forwards its arguments unchanged.
#[deprecated(note = "misspelled; use `statement_to_plan_with_extensions` instead")]
pub async fn statenent_to_plan_with_extensions<E: SqlPlannerExtension>(
    &self,
    statement: datafusion_sql::parser::Statement,
    ext: E,
) -> datafusion_common::Result<LogicalPlan> {
    self.statement_to_plan_with_extensions(statement, ext).await
}

/// Plan a parsed SQL statement into a [`LogicalPlan`].
///
/// Builds a context provider for `statement` via `create_provider`, then
/// hands the statement to a [`SqlToRel`] created with this session's parser
/// options.
pub async fn statement_to_plan(
    &self,
    statement: datafusion_sql::parser::Statement,
) -> datafusion_common::Result<LogicalPlan> {
    let context_provider = self.create_provider(&statement).await?;
    let planner =
        SqlToRel::new_with_options(&context_provider, self.get_parser_options());
    planner.statement_to_plan(statement)
}
Expand Down Expand Up @@ -1514,7 +1538,7 @@ impl From<SessionState> for SessionStateBuilder {
///
/// This is used so the SQL planner can access the state of the session without
/// having a direct dependency on the [`SessionState`] struct (and core crate)
struct SessionContextProvider<'a> {
pub struct SessionContextProvider<'a> {
state: &'a SessionState,
tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};

use arrow::datatypes::Schema;
use datafusion_common::{
Expand All @@ -28,7 +28,7 @@ use datafusion_common::{
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, TableSource};
use sqlparser::ast::{Query, SetExpr, SetOperator, With};

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'_, S, E> {
pub(super) fn plan_with_clause(
&self,
with: With,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/binary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, SqlToRel};
use crate::planner::{ContextProvider, SqlPlannerExtension, SqlToRel};
use datafusion_common::{not_impl_err, Result};
use datafusion_expr::Operator;
use sqlparser::ast::BinaryOperator;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'_, S, E> {
pub(crate) fn parse_sql_binary_op(&self, op: BinaryOperator) -> Result<Operator> {
match op {
BinaryOperator::Gt => Ok(Operator::Gt),
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};

use arrow_schema::DataType;
use datafusion_common::{
Expand Down Expand Up @@ -195,7 +195,7 @@ impl FunctionArgs {
}
}

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'_, S, E>{
pub(super) fn sql_function_to_expr(
&self,
function: SQLFunction,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/grouping_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use datafusion_common::plan_err;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{Expr, GroupingSet};
use sqlparser::ast::Expr as SQLExpr;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'_, S, E> {
pub(super) fn sql_grouping_sets_to_expr(
&self,
exprs: Vec<Vec<SQLExpr>>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use datafusion_common::{
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::{Case, Expr};

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use datafusion_expr::UNNAMED_TABLE;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
pub(super) fn sql_identifier_to_expr(
&self,
id: Ident,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion_expr::{
Operator, TryCast,
};

use crate::planner::SqlPlannerExtension;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

mod binary_op;
Expand All @@ -48,7 +49,7 @@ mod substring;
mod unary_op;
mod value;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
pub(crate) fn sql_expr_to_logical_expr(
&self,
sql: SQLExpr,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, Result,
};
use datafusion_expr::expr::Sort;
use datafusion_expr::{Expr, SortExpr};
use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E>{
/// Convert sql [OrderByExpr] to `Vec<Expr>`.
///
/// `input_schema` and `additional_schema` are used to resolve column references in the order-by expressions.
Expand Down
3 changes: 2 additions & 1 deletion datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::SqlPlannerExtension;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::Exists;
Expand All @@ -24,7 +25,7 @@ use sqlparser::ast::Expr as SQLExpr;
use sqlparser::ast::Query;
use std::sync::Arc;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
pub(super) fn parse_exists_subquery(
&self,
subquery: Query,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/substring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use datafusion_common::{not_impl_err, plan_err};
use datafusion_common::{DFSchema, Result, ScalarValue};
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::Expr;
use sqlparser::ast::Expr as SQLExpr;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E>{
pub(super) fn sql_substring_to_expr(
&self,
expr: Box<SQLExpr>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/unary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use datafusion_common::{not_impl_err, DFSchema, Result};
use datafusion_expr::Expr;
use sqlparser::ast::{Expr as SQLExpr, UnaryOperator, Value};

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
pub(crate) fn parse_sql_unary_op(
&self,
op: UnaryOperator,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use arrow::compute::kernels::cast_utils::parse_interval_month_day_nano;
use arrow::datatypes::DECIMAL128_MAX_PRECISION;
use arrow_schema::DataType;
Expand All @@ -30,7 +30,7 @@ use sqlparser::ast::{BinaryOperator, Expr as SQLExpr, Interval, Value};
use sqlparser::parser::ParserError::ParserError;
use std::borrow::Cow;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E>{
pub(crate) fn parse_value(
&self,
value: Value,
Expand Down
54 changes: 52 additions & 2 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};

use crate::parser::Statement;
use crate::utils::{make_decimal_type, value_to_string};
pub use datafusion_expr::planner::ContextProvider;

Expand Down Expand Up @@ -234,15 +235,44 @@ impl PlannerContext {
}
}

/// SQL planner extension.
///
/// A way to extend the default planning mechanism: an implementor may take
/// over the planning of a statement before the built-in planner handles it.
pub trait SqlPlannerExtension: Sized {
/// Hook for planning `statement` ahead of the built-in analysis.
///
/// [`SqlToRel`] calls this method before applying its own analysis.
/// If this method returns `Ok(Some(plan))`, that plan is returned as the
/// result; if it returns `Ok(None)`, the planner applies its own analysis.
///
/// `planner` is the [`SqlToRel`] parameterized with this extension type, so
/// an implementation can use it while building its plan.
fn statement_to_plan<'a, T: ContextProvider>(
&self,
statement: &Statement,
planner: &SqlToRel<'a, T, Self>,
) -> Result<Option<LogicalPlan>>;
}

/// Default extension, which does nothing.
///
/// It is a stateless unit type, so it is trivially copyable and can be
/// created with [`Default::default`].
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultSqlPlannerExtension;

impl SqlPlannerExtension for DefaultSqlPlannerExtension {
/// Never intercepts a statement: both arguments are ignored and `Ok(None)`
/// is always returned.
fn statement_to_plan<'a, T: ContextProvider>(
&self,
_statement: &Statement,
_planner: &SqlToRel<'a, T, Self>,
) -> Result<Option<LogicalPlan>> {
Ok(None)
}
}

/// SQL query planner
pub struct SqlToRel<'a, S: ContextProvider> {
pub struct SqlToRel<'a, S: ContextProvider, E: SqlPlannerExtension> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) ident_normalizer: IdentNormalizer,
pub(crate) value_normalizer: ValueNormalizer,
pub(crate) extension: E,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
impl<'a, S: ContextProvider> SqlToRel<'a, S, DefaultSqlPlannerExtension> {
/// Create a new query planner
pub fn new(context_provider: &'a S) -> Self {
Self::new_with_options(context_provider, ParserOptions::default())
Expand All @@ -258,6 +288,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
options,
ident_normalizer: IdentNormalizer::new(ident_normalize),
value_normalizer: ValueNormalizer::new(options_value_normalize),
extension: DefaultSqlPlannerExtension,
}
}
}

impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
/// Create a new query planner with the given parser options and planner
/// extension.
///
/// The identifier and value normalizers are configured from
/// `options.enable_ident_normalization` and
/// `options.enable_options_value_normalization` respectively.
pub fn new_with_options_and_extension(
    context_provider: &'a S,
    options: ParserOptions,
    extension: E,
) -> Self {
    // Build the normalizers first: they only read flags from `options`,
    // which is then moved into the planner below.
    let ident_normalizer = IdentNormalizer::new(options.enable_ident_normalization);
    let value_normalizer =
        ValueNormalizer::new(options.enable_options_value_normalization);

    SqlToRel {
        context_provider,
        options,
        ident_normalizer,
        value_normalizer,
        extension,
    }
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};

use datafusion_common::{not_impl_err, plan_err, Constraints, Result, ScalarValue};
use datafusion_expr::expr::Sort;
Expand All @@ -30,7 +30,7 @@ use sqlparser::ast::{
SetExpr, Value,
};

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'_, S, E> {
/// Generate a logical plan from an SQL query/subquery
pub(crate) fn query_to_plan(
&self,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};
use datafusion_common::{not_impl_err, Column, Result};
use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
use sqlparser::ast::{Join, JoinConstraint, JoinOperator, TableFactor, TableWithJoins};
use std::collections::HashSet;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
pub(crate) fn plan_table_with_joins(
&self,
t: TableWithJoins,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{ContextProvider, PlannerContext, SqlPlannerExtension, SqlToRel};

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{not_impl_err, plan_err, DFSchema, Result, TableReference};
Expand All @@ -28,7 +28,7 @@ use sqlparser::ast::{FunctionArg, FunctionArgExpr, TableFactor};

mod join;

impl<S: ContextProvider> SqlToRel<'_, S> {
impl<'a, S: ContextProvider, E: SqlPlannerExtension> SqlToRel<'a, S, E> {
/// Create a `LogicalPlan` that scans the named relation
fn create_relation(
&self,
Expand Down
Loading

0 comments on commit aa12c3f

Please sign in to comment.