Implement Extension registration API #142

Merged on Sep 14, 2024 (3 commits)

Conversation

Contributor

@alamb alamb commented Sep 14, 2024

Last part of #132
Closes #132

Rationale

I want to begin integrating other components (e.g. #130) and I want to avoid littering the code with cfg flags.

I think after this PR we'll have two cfg uses for each extension. Short of macros, this is the best I can come up with.

Changes:

  1. Pull deltalake and s3 registration into their own module
  2. Create a new API for registering extensions
  3. Create a list of nice to have APIs upstream in DataFusion

@@ -77,19 +77,18 @@ pub struct App<'app> {
}

impl<'app> App<'app> {
- pub fn new(state: state::AppState<'app>) -> Self {
+ pub fn new(state: state::AppState<'app>, execution: ExecutionContext) -> Self {
Contributor Author

I moved the ExecutionContext creation out of the application state (as it can now potentially return an Error)
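
A minimal sketch of the pattern described in this comment: build the fallible piece first, then hand it to an infallible constructor. The types below are simplified stand-ins rather than the real dft or DataFusion types, and the `batch_size == 0` check is only an invented failure condition for illustration.

```rust
/// Stand-in for the real `ExecutionConfig` (hypothetical, simplified).
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    pub batch_size: usize,
}

/// Stand-in for the real `ExecutionContext` (hypothetical, simplified).
#[derive(Debug)]
pub struct ExecutionContext {
    batch_size: usize,
}

impl ExecutionContext {
    /// Fallible constructor: setup problems surface here as an `Err`.
    /// The zero check is an illustrative assumption, not dft's real validation.
    pub fn try_new(config: &ExecutionConfig) -> Result<Self, String> {
        if config.batch_size == 0 {
            return Err("batch_size must be greater than zero".to_string());
        }
        Ok(Self { batch_size: config.batch_size })
    }

    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
}

/// Stand-in for the application state (hypothetical).
#[derive(Debug, Default)]
pub struct AppState {
    pub title: String,
}

pub struct App {
    state: AppState,
    execution: ExecutionContext,
}

impl App {
    /// Infallible: everything that can fail has already happened.
    pub fn new(state: AppState, execution: ExecutionContext) -> Self {
        Self { state, execution }
    }

    pub fn state(&self) -> &AppState {
        &self.state
    }

    pub fn execution(&self) -> &ExecutionContext {
        &self.execution
    }
}

/// The caller decides how to handle a failed context before any app exists.
pub fn build_app(state: AppState, config: &ExecutionConfig) -> Result<App, String> {
    let execution = ExecutionContext::try_new(config)?;
    Ok(App::new(state, execution))
}
```

With this split, `App::new` can keep returning `Self` while errors from context setup are reported to the caller with `?`.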

pub async fn run_app(state: state::AppState<'_>) -> Result<()> {
info!("Running app with state: {:?}", state);
let mut app = App::new(state);
impl App<'_> {
Contributor Author

This just makes run_app a method on App (App::run), which simplifies passing in arguments

use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use datafusion::prelude::*;
use datafusion::sql::parser::Statement;
#[cfg(feature = "deltalake")]
Contributor Author

The point of the PR is to move these #[cfg] attributes out of this file

.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
pub fn try_new(config: &ExecutionConfig) -> Result<Self> {
let mut builder = DftSessionStateBuilder::new();
for extension in all_extensions() {
Contributor Author

I am quite pleased with how this looks now. Adding new extensions should be pretty straightforward.

use std::fmt::Debug;
use std::sync::Arc;

/// Builds a DataFusion [`SessionState`] with any necessary configuration
Contributor Author

This struct is somewhat unfortunate. It exists mostly because the DataFusion SessionStateBuilder's APIs aren't quite nice enough to update the state incrementally.

Over time I'll try and port some/all of these features upstream so we can just use SessionStateBuilder directly

/// Create a new builder
pub fn new() -> Self {
let session_config = SessionConfig::default()
// TODO why is batch size 1?
Contributor Author

I just copied this batch size of 1 here, but it seems quite inefficient

@matthewmturner do you know why dft is using a batch size of 1?

Collaborator

That was copied over from the Polygon terminal app I was working on. It was querying web sockets, and a batch size of 1 was useful for testing there. We can definitely change that.

#[cfg(feature = "s3")]
Box::new(s3::AwsS3Extension::new()),
#[cfg(feature = "deltalake")]
Box::new(deltalake::DeltaLakeExtension::new()),
Contributor Author

Adding a new extension should be a case of following the pattern in this file -- make a new module w/ feature flag and add a new extension in this list
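
The pattern described here might look roughly like the sketch below. It is not the PR's actual code: `StateBuilder` is a simplified stand-in for `DftSessionStateBuilder`, the `Extension` trait signature is an assumption, `CsvExtension` is an invented always-on example, and the list is built with `push` statements rather than the list literal shown in the diff.

```rust
use std::collections::BTreeMap;
use std::fmt::Debug;

/// Stand-in for the PR's `DftSessionStateBuilder` (hypothetical, simplified):
/// it only records table factory names.
#[derive(Debug, Default)]
pub struct StateBuilder {
    table_factories: BTreeMap<String, String>,
}

impl StateBuilder {
    pub fn add_table_factory(&mut self, name: &str, factory: &str) {
        self.table_factories
            .insert(name.to_string(), factory.to_string());
    }

    pub fn table_factory_names(&self) -> Vec<&str> {
        self.table_factories.keys().map(String::as_str).collect()
    }
}

/// Assumed shape of the extension trait: each integration registers itself.
pub trait Extension: Debug {
    fn register(&self, builder: &mut StateBuilder) -> Result<(), String>;
}

/// Invented always-on extension so the sketch does something without features.
#[derive(Debug)]
pub struct CsvExtension;

impl Extension for CsvExtension {
    fn register(&self, builder: &mut StateBuilder) -> Result<(), String> {
        builder.add_table_factory("CSV", "CsvTableFactory");
        Ok(())
    }
}

/// First `cfg` use: the whole module is compiled only with the feature on.
#[cfg(feature = "deltalake")]
mod deltalake {
    use super::{Extension, StateBuilder};

    #[derive(Debug)]
    pub struct DeltaLakeExtension;

    impl Extension for DeltaLakeExtension {
        fn register(&self, builder: &mut StateBuilder) -> Result<(), String> {
            builder.add_table_factory("DELTATABLE", "DeltaTableFactory");
            Ok(())
        }
    }
}

/// Second `cfg` use: one gated line in the list of extensions.
pub fn all_extensions() -> Vec<Box<dyn Extension>> {
    #[allow(unused_mut)]
    let mut extensions: Vec<Box<dyn Extension>> = vec![Box::new(CsvExtension)];
    #[cfg(feature = "deltalake")]
    extensions.push(Box::new(deltalake::DeltaLakeExtension));
    extensions
}

/// Mirrors the loop in `try_new`: no `cfg` attributes are needed here.
pub fn build_state() -> Result<StateBuilder, String> {
    let mut builder = StateBuilder::default();
    for extension in all_extensions() {
        extension.register(&mut builder)?;
    }
    Ok(builder)
}
```

The code that consumes the list never mentions a feature flag, which is how the `cfg` count stays at two per extension.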

let config_path = tempdir().unwrap();
let state = initialize(config_path.path().to_path_buf());
let _app = App::new(state);
async fn construct_with_no_args() {
Contributor Author

Because I changed the App::new signature I had to change these tests too -- so I took the opportunity to rewrite them to use a common fixture, to avoid as much repetition as possible, and broke them into separate functions, one per feature

Co-authored-by: Matthew Turner <[email protected]>
}
Ok(Self {
session_ctx,
#[cfg(feature = "flightsql")]
Collaborator

Given the new organization, I am wondering if the flightsql client would be a better fit in app_execution. Nothing for this PR, just thinking out loud.

Contributor Author

Yeah I agree the flightsql integration is still a little wonky.

One thing I am wondering is rather than directly using the FlightSQL client, would it make more sense to have the UIs use an interface (aka trait) that takes SQL text and returns a RecordBatchStream? Then we could implement that for both ExecutionContext and flightsql 🤔

It is more indirection but it would make it easy to use the same UI with different "backends" 🤔
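
A rough sketch of the interface idea floated above, using only the standard library. Everything here is hypothetical: `Batch` and `BatchStream` stand in for Arrow record batches and a `RecordBatchStream`, `LocalBackend` and `RemoteBackend` stand in for `ExecutionContext` and a FlightSQL client, and a real version would most likely be async rather than a synchronous iterator.

```rust
/// Stand-in for an Arrow `RecordBatch` (hypothetical): one inner Vec per row.
pub type Batch = Vec<Vec<String>>;

/// Stand-in for a `RecordBatchStream`: a boxed iterator of fallible batches.
pub type BatchStream = Box<dyn Iterator<Item = Result<Batch, String>>>;

/// The interface the UIs would program against: SQL text in, batches out.
pub trait SqlBackend {
    fn name(&self) -> &str;
    fn execute_sql(&self, sql: &str) -> Result<BatchStream, String>;
}

/// Stand-in for local execution (hypothetical).
pub struct LocalBackend;

/// Stand-in for a remote FlightSQL client (hypothetical); no network is used.
pub struct RemoteBackend {
    pub endpoint: String,
}

/// Shared fake "execution" so both backends return one single-row batch.
fn fake_result(label: &str, sql: &str) -> Result<BatchStream, String> {
    if sql.trim().is_empty() {
        return Err("empty query".to_string());
    }
    let batch: Batch = vec![vec![format!("{} result for: {}", label, sql)]];
    let stream: BatchStream = Box::new(std::iter::once(Ok::<Batch, String>(batch)));
    Ok(stream)
}

impl SqlBackend for LocalBackend {
    fn name(&self) -> &str {
        "local"
    }

    fn execute_sql(&self, sql: &str) -> Result<BatchStream, String> {
        fake_result("local", sql)
    }
}

impl SqlBackend for RemoteBackend {
    fn name(&self) -> &str {
        "flightsql"
    }

    fn execute_sql(&self, sql: &str) -> Result<BatchStream, String> {
        fake_result(&self.endpoint, sql)
    }
}

/// UI-side code: counts rows without knowing which backend produced them.
pub fn count_rows(backend: &dyn SqlBackend, sql: &str) -> Result<usize, String> {
    let mut rows = 0;
    for batch in backend.execute_sql(sql)? {
        rows += batch?.len();
    }
    Ok(rows)
}
```

The extra indirection is one trait object; in exchange, `count_rows` (standing in for a UI) runs unchanged against either backend.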

Collaborator

We might be able to. The only thing that gives me pause is the statistics collection we do on local executions to show additional statistics in the history tab. Right now it's simple per-query stats (elapsed time and bytes scanned), but I had in mind (i.e. ExecMetrics in the link above) also showing summary stats per ExecutionPlan node (something like a heatmap table to show where the expensive parts of a query are). It can probably be made to work with a generic interface like that; I just haven't had time to think about it.

Contributor Author

We could maybe just stub that out / return no information for engines that don't support providing the info (actually in InfluxDB 3.0 we return some of that info as gRPC headers in the FlightSQL response, so it would be possible)

Anyhow, maybe we can do that when adding flightsql to the cli interface
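
One way the "return no information" idea could be sketched is a trait method with a default that reports nothing. The names `QueryStats`, `StatsSource`, `LocalEngine` and `RemoteEngine` are all hypothetical, and the two fields simply mirror the per-query stats mentioned earlier in this thread (elapsed time and bytes scanned).

```rust
use std::time::Duration;

/// Per-query stats as mentioned in the thread (hypothetical struct).
#[derive(Debug, Clone, PartialEq)]
pub struct QueryStats {
    pub elapsed: Duration,
    pub bytes_scanned: u64,
}

/// Engines opt in to reporting stats; the default reports nothing.
pub trait StatsSource {
    fn last_query_stats(&self) -> Option<QueryStats> {
        None
    }
}

/// Stand-in for local execution, which can collect stats (hypothetical).
pub struct LocalEngine {
    pub last: Option<QueryStats>,
}

impl StatsSource for LocalEngine {
    fn last_query_stats(&self) -> Option<QueryStats> {
        self.last.clone()
    }
}

/// Stand-in for an engine with no stats support: it keeps the default.
pub struct RemoteEngine;

impl StatsSource for RemoteEngine {}

/// History-tab style rendering that degrades gracefully without stats.
pub fn history_line(sql: &str, engine: &dyn StatsSource) -> String {
    match engine.last_query_stats() {
        Some(stats) => format!(
            "{} | {} ms | {} bytes",
            sql,
            stats.elapsed.as_millis(),
            stats.bytes_scanned
        ),
        None => format!("{} | stats unavailable", sql),
    }
}
```

An engine that later gains stats support (for example by reading them from response headers) would only need to override the one method.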

@matthewmturner matthewmturner merged commit b419d8c into datafusion-contrib:main Sep 14, 2024
3 checks passed
@matthewmturner
Collaborator

Great update @alamb! Extension story is definitely improving.

@alamb
Contributor Author

alamb commented Sep 14, 2024

Thank you @matthewmturner -- this is pretty exciting. Next up is JSON functions (and/or maybe writing some tests for S3 / delta)

@alamb alamb deleted the alamb/registration branch September 14, 2024 13:55
@matthewmturner
Collaborator

@alamb sounds good. I am focusing on the pagination. It is a fairly invasive change to get it working with the TUI rendering, so it's taking some mucking around to figure out the right approach.

Successfully merging this pull request may close these issues.

Preparatory refactoring of ExecutionContext
2 participants