Datafusion post-35 upgrade #498

Merged · 4 commits · Feb 5, 2024
918 changes: 372 additions & 546 deletions Cargo.lock

Large diffs are not rendered by default.

75 changes: 42 additions & 33 deletions Cargo.toml
@@ -2,20 +2,20 @@
 members = ["clade"]
 
 [workspace.dependencies]
-arrow = "49.0.0"
-arrow-buffer = "49.0.0"
-arrow-csv = "49.0.0"
-arrow-flight = "49.0.0"
+arrow = "50.0.0"
+arrow-buffer = "50.0.0"
+arrow-csv = "50.0.0"
+arrow-flight = "50.0.0"
 # For the JSON format support
 # https://github.com/apache/arrow-rs/pull/2868
 # https://github.com/apache/arrow-rs/pull/2724
-arrow-integration-test = "49.0.0"
-arrow-schema = "49.0.0"
+arrow-integration-test = "50.0.0"
+arrow-schema = "50.0.0"
 async-trait = "0.1.64"
 
-datafusion = "34.0.0"
-datafusion-common = "34.0.0"
-datafusion-expr = "34.0.0"
+datafusion = "35.0.0"
+datafusion-common = "35.0.0"
+datafusion-expr = "35.0.0"
 
 itertools = ">=0.10.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
@@ -51,22 +51,31 @@ object-store-s3 = ["object_store/aws"]
 remote-tables = ["dep:datafusion-remote-tables"]
 
 [patch.crates-io]
-# Patch to pick up https://github.com/apache/arrow-rs/pull/5282 from
-# https://github.com/splitgraph/arrow-rs/tree/49-with-date-fix
-
-arrow-arith = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-array = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-buffer = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-cast = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-csv = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-data = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-ipc = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-json = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-ord = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-row = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-schema = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-select = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-string = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
+# Pick up https://github.com/apache/arrow-rs/pull/5282
+arrow-arith = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-array = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-buffer = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-cast = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-csv = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-data = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-json = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-ord = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-row = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-select = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-string = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+
+# Pick up https://github.com/apache/arrow-datafusion/pull/8894 and https://github.com/apache/arrow-datafusion/pull/9007
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-execution = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
 
 [dependencies]
 arrow = { workspace = true }
@@ -88,8 +97,8 @@ clap = { version = "3.2.19", features = [ "derive" ] }
 config = "0.13.3"
 
 # PG wire protocol support
-convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-34-upgrade", optional = true }
-convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-34-upgrade", optional = true }
+convergence = { git = "https://github.com/returnString/convergence", rev = "c58ba5c9903e96fd73b65fda8c7b19192fee5cd3", optional = true }
+convergence-arrow = { git = "https://github.com/returnString/convergence", rev = "c58ba5c9903e96fd73b65fda8c7b19192fee5cd3", optional = true }
 
 dashmap = "5.4.0"

@@ -99,14 +108,14 @@ datafusion-expr = { workspace = true }
 
 datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }
 
-deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9264edea89a2fc1c35f4a6b9faab125748ff3651", features = ["s3-native-tls", "datafusion-ext"] }
+deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "sqlparser-0.43", features = ["datafusion"] }
 
 futures = "0.3"
 hex = ">=0.4.0"
 itertools = { workspace = true }
 lazy_static = ">=1.4.0"
 moka = { version = "0.12.2", default_features = false, features = ["future", "atomic64", "quanta"] }
-object_store = "0.8"
+object_store = "0.9"
 parking_lot = "0.12.1"
 percent-encoding = "2.2.0"
 prost = "0.12.1"
@@ -123,7 +132,7 @@ rustyline = "13.0"
 serde = "1.0.156"
 serde_json = "1.0.93"
 sha2 = ">=0.10.1"
-sqlparser = "0.40"
+sqlparser = { version = "0.43", features = ["visitor"] }
 sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
 strum = ">=0.24"
 strum_macros = ">=0.24"
@@ -139,9 +148,9 @@ uuid = "1.2.1"
 warp = "0.3.6"
 
 # For WASM user-defined functions
-wasi-common = "16.0.0"
-wasmtime = "16.0.0"
-wasmtime-wasi = "16.0.0"
+wasi-common = "17.0.0"
+wasmtime = "17.0.0"
+wasmtime-wasi = "17.0.0"
 
 [dev-dependencies]
 assert_cmd = "2"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 
 # Remote query execution for a variety of DBs
-connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-34-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
+connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-35-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
 
 datafusion = { workspace = true }
 datafusion-common = { workspace = true }
18 changes: 5 additions & 13 deletions src/context/delta.rs
@@ -25,7 +25,6 @@ use deltalake::writer::create_add;
 use deltalake::DeltaTable;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
-use std::collections::HashMap;
 use std::fs::File;
 use std::sync::Arc;
 use tempfile::{NamedTempFile, TempPath};
@@ -258,7 +257,7 @@ pub async fn plan_to_object_store(
 
 // Create the corresponding Add action; currently we don't support partition columns
 // which simplifies things.
-let add = create_add(&HashMap::default(), file_name, size, &metadata)?;
+let add = create_add(&Default::default(), file_name, size, &metadata)?;
 
 Ok(add)
 });
@@ -400,7 +399,7 @@ impl SeafowlContext {
 table_log_store.as_ref(),
 &actions,
 op,
-table.get_state(),
+table.state.as_ref(),
 None,
 )
 .await?;
@@ -522,7 +521,6 @@ mod tests {
 adds[0].path.clone(),
 adds[0].size,
 adds[0].partition_values.is_empty(),
-adds[0].partition_values_parsed.is_none(),
 adds[0].data_change,
 serde_json::from_str::<Value>(
 adds[0].stats.clone().unwrap().as_str()
@@ -533,7 +531,6 @@
 adds[1].path.clone(),
 adds[1].size,
 adds[1].partition_values.is_empty(),
-adds[1].partition_values_parsed.is_none(),
 adds[1].data_change,
 serde_json::from_str::<Value>(
 adds[1].stats.clone().unwrap().as_str()
@@ -544,8 +541,7 @@
 vec![
 (
 PART_0_FILE_NAME.to_string(),
-1269,
-true,
+1298,
 true,
 true,
 json!({
@@ -569,8 +565,7 @@
 ),
 (
 PART_1_FILE_NAME.to_string(),
-1284,
-true,
+1313,
 true,
 true,
 json!({
@@ -599,10 +594,7 @@
 object_store
 .get_log_store(&table_uuid.to_string())
 .object_store(),
-vec![
-Path::from(PART_0_FILE_NAME.to_string()),
-Path::from(PART_1_FILE_NAME.to_string()),
-],
+vec![PART_0_FILE_NAME.to_string(), PART_1_FILE_NAME.to_string()],
 )
 .await;
 }
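The `table.get_state()` to `table.state.as_ref()` change above tracks delta-rs exposing table state as an `Option` (a table handle can now exist before its transaction log has been loaded) instead of through an accessor returning a bare reference. Below is a minimal standalone sketch of the shape of that change; `DeltaTable` and `DeltaTableState` here are simplified stand-ins, not the real delta-rs types:

// Hypothetical stand-ins for the delta-rs types; the real ones carry much more.
#[derive(Clone, Debug, Default)]
struct DeltaTableState {
    version: i64,
}

struct DeltaTable {
    // Newer delta-rs models state as an Option, since a table handle can
    // exist before its transaction log has been loaded.
    state: Option<DeltaTableState>,
}

// Stand-in for the commit path, which now accepts Option<&DeltaTableState>,
// so an uninitialized table is representable instead of panicking in an accessor.
fn commit(snapshot: Option<&DeltaTableState>) {
    match snapshot {
        Some(s) => println!("committing on top of version {}", s.version),
        None => println!("committing against an uninitialized table"),
    }
}

fn main() {
    let table = DeltaTable {
        state: Some(DeltaTableState { version: 4 }),
    };
    // Old call shape: commit(table.get_state()), passing &DeltaTableState.
    // New call shape: borrow the Option with Option::as_ref.
    commit(table.state.as_ref());
}

The `create_add(&Default::default(), ...)` change above is cosmetic by comparison: type inference resolves `Default::default()` to the same empty `HashMap` of partition values, which is what lets the `use std::collections::HashMap;` import be dropped.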
16 changes: 2 additions & 14 deletions src/context/logical.rs
@@ -107,6 +107,7 @@ impl SeafowlContext {
 // Delegate generic queries to the basic DataFusion logical planner
 // (though note EXPLAIN [our custom query] will mean we have to implement EXPLAIN ourselves)
 Statement::Explain { .. }
+| Statement::ExplainTable { .. }
 | Statement::ShowVariable { .. }
 | Statement::ShowTables { .. }
 | Statement::ShowColumns { .. }
@@ -309,7 +310,7 @@
 })),
 }))
 }
-DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
+DFStatement::CreateExternalTable(_) => {
 self.inner.state().statement_to_plan(stmt).await
 }
 DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
@@ -483,19 +484,6 @@ mod tests {
 );
 }
 
-#[tokio::test]
-async fn test_plan_insert_type_mismatch() {
-let ctx = in_memory_context_with_test_db().await;
-
-// Try inserting a timestamp into a number (note this will work fine for inserting
-// e.g. Utf-8 into numbers at plan time but should fail at execution time if the value
-// doesn't convert)
-let err = ctx
-.create_logical_plan("INSERT INTO testcol.some_table SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')")
-.await.unwrap_err();
-assert_eq!(err.to_string(), "Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Float64");
-}
-
 #[tokio::test]
 async fn test_plan_insert_values_wrong_number() {
 let ctx = in_memory_context_with_test_db().await;
30 changes: 22 additions & 8 deletions src/context/mod.rs
@@ -133,20 +133,15 @@ impl SeafowlContext {
 &self,
 table_name: impl Into<TableReference<'a>>,
 ) -> Result<DeltaTable> {
-let table_log_store = self
-.inner
+self.inner
 .table_provider(table_name)
 .await?
 .as_any()
 .downcast_ref::<DeltaTable>()
 .ok_or_else(|| {
 DataFusionError::Execution("Table {table_name} not found".to_string())
-})?
-.log_store();
-
-// We can't just keep hold of the downcasted ref from above because of
-// `temporary value dropped while borrowed`
-Ok(DeltaTable::new(table_log_store, Default::default()))
+})
+.cloned()
 }
 
 // Parse the uuid from the Delta table uri if available
@@ -264,6 +259,25 @@
 use super::test_utils::in_memory_context;
 use super::*;
 
+#[tokio::test]
+async fn test_timestamp_to_date_casting() -> Result<()> {
+let ctx = in_memory_context().await;
+
+let plan = ctx.plan_query("SELECT '1998-11-30 00:00:00'::date").await?;
+
+let results = ctx.collect(plan).await?;
+let expected = [
+"+-----------------------------+",
+"| Utf8(\"1998-11-30 00:00:00\") |",
+"+-----------------------------+",
+"| 1998-11-30                  |",
+"+-----------------------------+",
+];
+assert_batches_eq!(expected, &results);
+
+Ok(())
+}
+
 #[rstest]
 #[case::regular_type_names("float", "float")]
 #[case::legacy_type_names("f32", "f32")]