Merge pull request #486 from splitgraph/upgrade-deps
Upgrade to DataFusion 34 and Arrow 49
gruuya authored Jan 15, 2024
2 parents ddf9c89 + edf5b4a commit dcb2f44
Showing 20 changed files with 995 additions and 959 deletions.
1,559 changes: 755 additions & 804 deletions Cargo.lock

Large diffs are not rendered by default.

42 changes: 21 additions & 21 deletions Cargo.toml
@@ -31,15 +31,15 @@ object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[dependencies]
arrow = "47.0.0"
arrow-buffer = "47.0.0"
arrow-csv = "47.0.0"
arrow-flight = { version = "47.0.0", optional = true }
arrow = "49.0.0"
arrow-buffer = "49.0.0"
arrow-csv = "49.0.0"
arrow-flight = { version = "49.0.0", optional = true }
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "47.0.0"
arrow-schema = "47.0.0"
arrow-integration-test = "49.0.0"
arrow-schema = "49.0.0"
async-trait = "0.1.64"
base64 = "0.21.0"

@@ -50,26 +50,26 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-32-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-32-upgrade", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-34-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-34-upgrade", optional = true }

dashmap = "5.4.0"

datafusion = "32.0.0"
datafusion-common = "32.0.0"
datafusion-expr = "32.0.0"
datafusion = "34.0.0"
datafusion-common = "34.0.0"
datafusion-expr = "34.0.0"

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "2b913b37e71ed96212dcec8c3fc8e865754ced82", features = ["s3-native-tls", "datafusion-ext"] }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9264edea89a2fc1c35f4a6b9faab125748ff3651", features = ["s3-native-tls", "datafusion-ext"] }

futures = "0.3"
hex = ">=0.4.0"
itertools = ">=0.10.0"
lazy_static = ">=1.4.0"
log = "0.4"
moka = { version = "0.11.0", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.7"
moka = { version = "0.12.2", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.8"
parking_lot = "0.12.1"
percent-encoding = "2.2.0"
pretty_env_logger = "0.4"
@@ -83,26 +83,26 @@ reqwest = { version = "0.11.14", features = [ "stream" ] }
rmp = "0.8.11"
rmp-serde = "1.1.1"
rmpv = { version = "1.0.0", features = ["with-serde"] }
rustyline = "12.0"
rustyline = "13.0"
serde = "1.0.156"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = "0.38"
sqlparser = "0.40"
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
tempfile = "3"
thiserror = "1"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
tonic = { version = "0.10.0", optional = true }
url = "2.2"
url = "2.5"
uuid = "1.2.1"
warp = "0.3.5"
wasi-common = "14.0.0"
warp = "0.3.6"

# For WASM user-defined functions
wasmtime = "14.0.0"
wasmtime-wasi = "14.0.0"
wasi-common = "16.0.0"
wasmtime = "16.0.0"
wasmtime-wasi = "16.0.0"

[dev-dependencies]
assert_cmd = "2"
14 changes: 7 additions & 7 deletions datafusion_remote_tables/Cargo.toml
@@ -13,17 +13,17 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "47.0.0"
arrow-buffer = "47.0.0"
arrow-schema = "47.0.0"
arrow = "49.0.0"
arrow-buffer = "49.0.0"
arrow-schema = "49.0.0"
async-trait = "0.1.64"

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-32-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-34-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = "32.0.0"
datafusion-common = "32.0.0"
datafusion-expr = "32.0.0"
datafusion = "34.0.0"
datafusion-common = "34.0.0"
datafusion-expr = "34.0.0"
itertools = ">=0.10.0"
log = "0.4"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
2 changes: 1 addition & 1 deletion datafusion_remote_tables/src/provider.rs
@@ -9,12 +9,12 @@ use datafusion::common::DataFusionError;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::expressions::{cast, col};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use log::debug;
use std::any::Any;
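In DataFusion 34 the `conjunction` helper moved out of `datafusion::optimizer::utils` and is imported from `datafusion_expr::utils` instead, as the hunk above shows. A minimal sketch of what the helper does, assuming the `conjunction(impl IntoIterator<Item = Expr>) -> Option<Expr>` signature (the `combined_predicate` wrapper is purely illustrative):

```rust
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, Expr};

// AND together a list of pushed-down filter expressions; `None` means "no filter at all".
fn combined_predicate(filters: Vec<Expr>) -> Option<Expr> {
    conjunction(filters)
}

fn main() {
    let filters = vec![col("a").gt(lit(1)), col("b").eq(lit("x"))];
    if let Some(predicate) = combined_predicate(filters) {
        // Prints something like: a > Int32(1) AND b = Utf8("x")
        println!("{predicate}");
    }
}
```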
3 changes: 1 addition & 2 deletions src/context/delta.rs
@@ -428,7 +428,7 @@ impl SeafowlContext {
let store = table.object_store();

// List all objects with the table prefix...
let objects = store.list(None).await?.map_ok(|m| m.location).boxed();
let objects = store.list(None).map_ok(|m| m.location).boxed();

// ... and delete them in bulk (if applicable).
let _paths = store
@@ -449,7 +449,6 @@ mod tests {
use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
use arrow_schema::{Field, Schema};
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
use deltalake::logstore::LogStore;
use object_store::{local::LocalFileSystem, memory::InMemory, path::Path};
use rstest::rstest;
use serde_json::{json, Value};
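The `store.list(None)` change above tracks object_store 0.8: `ObjectStore::list` is no longer an async method that returns a stream behind a `Result`, it now returns a stream of `Result<ObjectMeta>` directly, so the intermediate `.await?` disappears. A small sketch against the in-memory store (the paths and contents here are made up for illustration):

```rust
use futures::TryStreamExt;
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    store
        .put(&Path::from("table/part-0.parquet"), vec![0u8; 4].into())
        .await?;

    // `list` itself is not awaited any more; only the returned stream is.
    let locations: Vec<Path> = store
        .list(Some(&Path::from("table")))
        .map_ok(|meta| meta.location)
        .try_collect()
        .await?;

    println!("{locations:?}");
    Ok(())
}
```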
32 changes: 30 additions & 2 deletions src/context/logical.rs
@@ -14,6 +14,10 @@ use crate::{
use datafusion::common::DFSchema;
use datafusion::error::{DataFusionError as Error, Result};
use datafusion::execution::context::SessionState;
use datafusion::optimizer::analyzer::Analyzer;
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::{OptimizerContext, OptimizerRule};
use datafusion::sql::parser::{CopyToSource, CopyToStatement};
use datafusion::{prelude::SessionContext, sql::TableReference};
use datafusion_expr::logical_plan::{Extension, LogicalPlan};
@@ -108,7 +112,7 @@ impl SeafowlContext {
| Statement::CreateSchema { .. }
| Statement::CreateView { .. }
| Statement::CreateDatabase { .. } => self.inner.state().statement_to_plan(stmt).await,
Statement::Insert{ ref mut source, .. } => {
Statement::Insert{ source: Some(ref mut source), .. } => {
let state = self.rewrite_time_travel_query(source).await?;
let plan = state.statement_to_plan(stmt).await?;
state.optimize(&plan)
@@ -121,7 +125,31 @@
if with_hints.is_empty() && joins.is_empty() => {
let state = self.inner.state();
let plan = state.statement_to_plan(stmt).await?;
state.optimize(&plan)

// Create a custom optimizer to avoid the mangling effects of some optimizers (like
// `CommonSubexprEliminate`), which can add nested Projection plans and rewrite
// expressions.
// We also need to run an analyzer pass beforehand to perform type coercion.
let analyzer = Analyzer::new();
let plan = analyzer.execute_and_check(
&plan,
self.inner.copied_config().options(),
|_, _| {},
)?;
let optimizer = Optimizer::with_rules(
vec![
Arc::new(SimplifyExpressions::new()),
]
);
let config = OptimizerContext::default();
optimizer.optimize(&plan, &config, |plan: &LogicalPlan, rule: &dyn OptimizerRule| {
debug!(
"After applying rule '{}':\n{}\n",
rule.name(),
plan.display_indent()
)
}
)
},
Statement::Delete{ .. } => {
let state = self.inner.state();
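The block above swaps the stock `state.optimize(&plan)` call for an explicit analyzer pass followed by a cut-down optimizer. A standalone sketch of that pattern against the DataFusion 34 API (the `optimize_minimal` helper and its use of a plain `SessionContext` are this example's own, not Seafowl code):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::optimizer::analyzer::Analyzer;
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::{OptimizerContext, OptimizerRule};
use datafusion::prelude::SessionContext;
use datafusion_expr::LogicalPlan;
use log::debug;

// Run the analyzer (type coercion, etc.) and then only a hand-picked rule set,
// instead of the full default optimizer pipeline.
fn optimize_minimal(ctx: &SessionContext, plan: &LogicalPlan) -> Result<LogicalPlan> {
    let analyzed =
        Analyzer::new().execute_and_check(plan, ctx.copied_config().options(), |_, _| {})?;

    let optimizer = Optimizer::with_rules(vec![Arc::new(SimplifyExpressions::new())]);
    optimizer.optimize(
        &analyzed,
        &OptimizerContext::default(),
        |plan: &LogicalPlan, rule: &dyn OptimizerRule| {
            debug!(
                "After applying rule '{}':\n{}\n",
                rule.name(),
                plan.display_indent()
            )
        },
    )
}
```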
2 changes: 1 addition & 1 deletion src/context/physical.rs
@@ -63,7 +63,7 @@ use url::Url;
/// This is used for queries that are actually run before we produce the plan,
/// since they have to manipulate catalog metadata or use async to write to it.
fn make_dummy_exec() -> Arc<dyn ExecutionPlan> {
Arc::new(EmptyExec::new(false, SchemaRef::new(Schema::empty())))
Arc::new(EmptyExec::new(SchemaRef::new(Schema::empty())))
}

impl SeafowlContext {
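In DataFusion 34 `EmptyExec::new` no longer takes the `produce_one_row` flag (an `EmptyExec` always yields zero rows; the one-row case is handled by a separate placeholder operator), which is what the one-line change above reflects. The same helper written out self-contained, as a sketch:

```rust
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

// A zero-row, empty-schema plan used as a stand-in result for statements that
// do their real work (e.g. catalog updates) before planning.
fn make_dummy_exec() -> Arc<dyn ExecutionPlan> {
    Arc::new(EmptyExec::new(SchemaRef::new(Schema::empty())))
}
```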
3 changes: 2 additions & 1 deletion src/datafusion/parser.rs
@@ -498,7 +498,7 @@ impl<'a> DFParser<'a> {
self.parser
.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
let table_name = self.parser.parse_object_name()?;
let (columns, _) = self.parse_columns()?;
let (columns, constraints) = self.parse_columns()?;

#[derive(Default)]
struct Builder {
@@ -617,6 +617,7 @@ impl<'a> DFParser<'a> {
.unwrap_or(CompressionTypeVariant::UNCOMPRESSED),
unbounded,
options: builder.options.unwrap_or(HashMap::new()),
constraints,
};
Ok(Statement::CreateExternalTable(create))
}
6 changes: 5 additions & 1 deletion src/datafusion/utils.rs
@@ -63,7 +63,7 @@ pub(crate) fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataTyp
SQLDataType::Char(_)
| SQLDataType::Varchar(_)
| SQLDataType::Text
| SQLDataType::String => Ok(DataType::Utf8),
| SQLDataType::String(_) => Ok(DataType::Utf8),
SQLDataType::Timestamp(None, tz_info) => {
let tz = if matches!(tz_info, TimezoneInfo::Tz)
|| matches!(tz_info, TimezoneInfo::WithTimeZone)
@@ -134,6 +134,10 @@ pub(crate) fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataTyp
| SQLDataType::BigNumeric(_)
| SQLDataType::BigDecimal(_)
| SQLDataType::Clob(_)
| SQLDataType::Bytes(_)
| SQLDataType::Int64
| SQLDataType::Float64
| SQLDataType::Struct(_)
=> not_impl_err!(
"Unsupported SQL type {sql_type:?}"
),
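The `SQLDataType::String(_)` pattern above follows sqlparser 0.40, where `STRING` can carry an optional length and the enum variant therefore has a payload. A reduced sketch of the same mapping (the `to_arrow_type` helper is illustrative, not the crate's actual `convert_simple_data_type`):

```rust
use arrow_schema::DataType;
use datafusion_common::{not_impl_err, Result};
use sqlparser::ast::DataType as SQLDataType;

// Map a handful of SQL string types to Arrow's Utf8; everything else is unsupported here.
fn to_arrow_type(sql_type: &SQLDataType) -> Result<DataType> {
    match sql_type {
        SQLDataType::Char(_)
        | SQLDataType::Varchar(_)
        | SQLDataType::Text
        | SQLDataType::String(_) => Ok(DataType::Utf8),
        other => not_impl_err!("Unsupported SQL type {other:?}"),
    }
}
```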
22 changes: 7 additions & 15 deletions src/delta_rs/backports.rs
@@ -4,9 +4,7 @@ use chrono::{NaiveDateTime, TimeZone, Utc};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, FileScanConfig,
};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::execution::context::SessionState;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ExecutionPlan;
@@ -43,7 +41,7 @@ pub async fn parquet_scan_from_actions(
.push(part);
}

let table_partition_cols = table.get_metadata()?.partition_columns.clone();
let table_partition_cols = table.metadata()?.partition_columns.clone();
let file_schema = Arc::new(Schema::new(
schema
.fields()
@@ -66,19 +64,12 @@
object_store_url,
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: table.state.datafusion_table_statistics(),
statistics: table.state.datafusion_table_statistics()?,
projection: projection.cloned(),
limit,
table_partition_cols: table_partition_cols
.iter()
.map(|c| {
Ok((
c.to_owned(),
wrap_partition_type_in_dict(
schema.field_with_name(c)?.data_type().clone(),
),
))
})
.map(|c| Ok(schema.field_with_name(c)?.clone()))
.collect::<Result<Vec<_>, ArrowError>>()?,
output_ordering: vec![],
infinite_source: false,
@@ -117,6 +108,7 @@ fn partitioned_file_from_action(action: &Add, schema: &Schema) -> PartitionedFil
last_modified,
size: action.size as usize,
e_tag: None,
version: None,
},
partition_values,
range: None,
@@ -142,7 +134,7 @@
)
.ok()?;
let cast_arr = cast_with_options(
&time_nanos.to_array(),
&time_nanos.to_array().ok()?,
field_dt,
&CastOptions {
safe: false,
Expand All @@ -164,7 +156,7 @@ fn to_correct_scalar_value(
)
.ok()?;
let cast_arr = cast_with_options(
&time_nanos.to_array(),
&time_nanos.to_array().ok()?,
field_dt,
&CastOptions {
safe: false,
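Several of the edits above come from DataFusion 34 / Arrow 49 API changes: `ScalarValue::to_array` is now fallible, `FileScanConfig::table_partition_cols` takes full `Field`s instead of `(name, DataType)` pairs, and `ObjectMeta` gained a `version` field. A small sketch of the fallible scalar-to-array conversion plus cast (the `scalar_as` helper is this example's own):

```rust
use arrow::array::ArrayRef;
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow::util::display::FormatOptions;
use datafusion::common::ScalarValue;
use datafusion::error::Result;

// Convert a scalar to a single-element array, then cast it to the target type.
// `to_array` returns a Result in DataFusion 34, hence the extra `?`.
fn scalar_as(value: &ScalarValue, target: &DataType) -> Result<ArrayRef> {
    let arr = value.to_array()?;
    Ok(cast_with_options(
        &arr,
        target,
        &CastOptions {
            safe: false,
            format_options: FormatOptions::default(),
        },
    )?)
}
```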
