Implement Extension registration API #142
@@ -77,19 +77,18 @@ pub struct App<'app> {
}

impl<'app> App<'app> {
pub fn new(state: state::AppState<'app>) -> Self {
pub fn new(state: state::AppState<'app>, execution: ExecutionContext) -> Self {
I moved the ExecutionContext creation out of the application state (as it can now potentially return an Error)
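A minimal sketch of the shape this change implies, assuming simplified stand-in types: the `ExecutionConfig`, `ExecutionContext`, `AppState`, and `App` structs below are reduced to a field or two, lifetimes are dropped, `build_app` is a hypothetical helper, and the zero batch size failure is invented purely for illustration. The idea is that the fallible step happens before `App::new`, so `App::new` itself can stay infallible.

```rust
// Simplified stand-ins; the real types in the PR carry far more state.
#[derive(Debug)]
pub struct ExecutionConfig {
    pub batch_size: usize,
}

#[derive(Debug)]
pub struct ExecutionContext {
    pub batch_size: usize,
}

impl ExecutionContext {
    /// Fallible constructor: setup can fail, so it returns a Result.
    pub fn try_new(config: &ExecutionConfig) -> Result<Self, String> {
        if config.batch_size == 0 {
            // Hypothetical failure mode, chosen only for illustration.
            return Err("batch_size must be greater than zero".to_string());
        }
        Ok(Self {
            batch_size: config.batch_size,
        })
    }
}

#[derive(Debug, Default)]
pub struct AppState {
    pub should_quit: bool,
}

pub struct App {
    pub state: AppState,
    pub execution: ExecutionContext,
}

impl App {
    /// Infallible: both inputs are already fully constructed.
    pub fn new(state: AppState, execution: ExecutionContext) -> Self {
        Self { state, execution }
    }
}

/// Hypothetical helper: build the context first and propagate any error with `?`.
pub fn build_app(config: &ExecutionConfig) -> Result<App, String> {
    let execution = ExecutionContext::try_new(config)?;
    Ok(App::new(AppState::default(), execution))
}
```

With this split, a caller can report a configuration error before any UI state exists.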
pub async fn run_app(state: state::AppState<'_>) -> Result<()> {
info!("Running app with state: {:?}", state);
let mut app = App::new(state);
impl App<'_> {
This just makes `run_app` a method on app: `App::run`, which simplified passing in arguments
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use datafusion::prelude::*;
use datafusion::sql::parser::Statement;
#[cfg(feature = "deltalake")]
The point of the PR is to move these `#cfg` attributes out of this file
src/execution/mod.rs
Outdated
.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
pub fn try_new(config: &ExecutionConfig) -> Result<Self> {
let mut builder = DftSessionStateBuilder::new();
for extension in all_extensions() {
I am quite pleased with how this looks now. Adding new extensions should be pretty straightforward
use std::fmt::Debug;
use std::sync::Arc;

/// Builds a DataFusion [`SessionState`] with any necessary configuration
This struct is somewhat unfortunate - it is mostly due to the fact that the DataFusion SessionStateBuilder's APIs aren't quite nice enough to incrementally update the state
Over time I'll try and port some/all of these features upstream so we can just use SessionStateBuilder directly
/// Create a new builder
pub fn new() -> Self {
let session_config = SessionConfig::default()
// TODO why is batch size 1?
I just copied this batch size of 1 here, but it seems quite inefficient
@matthewmturner do you know why dft is using a batch size of 1?
That was copied over from the Polygon terminal app i was working on that was querying web sockets where it was useful to test with batch size of 1. We can definitely change that.
#[cfg(feature = "s3")]
Box::new(s3::AwsS3Extension::new()),
#[cfg(feature = "deltalake")]
Box::new(deltalake::DeltaLakeExtension::new()),
Adding a new extension should be a case of following the pattern in this file -- make a new module w/ feature flag and add a new extension in this list
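The pattern described here could be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's actual code: the `Extension` trait and its `register_on_builder` method, the `ExampleExtension` type, and the `build_state` helper are hypothetical, and `DftSessionStateBuilder` is reduced to a stand-in that only records table factory names so the snippet compiles without DataFusion.

```rust
use std::fmt::Debug;

/// Stand-in for the PR's `DftSessionStateBuilder`; here it only records names.
#[derive(Debug, Default)]
pub struct DftSessionStateBuilder {
    pub table_factories: Vec<String>,
}

/// Hypothetical trait: each optional integration registers itself on the builder.
pub trait Extension: Debug {
    fn register_on_builder(&self, builder: &mut DftSessionStateBuilder) -> Result<(), String>;
}

/// Hypothetical always-on extension so the list is never empty in this sketch.
#[derive(Debug, Default)]
pub struct ExampleExtension;

impl Extension for ExampleExtension {
    fn register_on_builder(&self, builder: &mut DftSessionStateBuilder) -> Result<(), String> {
        builder.table_factories.push("EXAMPLE".to_string());
        Ok(())
    }
}

// First `cfg` use: the whole module is compiled only when the feature is on.
#[cfg(feature = "deltalake")]
mod deltalake {
    use super::{DftSessionStateBuilder, Extension};

    #[derive(Debug, Default)]
    pub struct DeltaLakeExtension;

    impl DeltaLakeExtension {
        pub fn new() -> Self {
            Self
        }
    }

    impl Extension for DeltaLakeExtension {
        fn register_on_builder(&self, builder: &mut DftSessionStateBuilder) -> Result<(), String> {
            builder.table_factories.push("DELTATABLE".to_string());
            Ok(())
        }
    }
}

/// Second `cfg` use: the extension is added to the list only when enabled.
pub fn all_extensions() -> Vec<Box<dyn Extension>> {
    let mut extensions: Vec<Box<dyn Extension>> = Vec::new();
    extensions.push(Box::new(ExampleExtension));
    #[cfg(feature = "deltalake")]
    extensions.push(Box::new(deltalake::DeltaLakeExtension::new()));
    extensions
}

/// The caller loops over the list and never mentions a feature flag itself.
pub fn build_state() -> Result<DftSessionStateBuilder, String> {
    let mut builder = DftSessionStateBuilder::default();
    for extension in all_extensions() {
        extension.register_on_builder(&mut builder)?;
    }
    Ok(builder)
}
```

In this sketch each feature flag appears in exactly two places, the module declaration and the list entry, and the code that consumes the list stays free of `cfg` attributes.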
let config_path = tempdir().unwrap();
let state = initialize(config_path.path().to_path_buf());
let _app = App::new(state);
async fn construct_with_no_args() {
Because I changed the `App::new` signature I had to change these tests too -- so I took the opportunity to rewrite them to use a common fixture to avoid as much repetition as possible, and broke them into multiple functions, one for each feature
Co-authored-by: Matthew Turner <[email protected]>
}
Ok(Self {
session_ctx,
#[cfg(feature = "flightsql")]
given the new organization i am wondering if the `flightsql` client would be a better fit in `app_execution`. nothing for this PR, just thinking out loud.
Yeah I agree the flightsql integration is still a little wonky.
One thing I am wondering is rather than directly using the FlightSQL client, would it make more sense to have the UIs use an interface (aka trait) that takes SQL text and returns a `RecordBatchStream`? Then we could implement that for both ExecutionContext and flightsql 🤔
It is more indirection but it would make it easy to use the same UI with different "backends" 🤔
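One possible shape for such an interface, as a rough sketch only: the `SqlBackend` trait, the `LocalBackend` and `RemoteBackend` types, and `run_query` are hypothetical names, and `Batch` / `BatchStream` are synchronous stand-ins for Arrow record batches and an async `RecordBatchStream`, so the snippet compiles without DataFusion or a FlightSQL client.

```rust
/// Stand-in for an Arrow `RecordBatch`: just rows of strings.
pub type Batch = Vec<String>;

/// Stand-in for a `RecordBatchStream`: a boxed iterator of fallible batches.
pub type BatchStream = Box<dyn Iterator<Item = Result<Batch, String>>>;

/// Hypothetical interface: SQL text in, a stream of batches out.
pub trait SqlBackend {
    fn execute_sql(&self, sql: &str) -> Result<BatchStream, String>;
}

/// Stand-in for the local `ExecutionContext`.
pub struct LocalBackend;

impl SqlBackend for LocalBackend {
    fn execute_sql(&self, sql: &str) -> Result<BatchStream, String> {
        let batches: Vec<Result<Batch, String>> = vec![Ok(vec![format!("local: {sql}")])];
        Ok(Box::new(batches.into_iter()))
    }
}

/// Stand-in for a FlightSQL client talking to a remote server.
pub struct RemoteBackend {
    pub endpoint: String,
}

impl SqlBackend for RemoteBackend {
    fn execute_sql(&self, sql: &str) -> Result<BatchStream, String> {
        let row = format!("{}: {}", self.endpoint, sql);
        let batches: Vec<Result<Batch, String>> = vec![Ok(vec![row])];
        Ok(Box::new(batches.into_iter()))
    }
}

/// UI-side code depends only on the trait, not on a concrete backend.
pub fn run_query(backend: &dyn SqlBackend, sql: &str) -> Result<Vec<String>, String> {
    let mut rows = Vec::new();
    for batch in backend.execute_sql(sql)? {
        rows.extend(batch?);
    }
    Ok(rows)
}
```

The extra indirection is the single `&dyn SqlBackend` parameter; in exchange, the same `run_query` call works against either backend.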
We might be able to - the only thing that gives me pause on that is the statistics collection we do on the local executions to show additional statistics in the history tab. right now it's simple per query stats (elapsed time, and bytes scanned), but i had in mind (i.e. `ExecMetrics` in the link above) also showing summary stats per ExecutionPlan node (i have in mind displaying something like a heatmap table to show where the expensive parts of a query are). It can probably be made to work on a generic interface like that, I just haven't had time to think about it.
We could maybe just stub that out / return no information for engines that don't support providing the info (actually in InfluxDB 3.0 we return some of that info as grpc headers in the FlightSQL response, so it would be possible)
Anyhow, maybe we can do that when adding flightsql to the cli interface
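The "return no information" idea could be sketched with a default trait method, shown below under assumptions: `QueryStats`, `StatsProvider`, `LocalEngine`, `RemoteEngine`, and `history_line` are hypothetical names, and the two fields mirror only the per-query stats mentioned above (elapsed time and bytes scanned).

```rust
use std::time::Duration;

/// Hypothetical per-query statistics, limited to the two values discussed above.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryStats {
    pub elapsed: Duration,
    pub bytes_scanned: u64,
}

/// Hypothetical interface: engines that cannot report stats keep the default.
pub trait StatsProvider {
    fn last_query_stats(&self) -> Option<QueryStats> {
        None
    }
}

/// Stand-in for local execution, which does collect statistics.
pub struct LocalEngine {
    pub last: Option<QueryStats>,
}

impl StatsProvider for LocalEngine {
    fn last_query_stats(&self) -> Option<QueryStats> {
        self.last.clone()
    }
}

/// Stand-in for a remote engine that reports nothing; the default applies.
pub struct RemoteEngine;

impl StatsProvider for RemoteEngine {}

/// What a history view might render for either kind of engine.
pub fn history_line(engine: &dyn StatsProvider) -> String {
    match engine.last_query_stats() {
        Some(stats) => format!(
            "{} ms, {} bytes scanned",
            stats.elapsed.as_millis(),
            stats.bytes_scanned
        ),
        None => "no statistics available".to_string(),
    }
}
```

A remote engine that does expose some of this information could override `last_query_stats` later without changing the callers.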
Great update @alamb! Extension story is definitely improving.
Thank you @matthewmturner -- this is pretty exciting. Next up is JSON functions (and/or maybe writing some tests for S3 / delta)
@alamb sounds good. i am focusing on the pagination - it is a fairly invasive change to get it working with the tui rendering and so it's taking some mucking around to figure out the right approach.
Last part of #132
Closes #132
Rationale
I want to begin working on integrating other components (e.g. #130) and I want to avoid littering the code with `cfg` flags.
I think after this PR we'll have two `cfg` uses for each extension. Short of macros, this is the best I can come up with.
Changes: