-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Extension registration API #142
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,19 +77,18 @@ pub struct App<'app> { | |
} | ||
|
||
impl<'app> App<'app> { | ||
pub fn new(state: state::AppState<'app>) -> Self { | ||
pub fn new(state: state::AppState<'app>, execution: ExecutionContext) -> Self { | ||
let (event_tx, event_rx) = mpsc::unbounded_channel(); | ||
let cancellation_token = CancellationToken::new(); | ||
let task = tokio::spawn(async {}); | ||
let execution = Arc::new(ExecutionContext::new(state.config.execution.clone())); | ||
|
||
Self { | ||
state, | ||
task, | ||
event_rx, | ||
event_tx, | ||
cancellation_token, | ||
execution, | ||
execution: Arc::new(execution), | ||
} | ||
} | ||
|
||
|
@@ -310,30 +309,34 @@ impl Widget for &App<'_> { | |
} | ||
} | ||
|
||
pub async fn run_app(state: state::AppState<'_>) -> Result<()> { | ||
info!("Running app with state: {:?}", state); | ||
let mut app = App::new(state); | ||
impl App<'_> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This just makes |
||
/// Run the main event loop for the application | ||
pub async fn run_app(self) -> Result<()> { | ||
info!("Running app with state: {:?}", self.state); | ||
let mut app = self; | ||
|
||
app.execute_ddl(); | ||
app.execute_ddl(); | ||
|
||
#[cfg(feature = "flightsql")] | ||
app.establish_flightsql_connection(); | ||
#[cfg(feature = "flightsql")] | ||
app.establish_flightsql_connection(); | ||
|
||
let mut terminal = ratatui::Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap(); | ||
app.enter(true)?; | ||
// Main loop for handling events | ||
loop { | ||
let event = app.next().await?; | ||
let mut terminal = | ||
ratatui::Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap(); | ||
app.enter(true)?; | ||
// Main loop for handling events | ||
loop { | ||
let event = app.next().await?; | ||
|
||
if let AppEvent::Render = event.clone() { | ||
terminal.draw(|f| f.render_widget(&app, f.area()))?; | ||
}; | ||
if let AppEvent::Render = event.clone() { | ||
terminal.draw(|f| f.render_widget(&app, f.area()))?; | ||
}; | ||
|
||
app.handle_app_event(event)?; | ||
app.handle_app_event(event)?; | ||
|
||
if app.state.should_quit { | ||
break; | ||
if app.state.should_quit { | ||
break; | ||
} | ||
} | ||
app.exit() | ||
} | ||
app.exit() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,25 +20,20 @@ | |
use std::sync::Arc; | ||
|
||
use color_eyre::eyre::Result; | ||
use datafusion::execution::runtime_env::RuntimeEnv; | ||
use datafusion::execution::session_state::SessionStateBuilder; | ||
use datafusion::execution::SendableRecordBatchStream; | ||
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; | ||
use datafusion::prelude::*; | ||
use datafusion::sql::parser::Statement; | ||
#[cfg(feature = "deltalake")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The point of the PR is to remove these |
||
use deltalake::delta_datafusion::DeltaTableFactory; | ||
use log::info; | ||
use tokio_stream::StreamExt; | ||
#[cfg(feature = "s3")] | ||
use url::Url; | ||
#[cfg(feature = "flightsql")] | ||
use { | ||
arrow_flight::sql::client::FlightSqlServiceClient, tokio::sync::Mutex, | ||
tonic::transport::Channel, | ||
}; | ||
|
||
use crate::config::ExecutionConfig; | ||
use crate::extensions::{enabled_extensions, DftSessionStateBuilder}; | ||
|
||
/// Structure for executing queries either locally or remotely (via FlightSQL) | ||
/// | ||
|
@@ -61,68 +56,20 @@ pub struct ExecutionContext { | |
|
||
impl ExecutionContext { | ||
/// Construct a new `ExecutionContext` with the specified configuration | ||
pub fn new(config: ExecutionConfig) -> Self { | ||
let _ = &config; // avoid unused variable warning (it is used when some features are enabled) | ||
|
||
let cfg = SessionConfig::default() | ||
.with_batch_size(1) | ||
.with_information_schema(true); | ||
|
||
let runtime_env = RuntimeEnv::default(); | ||
|
||
#[cfg(feature = "s3")] | ||
{ | ||
if let Some(object_store_config) = &config.object_store { | ||
if let Some(s3_configs) = &object_store_config.s3 { | ||
info!("S3 configs exists"); | ||
for s3_config in s3_configs { | ||
match s3_config.to_object_store() { | ||
Ok(object_store) => { | ||
info!("Created object store"); | ||
if let Some(object_store_url) = s3_config.object_store_url() { | ||
info!("Endpoint exists"); | ||
if let Ok(parsed_endpoint) = Url::parse(object_store_url) { | ||
info!("Parsed endpoint"); | ||
runtime_env.register_object_store( | ||
&parsed_endpoint, | ||
Arc::new(object_store), | ||
); | ||
info!("Registered s3 object store"); | ||
} | ||
} | ||
} | ||
Err(e) => { | ||
log::error!("Error creating object store: {:?}", e); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[allow(unused_mut)] // used when deltalake is enabled | ||
let mut state = SessionStateBuilder::new() | ||
.with_default_features() | ||
.with_runtime_env(runtime_env.into()) | ||
.with_config(cfg) | ||
.build(); | ||
|
||
#[cfg(feature = "deltalake")] | ||
{ | ||
state | ||
.table_factories_mut() | ||
.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {})); | ||
pub fn try_new(config: &ExecutionConfig) -> Result<Self> { | ||
let mut builder = DftSessionStateBuilder::new(); | ||
for extension in enabled_extensions() { | ||
builder = extension.register(config, builder)?; | ||
} | ||
|
||
{ | ||
let session_ctx = SessionContext::new_with_state(state); | ||
let state = builder.build()?; | ||
let session_ctx = SessionContext::new_with_state(state); | ||
|
||
Self { | ||
session_ctx, | ||
#[cfg(feature = "flightsql")] | ||
flightsql_client: Mutex::new(None), | ||
} | ||
} | ||
Ok(Self { | ||
session_ctx, | ||
#[cfg(feature = "flightsql")] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Given the new organization I am wondering if There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah I agree the flightsql integration is still a little wonky. One thing I am wondering is rather than directly using the FlightSQL client, would it make more sense to have the UIs use an interface (aka trait) that takes SQL text and returns a It is more indirection but it would make it easy to use the same UI with different "backends" 🤔 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We might be able to - the only thing that gives me pause on that is the statistics collection we do on the local executions to show additional statistics in the history tab. Right now it's simple per-query stats (elapsed time, and bytes scanned), but I had in mind (i.e. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We could maybe just stub that out / return no information for engines that don't support providing the info (actually in InfluxDB 3.0 we return some of that info as gRPC headers in the FlightSQL response, so it would be possible). Anyhow, maybe we can do that when adding flightsql to the CLI interface |
||
flightsql_client: Mutex::new(None), | ||
}) | ||
} | ||
|
||
pub fn create_tables(&mut self) -> Result<()> { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! [`DftSessionStateBuilder`] for configuring DataFusion [`SessionState`] | ||
|
||
use datafusion::catalog::TableProviderFactory; | ||
use datafusion::execution::context::SessionState; | ||
use datafusion::execution::runtime_env::RuntimeEnv; | ||
use datafusion::execution::session_state::SessionStateBuilder; | ||
use datafusion::prelude::SessionConfig; | ||
use std::collections::HashMap; | ||
use std::fmt::Debug; | ||
use std::sync::Arc; | ||
|
||
/// Builds a DataFusion [`SessionState`] with any necessary configuration | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This struct is somewhat unfortunate - it is mostly due to the fact that the DataFusion SessionStateBuilder's APIs aren't quite nice enough to incrementally update the state. Over time I'll try and port some/all of these features upstream so we can just use SessionStateBuilder directly |
||
/// | ||
/// Ideally we would use the DataFusion [`SessionStateBuilder`], but it doesn't | ||
/// currently have all the needed APIs. Once we have a good handle on the needed | ||
/// APIs we can upstream them to DataFusion. | ||
/// | ||
/// List of things that would be nice to add upstream: | ||
/// TODO: Implement Debug for SessionStateBuilder upstream | ||
/// TODO: Implement with_table_factory to add a single TableProviderFactory to the list of factories | ||
/// TODO: Implement some way to get access to the current RuntimeEnv (to register object stores) | ||
/// TODO: Implement a way to add just a single TableProviderFactory | ||
/// TODO: Make TableProviderFactory implement Debug | ||
/// TODO: rename RuntimeEnv::new() to RuntimeEnv::try_new() as it returns a Result | ||
//#[derive(Debug)] | ||
pub struct DftSessionStateBuilder { | ||
/// Session configuration passed to the underlying `SessionStateBuilder` in `build`.
session_config: SessionConfig, | ||
/// Table factories registered so far, keyed by name; `None` until the
/// first factory is added via `with_table_factory`.
table_factories: Option<HashMap<String, Arc<dyn TableProviderFactory>>>, | ||
/// Runtime environment to build with; `None` until `runtime_env()` creates
/// a default one.
runtime_env: Option<Arc<RuntimeEnv>>, | ||
} | ||
|
||
impl Debug for DftSessionStateBuilder { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("DftSessionStateBuilder") | ||
.field("session_config", &self.session_config) | ||
//.field("table_factories", &self.table_factories) | ||
.field( | ||
"table_factories", | ||
&"TODO TableFactoryDoes not implement Debug", | ||
) | ||
.field("runtime_env", &self.runtime_env) | ||
.finish() | ||
} | ||
} | ||
|
||
impl Default for DftSessionStateBuilder { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl DftSessionStateBuilder { | ||
/// Create a new builder | ||
pub fn new() -> Self { | ||
let session_config = SessionConfig::default() | ||
// TODO why is batch size 1? | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I just copied this batch size of 1 here, but it seems quite inefficient. @matthewmturner do you know why dft is using a batch size of 1? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That was copied over from the Polygon terminal app I was working on that was querying web sockets, where it was useful to test with a batch size of 1. We can definitely change that. |
||
.with_batch_size(1) | ||
.with_information_schema(true); | ||
|
||
Self { | ||
session_config, | ||
table_factories: None, | ||
runtime_env: None, | ||
} | ||
} | ||
|
||
/// Add a table factory to the list of factories on this builder | ||
pub fn with_table_factory( | ||
mut self, | ||
name: &str, | ||
factory: Arc<dyn TableProviderFactory>, | ||
) -> Self { | ||
if self.table_factories.is_none() { | ||
self.table_factories = Some(HashMap::from([(name.to_string(), factory)])); | ||
} else { | ||
self.table_factories | ||
.as_mut() | ||
.unwrap() | ||
.insert(name.to_string(), factory); | ||
} | ||
self | ||
} | ||
|
||
/// Return the current [`RuntimeEnv`], creating a default if it doesn't exist | ||
pub fn runtime_env(&mut self) -> &RuntimeEnv { | ||
if self.runtime_env.is_none() { | ||
self.runtime_env = Some(Arc::new(RuntimeEnv::default())); | ||
} | ||
self.runtime_env.as_ref().unwrap() | ||
} | ||
|
||
/// Build the [`SessionState`] from the specified configuration | ||
pub fn build(self) -> datafusion_common::Result<SessionState> { | ||
let Self { | ||
session_config, | ||
table_factories, | ||
runtime_env, | ||
} = self; | ||
|
||
let mut builder = SessionStateBuilder::new() | ||
.with_default_features() | ||
.with_config(session_config); | ||
|
||
if let Some(runtime_env) = runtime_env { | ||
builder = builder.with_runtime_env(runtime_env); | ||
} | ||
if let Some(table_factories) = table_factories { | ||
builder = builder.with_table_factories(table_factories); | ||
} | ||
|
||
Ok(builder.build()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! DeltaLake integration: [DeltaLakeExtension] | ||
|
||
use crate::config::ExecutionConfig; | ||
use crate::extensions::{DftSessionStateBuilder, Extension}; | ||
use deltalake::delta_datafusion::DeltaTableFactory; | ||
use std::sync::Arc; | ||
|
||
/// Stateless marker type for the DeltaLake integration; it has no fields.
#[derive(Debug, Default)] | ||
pub struct DeltaLakeExtension {} | ||
|
||
impl DeltaLakeExtension { | ||
pub fn new() -> Self { | ||
Self {} | ||
} | ||
} | ||
|
||
impl Extension for DeltaLakeExtension { | ||
fn register( | ||
&self, | ||
_config: &ExecutionConfig, | ||
builder: DftSessionStateBuilder, | ||
) -> datafusion_common::Result<DftSessionStateBuilder> { | ||
Ok(builder.with_table_factory("DELTATABLE", Arc::new(DeltaTableFactory {}))) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the ExecutionContext creation out of the application state (as it can now potentially return an Error)