diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 5deaaa1393..1801a36353 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -309,20 +309,45 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { + if self.schema_mode == Some(SchemaMode::Overwrite) && self.mode != SaveMode::Overwrite { + return Err(DeltaTableError::Generic( + "Schema overwrite not supported for Append".to_string(), + )); + } + + let batches: &Vec = match &self.batches { + Some(batches) => { + if batches.is_empty() { + error!("The WriteBuilder was an empty set of batches!"); + return Err(WriteError::MissingData.into()); + } + batches + } + None => { + if self.input.is_none() { + error!("The WriteBuilder must have an input plan _or_ batches!"); + return Err(WriteError::MissingData.into()); + } + // provide an empty array in the case that an input plan exists + &vec![] + } + }; + + let schema: StructType = match &self.input { + Some(plan) => (plan.schema()).try_into()?, + None => (batches[0].schema()).try_into()?, + }; + match &self.snapshot { Some(snapshot) => { - PROTOCOL.can_write_to(snapshot)?; - - let schema: StructType = if let Some(plan) = &self.input { - (plan.schema()).try_into()? - } else if let Some(batches) = &self.batches { - if batches.is_empty() { - return Err(WriteError::MissingData.into()); + if self.mode == SaveMode::Overwrite { + PROTOCOL.check_append_only(&snapshot.snapshot)?; + if !snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into())); } - (batches[0].schema()).try_into()? - } else { - return Err(WriteError::MissingData.into()); - }; + } + + PROTOCOL.can_write_to(snapshot)?; if self.schema_mode.is_none() { PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?; @@ -335,16 +360,6 @@ impl WriteBuilder { } } None => { - let schema: StructType = if let Some(plan) = &self.input { - Ok(plan.schema().try_into()?) 
- } else if let Some(batches) = &self.batches { - if batches.is_empty() { - return Err(WriteError::MissingData.into()); - } - Ok(batches[0].schema().try_into()?) - } else { - Err(WriteError::MissingData) - }?; let mut builder = CreateBuilder::new() .with_log_store(self.log_store.clone()) .with_columns(schema.fields().cloned()) @@ -786,21 +801,8 @@ impl std::future::IntoFuture for WriteBuilder { let mut metrics = WriteMetrics::default(); let exec_start = Instant::now(); - if this.mode == SaveMode::Overwrite { - if let Some(snapshot) = &this.snapshot { - PROTOCOL.check_append_only(&snapshot.snapshot)?; - if !snapshot.load_config().require_files { - return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into())); - } - } - } - if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite { - return Err(DeltaTableError::Generic( - "Schema overwrite not supported for Append".to_string(), - )); - } - - // Create table actions to initialize table in case it does not yet exist and should be created + // Create table actions to initialize table in case it does not yet exist and should be + // created let mut actions = this.check_preconditions().await?; let active_partitions = this @@ -2320,4 +2322,147 @@ mod tests { assert!(!cdc_actions.is_empty()); Ok(()) } + + /// Small module to collect test cases which validate the [WriteBuilder]'s + /// check_preconditions() function + mod check_preconditions_test { + use super::*; + + #[tokio::test] + async fn test_schema_overwrite_on_append() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let batch = get_record_batch(None, false); + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .await?; + let writer = DeltaOps(table) + .write(vec![batch]) + .with_schema_mode(SchemaMode::Overwrite) + .with_save_mode(SaveMode::Append); + + let check = writer.check_preconditions().await; + assert!(check.is_err()); + Ok(()) + } + + #[tokio::test] + async 
fn test_savemode_overwrite_on_append_table() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let batch = get_record_batch(None, false); + let table = DeltaOps::new_in_memory() + .create() + .with_configuration_property(TableProperty::AppendOnly, Some("true".to_string())) + .with_columns(table_schema.fields().cloned()) + .await?; + let writer = DeltaOps(table) + .write(vec![batch]) + .with_save_mode(SaveMode::Overwrite); + + let check = writer.check_preconditions().await; + assert!(check.is_err()); + Ok(()) + } + + #[tokio::test] + async fn test_empty_set_of_batches() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .await?; + let writer = DeltaOps(table).write(vec![]); + + match writer.check_preconditions().await { + Ok(_) => panic!("Expected check_preconditions to fail!"), + Err(DeltaTableError::GenericError { .. }) => {} + Err(e) => panic!("Unexpected error returned: {e:#?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_errorifexists() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let batch = get_record_batch(None, false); + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .await?; + let writer = DeltaOps(table) + .write(vec![batch]) + .with_save_mode(SaveMode::ErrorIfExists); + + match writer.check_preconditions().await { + Ok(_) => panic!("Expected check_preconditions to fail!"), + Err(DeltaTableError::GenericError { .. 
}) => {} + Err(e) => panic!("Unexpected error returned: {e:#?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_allow_empty_batches_with_input_plan() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .await?; + + let ctx = SessionContext::new(); + let plan = ctx + .sql("SELECT 1 as id") + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let writer = WriteBuilder::new(table.log_store.clone(), table.state) + .with_input_execution_plan(plan) + .with_save_mode(SaveMode::Overwrite); + + let _ = writer.check_preconditions().await?; + Ok(()) + } + + #[tokio::test] + async fn test_no_snapshot_create_actions() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .await?; + let batch = get_record_batch(None, false); + let writer = + WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![batch]); + + let actions = writer.check_preconditions().await?; + assert_eq!( + actions.len(), + 2, + "Expecting a Protocol and a Metadata action in {actions:?}" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_no_snapshot_err_no_batches_check() -> DeltaResult<()> { + let table_schema = get_delta_schema(); + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .await?; + let writer = + WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![]); + + match writer.check_preconditions().await { + Ok(_) => panic!("Expected check_preconditions to fail!"), + Err(DeltaTableError::GenericError { .. }) => {} + Err(e) => panic!("Unexpected error returned: {e:#?}"), + } + + Ok(()) + } + } }