Skip to content

Commit

Permalink
feat: add fallback TableProvider to FederatedTableProviderAdaptor (da…
Browse files Browse the repository at this point in the history
  • Loading branch information
phillipleblanc authored Jun 18, 2024
1 parent 0567c02 commit ce87bf4
Showing 1 changed file with 81 additions and 6 deletions.
87 changes: 81 additions & 6 deletions datafusion-federation/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use datafusion::{
datasource::TableProvider,
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::{Expr, LogicalPlan, TableSource, TableType},
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType},
physical_plan::ExecutionPlan,
};

Expand All @@ -17,11 +17,28 @@ use crate::FederationProvider;
// from a TableScan. This wrapper may be avoidable.
pub struct FederatedTableProviderAdaptor {
pub source: Arc<dyn FederatedTableSource>,
pub table_provider: Option<Arc<dyn TableProvider>>,
}

impl FederatedTableProviderAdaptor {
pub fn new(source: Arc<dyn FederatedTableSource>) -> Self {
Self { source }
Self {
source,
table_provider: None,
}
}

/// Creates a new FederatedTableProviderAdaptor that falls back to the
/// provided TableProvider. This is useful if used within a DataFusion
/// context without the federation optimizer.
pub fn new_with_provider(
source: Arc<dyn FederatedTableSource>,
table_provider: Arc<dyn TableProvider>,
) -> Self {
Self {
source,
table_provider: Some(table_provider),
}
}
}

Expand All @@ -31,34 +48,92 @@ impl TableProvider for FederatedTableProviderAdaptor {
self
}
fn schema(&self) -> SchemaRef {
if let Some(table_provider) = &self.table_provider {
return table_provider.schema();
}

self.source.schema()
}
fn constraints(&self) -> Option<&Constraints> {
if let Some(table_provider) = &self.table_provider {
return table_provider
.constraints()
.or_else(|| self.source.constraints());
}

self.source.constraints()
}
fn table_type(&self) -> TableType {
if let Some(table_provider) = &self.table_provider {
return table_provider.table_type();
}

self.source.table_type()
}
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
if let Some(table_provider) = &self.table_provider {
return table_provider
.get_logical_plan()
.or_else(|| self.source.get_logical_plan());
}

self.source.get_logical_plan()
}
fn get_column_default(&self, column: &str) -> Option<&Expr> {
if let Some(table_provider) = &self.table_provider {
return table_provider
.get_column_default(column)
.or_else(|| self.source.get_column_default(column));
}

self.source.get_column_default(column)
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.supports_filters_pushdown(filters);
}

Ok(vec![
TableProviderFilterPushDown::Unsupported;
filters.len()
])
}

// Scan is not supported; the adaptor should be replaced
// with a virtual TableProvider that provides federation for a sub-plan.
async fn scan(
&self,
_state: &SessionState,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.scan(state, projection, filters, limit).await;
}

Err(DataFusionError::NotImplemented(
"FederatedTableProviderAdaptor cannot scan".to_string(),
))
}

async fn insert_into(
&self,
_state: &SessionState,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.insert_into(_state, input, overwrite).await;
}

Err(DataFusionError::NotImplemented(
"FederatedTableProviderAdaptor cannot insert_into".to_string(),
))
}
}

// FederatedTableProvider extends DataFusion's TableProvider trait
Expand Down

0 comments on commit ce87bf4

Please sign in to comment.