Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Extension registration API #142

Merged
merged 3 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,18 @@ pub struct App<'app> {
}

impl<'app> App<'app> {
pub fn new(state: state::AppState<'app>) -> Self {
pub fn new(state: state::AppState<'app>, execution: ExecutionContext) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the ExecutionContext creation out of the application state (as it can now potentially return an Error)

let (event_tx, event_rx) = mpsc::unbounded_channel();
let cancellation_token = CancellationToken::new();
let task = tokio::spawn(async {});
let execution = Arc::new(ExecutionContext::new(state.config.execution.clone()));

Self {
state,
task,
event_rx,
event_tx,
cancellation_token,
execution,
execution: Arc::new(execution),
}
}

Expand Down Expand Up @@ -310,30 +309,34 @@ impl Widget for &App<'_> {
}
}

pub async fn run_app(state: state::AppState<'_>) -> Result<()> {
info!("Running app with state: {:?}", state);
let mut app = App::new(state);
impl App<'_> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just makes run_app a method on app: App::run which simplified passing in arguments

/// Run the main event loop for the application
pub async fn run_app(self) -> Result<()> {
info!("Running app with state: {:?}", self.state);
let mut app = self;

app.execute_ddl();
app.execute_ddl();

#[cfg(feature = "flightsql")]
app.establish_flightsql_connection();
#[cfg(feature = "flightsql")]
app.establish_flightsql_connection();

let mut terminal = ratatui::Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap();
app.enter(true)?;
// Main loop for handling events
loop {
let event = app.next().await?;
let mut terminal =
ratatui::Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap();
app.enter(true)?;
// Main loop for handling events
loop {
let event = app.next().await?;

if let AppEvent::Render = event.clone() {
terminal.draw(|f| f.render_widget(&app, f.area()))?;
};
if let AppEvent::Render = event.clone() {
terminal.draw(|f| f.render_widget(&app, f.area()))?;
};

app.handle_app_event(event)?;
app.handle_app_event(event)?;

if app.state.should_quit {
break;
if app.state.should_quit {
break;
}
}
app.exit()
}
app.exit()
}
5 changes: 1 addition & 4 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.
//! [`CliApp`]: Command Line User Interface

use crate::app::state;
use crate::execution::ExecutionContext;
use color_eyre::eyre::eyre;
use datafusion::arrow::util::pretty::pretty_format_batches;
Expand All @@ -35,9 +34,7 @@ pub struct CliApp {
}

impl CliApp {
pub fn new(state: state::AppState<'static>) -> Self {
let execution = ExecutionContext::new(state.config.execution.clone());

pub fn new(execution: ExecutionContext) -> Self {
Self { execution }
}

Expand Down
77 changes: 12 additions & 65 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,20 @@
use std::sync::Arc;

use color_eyre::eyre::Result;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use datafusion::prelude::*;
use datafusion::sql::parser::Statement;
#[cfg(feature = "deltalake")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of the PR is to remove these #cfg out of this file

use deltalake::delta_datafusion::DeltaTableFactory;
use log::info;
use tokio_stream::StreamExt;
#[cfg(feature = "s3")]
use url::Url;
#[cfg(feature = "flightsql")]
use {
arrow_flight::sql::client::FlightSqlServiceClient, tokio::sync::Mutex,
tonic::transport::Channel,
};

use crate::config::ExecutionConfig;
use crate::extensions::{enabled_extensions, DftSessionStateBuilder};

/// Structure for executing queries either locally or remotely (via FlightSQL)
///
Expand All @@ -61,68 +56,20 @@ pub struct ExecutionContext {

impl ExecutionContext {
/// Construct a new `ExecutionContext` with the specified configuration
pub fn new(config: ExecutionConfig) -> Self {
let _ = &config; // avoid unused variable warning (it is used when some features are enabled)

let cfg = SessionConfig::default()
.with_batch_size(1)
.with_information_schema(true);

let runtime_env = RuntimeEnv::default();

#[cfg(feature = "s3")]
{
if let Some(object_store_config) = &config.object_store {
if let Some(s3_configs) = &object_store_config.s3 {
info!("S3 configs exists");
for s3_config in s3_configs {
match s3_config.to_object_store() {
Ok(object_store) => {
info!("Created object store");
if let Some(object_store_url) = s3_config.object_store_url() {
info!("Endpoint exists");
if let Ok(parsed_endpoint) = Url::parse(object_store_url) {
info!("Parsed endpoint");
runtime_env.register_object_store(
&parsed_endpoint,
Arc::new(object_store),
);
info!("Registered s3 object store");
}
}
}
Err(e) => {
log::error!("Error creating object store: {:?}", e);
}
}
}
}
}
}

#[allow(unused_mut)] // used when deltalake is enabled
let mut state = SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(runtime_env.into())
.with_config(cfg)
.build();

#[cfg(feature = "deltalake")]
{
state
.table_factories_mut()
.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
pub fn try_new(config: &ExecutionConfig) -> Result<Self> {
let mut builder = DftSessionStateBuilder::new();
for extension in enabled_extensions() {
builder = extension.register(config, builder)?;
}

{
let session_ctx = SessionContext::new_with_state(state);
let state = builder.build()?;
let session_ctx = SessionContext::new_with_state(state);

Self {
session_ctx,
#[cfg(feature = "flightsql")]
flightsql_client: Mutex::new(None),
}
}
Ok(Self {
session_ctx,
#[cfg(feature = "flightsql")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given the new organization i am wondering if flightsql client would be better a better fit in app_execution. nothing for this PR, just thinking out loud.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree the flightsql integration is still a little wonky.

One thing I am wondering is rather than directly using the FlightSQL client, would it make more sense to have the UIs use an interface (aka trait) that takes SQL text and returns a RecordBatchStream? Then we could implement that for both ExecutionContext and flightsql 🤔

It is more indirection but it would make it easy to use the same UI with different "backends" 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to - the only thing that gives me pause on that is the statistics collection we do on the local executions to show additional statistics in the history tab. right now its simple per query stats (elapsed time, and bytes scanned), but i had in mind (i.e. ExecMetrics in the link above) also showing summary stats per ExecutionPlan node (i have in mind displaying something like a heatmap table to show where the expensive parts of a query are). It probably can be figured out how to work on a generic interface like that I just havent had time to think about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could maybe just stube that out / return no information for engines that don't support providing the info (actually in InfludDB 3.0 we return some of that info as grpc headers in the FlightSQL response, so it would be possible)

Anyhow, maybe we can do that when adding flightsql to the cli interface

flightsql_client: Mutex::new(None),
})
}

pub fn create_tables(&mut self) -> Result<()> {
Expand Down
130 changes: 130 additions & 0 deletions src/extensions/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DftSessionStateBuilder`] for configuring DataFusion [`SessionState`]

use datafusion::catalog::TableProviderFactory;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionConfig;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

/// Builds a DataFusion [`SessionState`] with any necessary configuration
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This struct is somewhat unfortunate - it is mostly due to the fact that the DataFusion SessionStateBuilder's APIs aren't quite nice enough to incrementally update the state

Over time I'll try and port some/all of these features upstream so we can just use SessionStateBuilder directly

///
/// Ideally we would use the DataFusion [`SessionStateBuilder`], but it doesn't
/// currently have all the needed APIs. Once we have a good handle on the needed
/// APIs we can upstream them to DataFusion.
///
/// List of things that would be nice to add upstream:
/// TODO: Implement Debug for SessionStateBuilder upstream
/// TODO: Implement with_table_factory to add a single TableProviderFactory to the list of factories
/// TODO: Implement some way to get access to the current RuntimeEnv (to register object stores)
/// TODO: Implement a way to add just a single TableProviderFactory
/// TODO: Make TableFactoryProvider implement Debug
/// TODO: rename RuntimeEnv::new() to RuntimeEnv::try_new() as it returns a Result
//#[derive(Debug)]
pub struct DftSessionStateBuilder {
session_config: SessionConfig,
table_factories: Option<HashMap<String, Arc<dyn TableProviderFactory>>>,
runtime_env: Option<Arc<RuntimeEnv>>,
}

impl Debug for DftSessionStateBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DftSessionStateBuilder")
.field("session_config", &self.session_config)
//.field("table_factories", &self.table_factories)
.field(
"table_factories",
&"TODO TableFactoryDoes not implement Debug",
)
.field("runtime_env", &self.runtime_env)
.finish()
}
}

impl Default for DftSessionStateBuilder {
fn default() -> Self {
Self::new()
}
}

impl DftSessionStateBuilder {
/// Create a new builder
pub fn new() -> Self {
let session_config = SessionConfig::default()
// TODO why is batch size 1?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just copied this batch size of 1 here, but it seems quite inefficient

@matthewmturner do you know why dft is using a batch size of 1?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was copied over from the Polygon terminal app i was working on that was querying web sockets where it was useful to test with batch size of 1. We can definitely change that.

.with_batch_size(1)
.with_information_schema(true);

Self {
session_config,
table_factories: None,
runtime_env: None,
}
}

/// Add a table factory to the list of factories on this builder
pub fn with_table_factory(
mut self,
name: &str,
factory: Arc<dyn TableProviderFactory>,
) -> Self {
if self.table_factories.is_none() {
self.table_factories = Some(HashMap::from([(name.to_string(), factory)]));
} else {
self.table_factories
.as_mut()
.unwrap()
.insert(name.to_string(), factory);
}
self
}

/// Return the current [`RuntimeEnv`], creating a default if it doesn't exist
pub fn runtime_env(&mut self) -> &RuntimeEnv {
if self.runtime_env.is_none() {
self.runtime_env = Some(Arc::new(RuntimeEnv::default()));
}
self.runtime_env.as_ref().unwrap()
}

/// Build the [`SessionState`] from the specified configuration
pub fn build(self) -> datafusion_common::Result<SessionState> {
let Self {
session_config,
table_factories,
runtime_env,
} = self;

let mut builder = SessionStateBuilder::new()
.with_default_features()
.with_config(session_config);

if let Some(runtime_env) = runtime_env {
builder = builder.with_runtime_env(runtime_env);
}
if let Some(table_factories) = table_factories {
builder = builder.with_table_factories(table_factories);
}

Ok(builder.build())
}
}
42 changes: 42 additions & 0 deletions src/extensions/deltalake.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DeltaLake integration: [DeltaLakeExtension]

use crate::config::ExecutionConfig;
use crate::extensions::{DftSessionStateBuilder, Extension};
use deltalake::delta_datafusion::DeltaTableFactory;
use std::sync::Arc;

#[derive(Debug, Default)]
pub struct DeltaLakeExtension {}

impl DeltaLakeExtension {
pub fn new() -> Self {
Self {}
}
}

impl Extension for DeltaLakeExtension {
fn register(
&self,
_config: &ExecutionConfig,
builder: DftSessionStateBuilder,
) -> datafusion_common::Result<DftSessionStateBuilder> {
Ok(builder.with_table_factory("DELTATABLE", Arc::new(DeltaTableFactory {})))
}
}
Loading
Loading