From 56dfd25a8e9395527f4b87de5c6f0f6f3698c7bb Mon Sep 17 00:00:00 2001
From: Robert Pack <42610831+roeap@users.noreply.github.com>
Date: Wed, 5 Jul 2023 21:10:52 +0200
Subject: [PATCH] chore: update datafusion and related crates (#1504)

# Description

Updating datafusion and related crates to the latest versions. With the updated object store, we unfortunately lose support for `aws-profile`. Since the object store now also contains logic for parsing URLs, which we currently maintain here, I plan to adopt these new APIs and recover profile support in a follow-up PR; that will then also remove the deprecations ignored in this PR. A rough sketch of what that adoption could look like is appended after the diff below.
---
 docker-compose.yml | 5 +--
 python/Cargo.toml | 4 +--
 python/src/lib.rs | 3 +-
 python/src/schema.rs | 24 +++++---------
 rust/Cargo.toml | 40 +++++++++++-------------
 rust/src/action/checkpoints.rs | 2 ++
 rust/src/data_catalog/storage/mod.rs | 2 +-
 rust/src/delta_datafusion.rs | 10 +++---
 rust/src/operations/delete.rs | 31 +++++++++---------
 rust/src/operations/transaction/state.rs | 6 +++-
 rust/src/operations/update.rs | 19 ++++++-----
 rust/src/storage/config.rs | 3 ++
 rust/src/storage/file.rs | 12 +++++--
 rust/src/storage/mod.rs | 8 +++++
 rust/src/storage/s3.rs | 8 +++--
 rust/src/storage/utils.rs | 1 +
 rust/src/writer/json.rs | 34 ++++++++++----------
 rust/tests/datafusion_test.rs | 3 +-
 rust/tests/repair_s3_rename_test.rs | 8 +++--
 19 files changed, 123 insertions(+), 100 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml index 444f0edc15..4f11c3dae5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,8 +18,9 @@ services: test: [ "CMD", "curl", "-f", "http://localhost:4566/health" ] fake-gcs: - image: fsouza/fake-gcs-server - command: ["-scheme", "http", "-port", "4443", "-external-url", "http://[::]:4443", "-backend", "memory"] + # Custom image - see fsouza/fake-gcs-server#1164 + image: tustvold/fake-gcs-server + command: ["-scheme", "http", "-public-host", "localhost:4443", "-backend", "memory"] ports: - 4443:4443 diff --git a/python/Cargo.toml b/python/Cargo.toml index d3c22af26f..9749226ce8 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -18,7 +18,7 @@ doc = false name = "deltalake._internal" [dependencies] -arrow-schema = { version = "40", features = ["serde"] } +arrow-schema = { version = "42", features = ["serde"] } chrono = "0" env_logger = "0" futures = "0.3" @@ -35,7 +35,7 @@ num_cpus = "1" reqwest = { version = "*", features = ["native-tls-vendored"] } [dependencies.pyo3] -version = "0.18" +version = "0.19" features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] diff --git a/python/src/lib.rs b/python/src/lib.rs index ca7247c365..efd074b64f 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -775,7 +775,7 @@ fn write_new_deltalake( Ok(()) } -#[pyclass(name = "DeltaDataChecker", text_signature = "(invariants)")] +#[pyclass(name = "DeltaDataChecker")] struct PyDeltaDataChecker { inner: DeltaDataChecker, rt: tokio::runtime::Runtime, @@ -784,6 +784,7 @@ struct PyDeltaDataChecker { #[pymethods] impl PyDeltaDataChecker { #[new] + #[pyo3(signature = (invariants))] fn new(invariants: Vec<(String, String)>) -> Self { let invariants: Vec = invariants .into_iter() diff --git a/python/src/schema.rs b/python/src/schema.rs index 1e1a75c387..43d386a617 100644 --- a/python/src/schema.rs +++ b/python/src/schema.rs @@ -113,7 +113,7 @@ fn python_type_to_schema(ob: PyObject, py: Python) -> PyResult { /// * "decimal(, )" /// /// :param data_type: string representation of the data type -#[pyclass(module 
= "deltalake.schema", text_signature = "(data_type)")] +#[pyclass(module = "deltalake.schema")] #[derive(Clone)] pub struct PrimitiveType { inner_type: String, @@ -132,6 +132,7 @@ impl TryFrom for PrimitiveType { #[pymethods] impl PrimitiveType { #[new] + #[pyo3(signature = (data_type))] fn new(data_type: String) -> PyResult { if data_type.starts_with("decimal") { if try_parse_decimal_type(&data_type).is_none() { @@ -246,10 +247,7 @@ impl PrimitiveType { /// ArrayType(PrimitiveType("integer"), contains_null=True) /// >>> ArrayType("integer", contains_null=False) /// ArrayType(PrimitiveType("integer"), contains_null=False) -#[pyclass( - module = "deltalake.schema", - text_signature = "(element_type, contains_null=True)" -)] +#[pyclass(module = "deltalake.schema")] #[derive(Clone)] pub struct ArrayType { inner_type: SchemaTypeArray, @@ -411,10 +409,7 @@ impl ArrayType { /// MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True) /// >>> MapType("integer", "string", value_contains_null=False) /// MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False) -#[pyclass( - module = "deltalake.schema", - text_signature = "(key_type, value_type, value_contains_null=True)" -)] +#[pyclass(module = "deltalake.schema")] #[derive(Clone)] pub struct MapType { inner_type: SchemaTypeMap, @@ -597,10 +592,7 @@ impl MapType { /// /// >>> Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}}) /// Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}}) -#[pyclass( - module = "deltalake.schema", - text_signature = "(name, type, nullable=True, metadata=None)" -)] +#[pyclass(module = "deltalake.schema")] #[derive(Clone)] pub struct Field { inner: SchemaField, @@ -778,7 +770,7 @@ impl Field { /// /// >>> StructType([Field("x", "integer"), Field("y", "string")]) /// StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)]) -#[pyclass(subclass, module = "deltalake.schema", text_signature = "(fields)")] +#[pyclass(subclass, module = "deltalake.schema")] #[derive(Clone)] pub struct StructType { inner_type: SchemaTypeStruct, @@ -951,13 +943,13 @@ pub fn schema_to_pyobject(schema: &Schema, py: Python) -> PyResult { /// >>> import pyarrow as pa /// >>> Schema.from_pyarrow(pa.schema({"x": pa.int32(), "y": pa.string()})) /// Schema([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)]) -#[pyclass(extends = StructType, name = "Schema", module = "deltalake.schema", -text_signature = "(fields)")] +#[pyclass(extends = StructType, name = "Schema", module = "deltalake.schema")] pub struct PySchema; #[pymethods] impl PySchema { #[new] + #[pyo3(signature = (fields))] fn new(fields: Vec>) -> PyResult<(Self, StructType)> { let fields: Vec = fields .into_iter() diff --git a/rust/Cargo.toml b/rust/Cargo.toml index bbdaaff974..75621cc361 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -13,14 +13,14 @@ readme = "README.md" edition = "2021" [dependencies] -arrow = { version = "40", optional = true } -arrow-array = { version = "40", optional = true } -arrow-buffer = { version = "40", optional = true } -arrow-cast = { version = "40", optional = true } -arrow-ord = { version = "40", optional = true } -arrow-row = { version = "40", optional = true } -arrow-schema = { version = "40", optional = true } -arrow-select = { version = "40", optional = true } +arrow = { version = "42", optional = true } +arrow-array = { version = 
"42", optional = true } +arrow-buffer = { version = "42", optional = true } +arrow-cast = { version = "42", optional = true } +arrow-ord = { version = "42", optional = true } +arrow-row = { version = "42", optional = true } +arrow-schema = { version = "42", optional = true } +arrow-select = { version = "42", optional = true } async-trait = "0.1" bytes = "1" chrono = { version = "0.4.22", default-features = false, features = ["clock"] } @@ -38,10 +38,10 @@ libc = ">=0.2.90, <1" num-bigint = "0.4" num_cpus = "1" num-traits = "0.2.15" -object_store = "0.5.6" +object_store = "0.6.1" once_cell = "1.16.0" parking_lot = "0.12" -parquet = { version = "40", features = [ +parquet = { version = "42", features = [ "async", "object_store", ], optional = true } @@ -50,7 +50,7 @@ percent-encoding = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1" -tokio = { version = "1", features = ["macros", "rt", "parking_lot"] } +tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } regex = "1" uuid = { version = "1", features = ["serde", "v4"] } url = "2.3" @@ -65,7 +65,7 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true rusoto_glue = { version = "0.47", default-features = false, optional = true } # Unity -reqwest = { version = "0.11", default-features = false, features = [ +reqwest = { version = "0.11.18", default-features = false, features = [ "rustls-tls", "json", ], optional = true } @@ -74,15 +74,15 @@ reqwest-retry = { version = "0.2.2", optional = true } # Datafusion dashmap = { version = "5", optional = true } -datafusion = { version = "26", optional = true } -datafusion-expr = { version = "26", optional = true } -datafusion-common = { version = "26", optional = true } -datafusion-proto = { version = "26", optional = true } -datafusion-sql = { version = "26", optional = true } -datafusion-physical-expr = { version = "26", optional = true } +datafusion = { version = "27", optional = true } +datafusion-expr = { version = "27", optional = true } +datafusion-common = { version = "27", optional = true } +datafusion-proto = { version = "27", optional = true } +datafusion-sql = { version = "27", optional = true } +datafusion-physical-expr = { version = "27", optional = true } -sqlparser = { version = "0.34", optional = true } +sqlparser = { version = "0.35", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } @@ -135,7 +135,6 @@ s3-native-tls = [ "rusoto_dynamodb/native-tls", "dynamodb_lock/native-tls", "object_store/aws", - "object_store/aws_profile", ] s3 = [ "rusoto_core/rustls", @@ -144,7 +143,6 @@ s3 = [ "rusoto_dynamodb/rustls", "dynamodb_lock/rustls", "object_store/aws", - "object_store/aws_profile", ] unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"] diff --git a/rust/src/action/checkpoints.rs b/rust/src/action/checkpoints.rs index 18054fa1ba..f3a280ad3b 100644 --- a/rust/src/action/checkpoints.rs +++ b/rust/src/action/checkpoints.rs @@ -211,6 +211,7 @@ pub async fn cleanup_expired_logs_for( location: Path::from(""), last_modified: DateTime::::MIN_UTC, size: 0, + e_tag: None, }, ); let file_needs_time_adjustment = @@ -255,6 +256,7 @@ pub async fn cleanup_expired_logs_for( location: current_file.1.location.clone(), last_modified: last_file.1.last_modified.add(Duration::seconds(1)), size: 0, + e_tag: None, }, ); maybe_delete_files.push(updated); diff --git a/rust/src/data_catalog/storage/mod.rs 
b/rust/src/data_catalog/storage/mod.rs index 37083411a5..726afee102 100644 --- a/rust/src/data_catalog/storage/mod.rs +++ b/rust/src/data_catalog/storage/mod.rs @@ -145,7 +145,7 @@ impl SchemaProvider for ListingSchemaProvider { mod tests { use super::*; use datafusion::assert_batches_sorted_eq; - use datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider}; + use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider}; use datafusion::execution::context::SessionContext; #[test] diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 479269c2cc..3147a88938 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -36,8 +36,9 @@ use arrow_array::StringArray; use arrow_schema::Field; use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; -use datafusion::datasource::datasource::TableProviderFactory; use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat}; +use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType}; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::execution::runtime_env::RuntimeEnv; @@ -45,7 +46,6 @@ use datafusion::execution::FunctionRegistry; use datafusion::optimizer::utils::conjunction; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::LocalLimitExec; use datafusion::physical_plan::{ @@ -1377,7 +1377,6 @@ mod tests { use arrow::array::StructArray; use arrow::datatypes::{DataType, Field, Schema}; use chrono::{TimeZone, Utc}; - use datafusion::from_slice::FromSlice; use datafusion::physical_plan::empty::EmptyExec; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf; @@ -1558,6 +1557,7 @@ mod tests { location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()), last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(), size: 10644, + e_tag: None }, partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(), range: None, @@ -1575,8 +1575,8 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["a", "b", "c", "d"])), - Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), ], ) .unwrap(); diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 80be16e088..37cff07ba4 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -333,7 +333,6 @@ mod tests { use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use datafusion::assert_batches_sorted_eq; - use datafusion::from_slice::FromSlice; use datafusion::prelude::*; use std::sync::Arc; @@ -358,9 +357,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + 
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-02", "2021-02-02", @@ -411,9 +410,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-02", "2021-02-02", @@ -435,9 +434,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-02", "2021-02-02", @@ -586,9 +585,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-03", "2021-02-02", @@ -644,9 +643,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-03", "2021-02-02", diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index cb1b336c3c..7962ae6e4d 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -4,9 +4,9 @@ use arrow::array::ArrayRef; use arrow::datatypes::{ DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; +use datafusion::datasource::physical_plan::wrap_partition_type_in_dict; use datafusion::optimizer::utils::conjunction; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use datafusion::physical_plan::file_format::wrap_partition_type_in_dict; use datafusion_common::config::ConfigOptions; use datafusion_common::scalar::ScalarValue; use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference}; @@ -362,6 +362,10 @@ impl ContextProvider for DummyContextProvider { fn options(&self) -> &ConfigOptions { &self.options } + + fn get_window_meta(&self, _name: &str) -> Option> { + unimplemented!() + } } #[cfg(test)] diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 34dbf27113..a9cf28abb5 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -596,7 +596,6 @@ mod tests { use 
arrow::record_batch::RecordBatch; use arrow_array::Int32Array; use datafusion::assert_batches_sorted_eq; - use datafusion::from_slice::FromSlice; use datafusion::prelude::*; use std::sync::Arc; @@ -650,9 +649,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-02", "2021-02-02", @@ -701,9 +700,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-02", "2021-02-03", @@ -756,9 +755,9 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", "2021-02-02", "2021-02-03", diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index 873f29dd1b..a89cbb3fc5 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -149,6 +149,7 @@ fn try_configure_memory(storage_url: &Url) -> DeltaResult> { } #[cfg(feature = "gcs")] +#[allow(deprecated)] fn try_configure_gcs( storage_url: &Url, options: &StorageOptions, @@ -172,6 +173,7 @@ fn try_configure_gcs( } #[cfg(feature = "azure")] +#[allow(deprecated)] fn try_configure_azure( storage_url: &Url, options: &StorageOptions, @@ -196,6 +198,7 @@ fn try_configure_azure( } #[cfg(any(feature = "s3", feature = "s3-native-tls"))] +#[allow(deprecated)] fn try_configure_s3( storage_url: &Url, options: &StorageOptions, diff --git a/rust/src/storage/file.rs b/rust/src/storage/file.rs index b0775416b4..d9188b758c 100644 --- a/rust/src/storage/file.rs +++ b/rust/src/storage/file.rs @@ -5,8 +5,8 @@ use bytes::Bytes; use futures::stream::BoxStream; use object_store::{ - local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetResult, - ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore, + local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetOptions, + GetResult, ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore, Result as ObjectStoreResult, }; use std::ops::Range; @@ -153,6 +153,14 @@ impl ObjectStore for FileStorageBackend { self.inner.get(location).await } + async fn get_opts( + &self, + location: &ObjectStorePath, + options: GetOptions, + ) -> ObjectStoreResult { + self.inner.get_opts(location, options).await + } + async fn get_range( &self, location: &ObjectStorePath, diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 20ce1bc97d..6d4bd080e0 100644 --- a/rust/src/storage/mod.rs +++ 
b/rust/src/storage/mod.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use bytes::Bytes; use futures::{stream::BoxStream, StreamExt}; use lazy_static::lazy_static; +use object_store::GetOptions; use serde::de::{Error, SeqAccess, Visitor}; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -210,6 +211,13 @@ impl ObjectStore for DeltaObjectStore { self.storage.get(location).await } + /// Perform a get request with options + /// + /// Note: options.range will be ignored if [`GetResult::File`] + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.storage.get_opts(location, options).await + } + /// Return the bytes that are stored at the specified location /// in the given byte range async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { diff --git a/rust/src/storage/s3.rs b/rust/src/storage/s3.rs index cc8c0714d5..b7da2ae4d7 100644 --- a/rust/src/storage/s3.rs +++ b/rust/src/storage/s3.rs @@ -7,8 +7,8 @@ use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE use futures::stream::BoxStream; use object_store::path::Path; use object_store::{ - DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, Result as ObjectStoreResult, + DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId, + ObjectMeta, ObjectStore, Result as ObjectStoreResult, }; use rusoto_core::{HttpClient, Region}; use rusoto_credential::AutoRefreshingProvider; @@ -451,6 +451,10 @@ impl ObjectStore for S3StorageBackend { self.inner.get(location).await } + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.inner.get_opts(location, options).await + } + async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { self.inner.get_range(location, range).await } diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index 30eabb4e70..5034d93387 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -94,6 +94,7 @@ impl TryFrom<&Add> for ObjectMeta { location: Path::parse(value.path.as_str())?, last_modified, size: value.size as usize, + e_tag: None, }) } } diff --git a/rust/src/writer/json.rs b/rust/src/writer/json.rs index 28d6992749..fc98cb0b90 100644 --- a/rust/src/writer/json.rs +++ b/rust/src/writer/json.rs @@ -310,27 +310,25 @@ impl DeltaWriter> for JsonWriter { async fn write(&mut self, values: Vec) -> Result<(), DeltaTableError> { let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new(); let arrow_schema = self.arrow_schema(); + let divided = self.divide_by_partition_values(values)?; + let partition_columns = self.partition_columns.clone(); + let writer_properties = self.writer_properties.clone(); - for (key, values) in self.divide_by_partition_values(values)? 
{ + for (key, values) in divided { match self.arrow_writers.get_mut(&key) { - Some(writer) => collect_partial_write_failure( - &mut partial_writes, - writer - .write_values(&self.partition_columns, arrow_schema.clone(), values) - .await, - )?, + Some(writer) => { + let result = writer + .write_values(&partition_columns, arrow_schema.clone(), values) + .await; + collect_partial_write_failure(&mut partial_writes, result)?; + } None => { - let schema = - arrow_schema_without_partitions(&arrow_schema, &self.partition_columns); - let mut writer = DataArrowWriter::new(schema, self.writer_properties.clone())?; - - collect_partial_write_failure( - &mut partial_writes, - writer - .write_values(&self.partition_columns, self.arrow_schema(), values) - .await, - )?; - + let schema = arrow_schema_without_partitions(&arrow_schema, &partition_columns); + let mut writer = DataArrowWriter::new(schema, writer_properties.clone())?; + let result = writer + .write_values(&partition_columns, arrow_schema.clone(), values) + .await; + collect_partial_write_failure(&mut partial_writes, result)?; self.arrow_writers.insert(key, writer); } } diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 5d3e07f003..e23e321dfd 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -11,10 +11,11 @@ use arrow::datatypes::{ use arrow::record_batch::RecordBatch; use common::datafusion::context_with_delta_table_factory; use datafusion::assert_batches_sorted_eq; +use datafusion::datasource::physical_plan::ParquetExec; use datafusion::datasource::TableProvider; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion::physical_plan::{common::collect, file_format::ParquetExec, metrics::Label}; +use datafusion::physical_plan::{common::collect, metrics::Label}; use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; use datafusion_common::scalar::ScalarValue; use datafusion_common::ScalarValue::*; diff --git a/rust/tests/repair_s3_rename_test.rs b/rust/tests/repair_s3_rename_test.rs index b4200566c0..25a9a5e060 100644 --- a/rust/tests/repair_s3_rename_test.rs +++ b/rust/tests/repair_s3_rename_test.rs @@ -9,8 +9,8 @@ use deltalake::{storage::s3::S3StorageBackend, DeltaTableBuilder, ObjectStore}; use futures::stream::BoxStream; use object_store::path::Path; use object_store::{ - DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, - Result as ObjectStoreResult, + DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId, + ObjectMeta, Result as ObjectStoreResult, }; use serial_test::serial; use std::ops::Range; @@ -177,6 +177,10 @@ impl ObjectStore for DelayedObjectStore { self.inner.get(location).await } + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.inner.get_opts(location, options).await + } + async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { self.inner.get_range(location, range).await }
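
A minimal sketch of the follow-up mentioned in the description: letting `object_store`'s own builders handle URL parsing instead of the per-scheme logic currently kept in `rust/src/storage/config.rs`. This is an illustration only, assuming the `from_env`/`with_url` helpers available on the 0.6 builders; the function name `store_for_url` and the scheme list are made up for the example, and recovering `aws-profile` support is deliberately out of scope here.

```rust
// Sketch only, not part of this PR. Assumes object_store 0.6's builder helpers
// (`from_env`, `with_url`); credential/profile handling is intentionally omitted.
use std::sync::Arc;

use object_store::aws::AmazonS3Builder;
use object_store::azure::MicrosoftAzureBuilder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::{DynObjectStore, Error as ObjectStoreError};
use url::Url;

fn store_for_url(url: &Url) -> Result<Arc<DynObjectStore>, ObjectStoreError> {
    match url.scheme() {
        // The builders pull bucket / container / account names out of the URL
        // themselves and pick up credentials from the environment.
        "s3" | "s3a" => Ok(Arc::new(
            AmazonS3Builder::from_env().with_url(url.as_str()).build()?,
        )),
        "az" | "abfs" | "abfss" | "azure" => Ok(Arc::new(
            MicrosoftAzureBuilder::from_env().with_url(url.as_str()).build()?,
        )),
        "gs" => Ok(Arc::new(
            GoogleCloudStorageBuilder::from_env().with_url(url.as_str()).build()?,
        )),
        other => Err(ObjectStoreError::Generic {
            store: "deltalake",
            source: format!("unsupported URL scheme: {other}").into(),
        }),
    }
}
```

Dispatching on `Url::scheme` keeps the shape of the current `try_configure_*` functions while delegating the actual URL interpretation to the builders, which is the direction the follow-up described above points at.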