Commit

Merge remote-tracking branch 'upstream/main' into remove-rusoto
mightyshazam committed Mar 5, 2024
2 parents 5759a4c + c3d532b commit acf1074
Showing 23 changed files with 707 additions and 135 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -42,7 +42,7 @@ arrow-ord = { version = "50" }
 arrow-row = { version = "50" }
 arrow-schema = { version = "50" }
 arrow-select = { version = "50" }
-object_store = { version = "0.9" }
+object_store = { version = "=0.9.0" }
 parquet = { version = "50" }
 
 # datafusion
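The `object_store` requirement above tightens from a caret range to an exact pin. A minimal sketch of the difference, assuming the `semver` crate (the same library Cargo uses to interpret requirement strings):

```rust
use semver::{Version, VersionReq};

fn main() {
    // "0.9" in Cargo.toml is shorthand for the caret requirement ^0.9,
    // which accepts any semver-compatible 0.9.x release.
    let caret = VersionReq::parse("0.9").unwrap();
    // "=0.9.0" resolves to exactly 0.9.0 and nothing else.
    let exact = VersionReq::parse("=0.9.0").unwrap();

    let newer = Version::parse("0.9.1").unwrap();
    assert!(caret.matches(&newer));
    assert!(!exact.matches(&newer));
}
```

Pinning exactly shields the crate from silently picking up a newer `object_store` patch release it has not been validated against.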
2 changes: 1 addition & 1 deletion crates/core/src/kernel/models/schema.rs
@@ -643,7 +643,7 @@ mod tests {
     use super::*;
     use serde_json;
     use serde_json::json;
-    use std::hash::DefaultHasher;
+    use std::collections::hash_map::DefaultHasher;
 
     #[test]
     fn test_serde_data_types() {
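The test previously imported `DefaultHasher` through `std::hash`, a re-export only available on newer toolchains (stabilized in Rust 1.76), so the import moves to the long-standing `std::collections::hash_map` path. A small usage sketch of the type in question:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn main() {
    // DefaultHasher is the hasher HashMap uses by default;
    // the exact algorithm is deliberately unspecified.
    let mut hasher = DefaultHasher::new();
    "delta".hash(&mut hasher);
    println!("hash = {:x}", hasher.finish());
}
```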
2 changes: 0 additions & 2 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -515,8 +515,6 @@ pub(super) async fn list_log_files(
 
 #[cfg(test)]
 pub(super) mod tests {
-    use std::str::FromStr;
-
     use deltalake_test::utils::*;
 
     use super::*;
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
@@ -47,7 +47,7 @@
 //! # Querying Delta Tables with Datafusion
 //!
 //! Querying from local filesystem:
-//! ```ignore
+//! ```
 //! use std::sync::Arc;
 //! use datafusion::prelude::SessionContext;
 //!
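Removing the `ignore` attribute turns the snippet into a doctest that rustdoc compiles and runs, so the DataFusion example is now exercised by `cargo test`. A hedged sketch of the shape such an example takes (the table path and error handling here are illustrative, not the literal contents of `lib.rs`, and the `datafusion` feature is assumed):

```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    // Illustrative path; the real doctest points at a table shipped with the tests.
    let table = deltalake_core::open_table("./tests/data/simple_table").await?;
    // DeltaTable implements DataFusion's TableProvider, so it registers directly.
    ctx.register_table("demo", Arc::new(table))?;

    let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```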
115 changes: 94 additions & 21 deletions crates/core/src/operations/cast.rs
@@ -1,36 +1,109 @@
 //! Provide common cast functionality for callers
 //!
-use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};
+use arrow::datatypes::DataType::Dictionary;
+use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray};
 use arrow_cast::{cast_with_options, CastOptions};
-use arrow_schema::{DataType, Fields, SchemaRef as ArrowSchemaRef};
+use arrow_schema::{
+    ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema,
+    SchemaRef as ArrowSchemaRef,
+};
 use std::sync::Arc;
 
 use crate::DeltaResult;
 
+pub(crate) fn merge_field(left: &ArrowField, right: &ArrowField) -> Result<ArrowField, ArrowError> {
+    if let Dictionary(_, value_type) = right.data_type() {
+        if value_type.equals_datatype(left.data_type()) {
+            return Ok(left.clone());
+        }
+    }
+    if let Dictionary(_, value_type) = left.data_type() {
+        if value_type.equals_datatype(right.data_type()) {
+            return Ok(right.clone());
+        }
+    }
+    let mut new_field = left.clone();
+    new_field.try_merge(right)?;
+    Ok(new_field)
+}
+
+pub(crate) fn merge_schema(
+    left: ArrowSchema,
+    right: ArrowSchema,
+) -> Result<ArrowSchema, ArrowError> {
+    let mut errors = Vec::with_capacity(left.fields().len());
+    let merged_fields: Result<Vec<ArrowField>, ArrowError> = left
+        .fields()
+        .iter()
+        .map(|field| {
+            let right_field = right.field_with_name(field.name());
+            if let Ok(right_field) = right_field {
+                let field_or_not = merge_field(field.as_ref(), right_field);
+                match field_or_not {
+                    Err(e) => {
+                        errors.push(e.to_string());
+                        Err(e)
+                    }
+                    Ok(f) => Ok(f),
+                }
+            } else {
+                Ok(field.as_ref().clone())
+            }
+        })
+        .collect();
+    match merged_fields {
+        Ok(mut fields) => {
+            for field in right.fields() {
+                if !left.field_with_name(field.name()).is_ok() {
+                    fields.push(field.as_ref().clone());
+                }
+            }
+
+            Ok(ArrowSchema::new(fields))
+        }
+        Err(e) => {
+            errors.push(e.to_string());
+            Err(ArrowError::SchemaError(errors.join("\n")))
+        }
+    }
+}
+
 fn cast_struct(
     struct_array: &StructArray,
     fields: &Fields,
     cast_options: &CastOptions,
+    add_missing: bool,
 ) -> Result<Vec<Arc<(dyn Array)>>, arrow_schema::ArrowError> {
     fields
         .iter()
         .map(|field| {
-            let col = struct_array.column_by_name(field.name()).unwrap();
-            if let (DataType::Struct(_), DataType::Struct(child_fields)) =
-                (col.data_type(), field.data_type())
-            {
-                let child_struct = StructArray::from(col.into_data());
-                let s = cast_struct(&child_struct, child_fields, cast_options)?;
-                Ok(Arc::new(StructArray::new(
-                    child_fields.clone(),
-                    s,
-                    child_struct.nulls().map(ToOwned::to_owned),
-                )) as ArrayRef)
-            } else if is_cast_required(col.data_type(), field.data_type()) {
-                cast_with_options(col, field.data_type(), cast_options)
-            } else {
-                Ok(col.clone())
+            let col_or_not = struct_array.column_by_name(field.name());
+            match col_or_not {
+                None => match add_missing {
+                    true => Ok(new_null_array(field.data_type(), struct_array.len())),
+                    false => Err(arrow_schema::ArrowError::SchemaError(format!(
+                        "Could not find column {0}",
+                        field.name()
+                    ))),
+                },
+                Some(col) => {
+                    if let (DataType::Struct(_), DataType::Struct(child_fields)) =
+                        (col.data_type(), field.data_type())
+                    {
+                        let child_struct = StructArray::from(col.into_data());
+                        let s =
+                            cast_struct(&child_struct, child_fields, cast_options, add_missing)?;
+                        Ok(Arc::new(StructArray::new(
+                            child_fields.clone(),
+                            s,
+                            child_struct.nulls().map(ToOwned::to_owned),
+                        )) as ArrayRef)
+                    } else if is_cast_required(col.data_type(), field.data_type()) {
+                        cast_with_options(col, field.data_type(), cast_options)
+                    } else {
+                        Ok(col.clone())
+                    }
+                }
             }
         })
         .collect::<Result<Vec<_>, _>>()
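The new `merge_field` and `merge_schema` helpers fold two Arrow schemas together: fields present on both sides are merged via `Field::try_merge` (with a dictionary-encoded field treated as interchangeable with its value type), and fields that exist only on the right are appended. A minimal sketch of the intended behavior, written as a crate-internal test since both helpers are `pub(crate)`:

```rust
use arrow_schema::{DataType, Field, Schema};

#[test]
fn merge_schema_appends_right_only_fields() {
    let left = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let right = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);

    let merged = merge_schema(left, right).unwrap();
    // "id" is merged in place; "name" is appended from the right schema.
    assert_eq!(merged.fields().len(), 2);
}
```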
@@ -51,6 +124,7 @@ pub fn cast_record_batch(
     batch: &RecordBatch,
     target_schema: ArrowSchemaRef,
     safe: bool,
+    add_missing: bool,
 ) -> DeltaResult<RecordBatch> {
     let cast_options = CastOptions {
         safe,
@@ -62,8 +136,7 @@ pub fn cast_record_batch(
         batch.columns().to_owned(),
         None,
     );
-
-    let columns = cast_struct(&s, target_schema.fields(), &cast_options)?;
+    let columns = cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?;
     Ok(RecordBatch::try_new(target_schema, columns)?)
 }

@@ -93,7 +166,7 @@ mod tests {
         )]);
         let target_schema = Arc::new(Schema::new(fields)) as SchemaRef;
 
-        let result = cast_record_batch(&record_batch, target_schema, false);
+        let result = cast_record_batch(&record_batch, target_schema, false, false);
 
         let schema = result.unwrap().schema();
         let field = schema.column_with_name("list_column").unwrap().1;
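`cast_record_batch` now threads an `add_missing` flag down through `cast_struct`: when a target field has no counterpart in the batch, `true` back-fills it with an all-null array, while `false` preserves the old strict behavior and returns a `SchemaError`. A rough crate-internal sketch under those assumptions (the call sites in delete, merge, optimize, and update all pass `false`):

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn widen_batch() -> crate::DeltaResult<()> {
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    let target = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]));

    // add_missing = true: "name" is materialized as three nulls.
    let widened = cast_record_batch(&batch, target.clone(), false, true)?;
    assert_eq!(widened.num_columns(), 2);

    // add_missing = false: the absent "name" column is an error instead.
    assert!(cast_record_batch(&batch, target, false, false).is_err());
    Ok(())
}
```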
2 changes: 1 addition & 1 deletion crates/core/src/operations/convert_to_delta.rs
@@ -226,7 +226,7 @@ impl ConvertToDeltaBuilder {
     }
 
     /// Consume self into CreateBuilder with corresponding add actions, schemas and operation meta
-    async fn into_create_builder(mut self) -> Result<CreateBuilder, Error> {
+    async fn into_create_builder(self) -> Result<CreateBuilder, Error> {
         // Use the specified log store. If a log store is not provided, create a new store from the specified path.
         // Return an error if neither log store nor path is provided
         let log_store = if let Some(log_store) = self.log_store {
11 changes: 9 additions & 2 deletions crates/core/src/operations/delete.rs
@@ -17,6 +17,7 @@
 //!     .await?;
 //! ````
+use core::panic;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -167,9 +168,15 @@ async fn excute_non_empty_expr(
         None,
         writer_properties,
         false,
+        false,
         None,
     )
-    .await?;
+    .await?
+    .into_iter()
+    .map(|a| match a {
+        Action::Add(a) => a,
+        _ => panic!("Expected Add action"),
+    })
+    .collect::<Vec<Add>>();
 
     let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
     let filter_records = filter.metrics().and_then(|m| m.output_rows());
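With this merge, `write_execution_plan` returns `Vec<Action>` rather than `Vec<Add>`, so the delete path narrows the result back down and panics if anything other than an `Add` comes back. The narrowing pattern in isolation, with stand-in types (the real `Action` and `Add` live in the kernel models):

```rust
// Stand-in definitions for illustration only.
#[derive(Clone, Debug)]
struct Add {
    path: String,
}

#[derive(Clone, Debug)]
enum Action {
    Add(Add),
    Remove(String),
}

fn adds_only(actions: Vec<Action>) -> Vec<Add> {
    actions
        .into_iter()
        .map(|a| match a {
            Action::Add(a) => a,
            // A non-Add action here would indicate a logic error upstream.
            _ => panic!("Expected Add action"),
        })
        .collect()
}

fn main() {
    let actions = vec![Action::Add(Add { path: "part-00001.parquet".into() })];
    assert_eq!(adds_only(actions).len(), 1);
}
```

Panicking keeps an unexpected action type loud; a `filter_map` would drop it silently.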
6 changes: 3 additions & 3 deletions crates/core/src/operations/merge/mod.rs
@@ -1320,7 +1320,7 @@ async fn execute(
         build_case(copy_when, copy_then)?,
     ));
 
-    let mut new_columns = {
+    let new_columns = {
         let plan = projection.into_unoptimized_plan();
         let mut fields: Vec<Expr> = plan
             .schema()
@@ -1379,13 +1379,13 @@ async fn execute(
         None,
         writer_properties,
         safe_cast,
+        false,
         None,
     )
     .await?;
 
     metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;
 
-    let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add_actions.clone();
     metrics.num_target_files_added = actions.len();
 
     let survivors = barrier
8 changes: 6 additions & 2 deletions crates/core/src/operations/optimize.rs
@@ -457,8 +457,12 @@ impl MergePlan {
         while let Some(maybe_batch) = read_stream.next().await {
             let mut batch = maybe_batch?;
 
-            batch =
-                super::cast::cast_record_batch(&batch, task_parameters.file_schema.clone(), false)?;
+            batch = super::cast::cast_record_batch(
+                &batch,
+                task_parameters.file_schema.clone(),
+                false,
+                false,
+            )?;
             partial_metrics.num_batches += 1;
             writer.write(&batch).await.map_err(DeltaTableError::from)?;
         }
4 changes: 2 additions & 2 deletions crates/core/src/operations/update.rs
@@ -357,7 +357,7 @@ async fn execute(
         None,
         writer_properties,
         safe_cast,
+        false,
         None,
     )
     .await?;

@@ -377,7 +377,7 @@ async fn execute(
         .duration_since(UNIX_EPOCH)
         .unwrap()
         .as_millis() as i64;
-    let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add_actions.clone();
 
     metrics.num_added_files = actions.len();
     metrics.num_removed_files = candidates.candidates.len();
