diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 8a4640b9a3..712e961e96 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -74,7 +74,7 @@ use crate::delta_datafusion::{ register_store, DataFusionMixins, DeltaColumn, DeltaScan, DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider, }; -use crate::kernel::Action; +use crate::kernel::{Action, DataCheck, StructTypeExt}; use crate::logstore::LogStoreRef; use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; @@ -82,6 +82,7 @@ use crate::operations::transaction::CommitBuilder; use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig}; use crate::protocol::{DeltaOperation, MergePredicate}; use crate::table::state::DeltaTableState; +use crate::table::GeneratedColumn; use crate::{DeltaResult, DeltaTable, DeltaTableError}; mod barrier; @@ -750,10 +751,73 @@ async fn execute( None => TableReference::bare(UNNAMED_TABLE), }; + /// Add generated column expressions to a dataframe + fn add_missing_generated_columns( + mut df: DataFrame, + generated_cols: &Vec, + ) -> DeltaResult<(DataFrame, Vec)> { + let mut missing_cols = vec![]; + for generated_col in generated_cols { + let col_name = generated_col.get_name(); + + if !df + .clone() + .schema() + .field_names() + .contains(&col_name.to_string()) + { + debug!("Adding missing generated column {} in source as placeholder", col_name); + // If column doesn't exist, we add a null column, later we will generate the values after + // all the merge is projected. + // Other generated columns that were provided upon the start we only validate during write + missing_cols.push(col_name.to_string()); + df = df.clone().with_column(col_name, Expr::Literal(ScalarValue::Null))?; + } + } + Ok((df, missing_cols)) + } + + /// Add generated column expressions to a dataframe + fn add_generated_columns( + mut df: DataFrame, + generated_cols: &Vec, + generated_cols_missing_in_source: &Vec, + state: &SessionState, + ) -> DeltaResult { + debug!("Generating columns in dataframe"); + for generated_col in generated_cols { + // We only validate columns that were missing from the start. We don't update + // update generated columns that were provided during runtime + if !generated_cols_missing_in_source.contains(&generated_col.name) { + continue; + } + + let generation_expr = state.create_logical_expr( + generated_col.get_generation_expression(), + df.clone().schema(), + )?; + let col_name = generated_col.get_name(); + + df = df.clone().with_column( + generated_col.get_name(), + when(col(col_name).is_null(), generation_expr).otherwise(col(col_name))?, + )? + } + Ok(df) + } + + let generated_col_expressions = snapshot + .schema() + .get_generated_columns() + .unwrap_or_default(); + + let (source, missing_generated_columns) = add_missing_generated_columns(source, &generated_col_expressions)?; // This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work let source = LogicalPlanBuilder::scan( source_name.clone(), - provider_as_source(source.into_view()), + provider_as_source( + source.into_view(), + ), None, )? .build()?; @@ -1160,26 +1224,38 @@ async fn execute( lit(5), ))?; - change_data.push( - cdc_projection - .clone() - .filter( - col(SOURCE_COLUMN) - .is_true() - .and(col(TARGET_COLUMN).is_null()), - )? - .select(write_projection.clone())? - .with_column(CDC_COLUMN_NAME, lit("insert"))?, - ); + let mut cdc_insert_df = cdc_projection + .clone() + .filter( + col(SOURCE_COLUMN) + .is_true() + .and(col(TARGET_COLUMN).is_null()), + )? + .select(write_projection.clone())? + .with_column(CDC_COLUMN_NAME, lit("insert"))?; + + cdc_insert_df = add_generated_columns( + cdc_insert_df, + &generated_col_expressions, + &missing_generated_columns, + &state)?; - let after = cdc_projection + change_data.push(cdc_insert_df); + + let mut after = cdc_projection .clone() .filter(col(TARGET_COLUMN).is_true())? .select(write_projection.clone())?; + after = add_generated_columns( + after, + &generated_col_expressions, + &missing_generated_columns, + &state)?; + // Extra select_columns is required so that before and after have same schema order // DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650 - let before = cdc_projection + let mut before = cdc_projection .clone() .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())? .select( @@ -1199,11 +1275,22 @@ async fn execute( .collect::>(), )?; + before = add_generated_columns( + before, + &generated_col_expressions, + &missing_generated_columns, + &state)?; + let tracker = CDCTracker::new(before, after); change_data.push(tracker.collect()?); } - let project = filtered.clone().select(write_projection)?; + let mut project = filtered.clone().select(write_projection)?; + project = add_generated_columns( + project, + &generated_col_expressions, + &missing_generated_columns, + &state)?; let merge_final = &project.into_unoptimized_plan(); let write = state.create_physical_plan(merge_final).await?;