Skip to content

Commit

Permalink
feat: enable generatedcolumns merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Jan 12, 2025
1 parent e3f55b7 commit 6efe64b
Showing 1 changed file with 103 additions and 16 deletions.
119 changes: 103 additions & 16 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ use crate::delta_datafusion::{
register_store, DataFusionMixins, DeltaColumn, DeltaScan, DeltaScanConfigBuilder,
DeltaSessionConfig, DeltaTableProvider,
};
use crate::kernel::Action;
use crate::kernel::{Action, DataCheck, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::operations::cdc::*;
use crate::operations::merge::barrier::find_node;
use crate::operations::transaction::CommitBuilder;
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
use crate::protocol::{DeltaOperation, MergePredicate};
use crate::table::state::DeltaTableState;
use crate::table::GeneratedColumn;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

mod barrier;
Expand Down Expand Up @@ -750,10 +751,73 @@ async fn execute(
None => TableReference::bare(UNNAMED_TABLE),
};

/// Add a NULL placeholder column to `df` for every generated column it does not already contain.
///
/// Only placeholders are added here; the generation expressions are not evaluated by this
/// function. Generated columns that are already present in `df` are left untouched.
///
/// Returns the (possibly extended) dataframe together with the names of the placeholder
/// columns that were added, in the order of `generated_cols`.
///
/// # Errors
///
/// Propagates any error returned by `DataFrame::with_column`.
fn add_missing_generated_columns(
    mut df: DataFrame,
    generated_cols: &[GeneratedColumn],
) -> DeltaResult<(DataFrame, Vec<String>)> {
    let mut missing_cols = vec![];
    for generated_col in generated_cols {
        let col_name = generated_col.get_name();

        // Re-read the schema on every iteration: `df` may have gained a placeholder column
        // in a previous iteration. Borrowing the schema avoids cloning the whole dataframe.
        let already_present = df
            .schema()
            .field_names()
            .iter()
            .any(|name| name == col_name);
        if already_present {
            continue;
        }

        debug!("Adding missing generated column {} in source as placeholder", col_name);
        // If column doesn't exist, we add a null column, later we will generate the values after
        // all the merge is projected.
        // Other generated columns that were provided upon the start we only validate during write
        missing_cols.push(col_name.to_string());
        df = df.with_column(col_name, Expr::Literal(ScalarValue::Null))?;
    }
    Ok((df, missing_cols))
}

/// Fill in generated-column values for the columns listed in `generated_cols_missing_in_source`.
///
/// For every entry of `generated_cols` whose name appears in
/// `generated_cols_missing_in_source`, the column in `df` is replaced by
/// `CASE WHEN <col> IS NULL THEN <generation expression> ELSE <col> END`, where the
/// generation expression is parsed by `state` against the current schema of `df`.
/// Generated columns not in that list are skipped and left as they are.
///
/// # Errors
///
/// Propagates errors from parsing the generation expression, building the `CASE`
/// expression, or `DataFrame::with_column`.
fn add_generated_columns(
    mut df: DataFrame,
    generated_cols: &[GeneratedColumn],
    generated_cols_missing_in_source: &[String],
    state: &SessionState,
) -> DeltaResult<DataFrame> {
    debug!("Generating columns in dataframe");
    for generated_col in generated_cols {
        // We only generate values for columns that were missing from the start. We don't
        // update generated columns that were provided during runtime.
        if !generated_cols_missing_in_source.contains(&generated_col.name) {
            continue;
        }

        // The schema is only borrowed for the duration of this call, so the dataframe
        // does not need to be cloned to parse the expression.
        let generation_expr =
            state.create_logical_expr(generated_col.get_generation_expression(), df.schema())?;
        let col_name = generated_col.get_name();

        // Keep any non-null value already present; only NULL rows receive the generated value.
        let filled = when(col(col_name).is_null(), generation_expr).otherwise(col(col_name))?;
        df = df.with_column(col_name, filled)?;
    }
    Ok(df)
}

let generated_col_expressions = snapshot
.schema()
.get_generated_columns()
.unwrap_or_default();

let (source, missing_generated_columns) = add_missing_generated_columns(source, &generated_col_expressions)?;
// This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work
let source = LogicalPlanBuilder::scan(
source_name.clone(),
provider_as_source(source.into_view()),
provider_as_source(
source.into_view(),
),
None,
)?
.build()?;
Expand Down Expand Up @@ -1160,26 +1224,38 @@ async fn execute(
lit(5),
))?;

change_data.push(
cdc_projection
.clone()
.filter(
col(SOURCE_COLUMN)
.is_true()
.and(col(TARGET_COLUMN).is_null()),
)?
.select(write_projection.clone())?
.with_column(CDC_COLUMN_NAME, lit("insert"))?,
);
let mut cdc_insert_df = cdc_projection
.clone()
.filter(
col(SOURCE_COLUMN)
.is_true()
.and(col(TARGET_COLUMN).is_null()),
)?
.select(write_projection.clone())?
.with_column(CDC_COLUMN_NAME, lit("insert"))?;

cdc_insert_df = add_generated_columns(
cdc_insert_df,
&generated_col_expressions,
&missing_generated_columns,
&state)?;

let after = cdc_projection
change_data.push(cdc_insert_df);

let mut after = cdc_projection
.clone()
.filter(col(TARGET_COLUMN).is_true())?
.select(write_projection.clone())?;

after = add_generated_columns(
after,
&generated_col_expressions,
&missing_generated_columns,
&state)?;

// Extra select_columns is required so that before and after have same schema order
// DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650
let before = cdc_projection
let mut before = cdc_projection
.clone()
.filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
.select(
Expand All @@ -1199,11 +1275,22 @@ async fn execute(
.collect::<Vec<_>>(),
)?;

before = add_generated_columns(
before,
&generated_col_expressions,
&missing_generated_columns,
&state)?;

let tracker = CDCTracker::new(before, after);
change_data.push(tracker.collect()?);
}

let project = filtered.clone().select(write_projection)?;
let mut project = filtered.clone().select(write_projection)?;
project = add_generated_columns(
project,
&generated_col_expressions,
&missing_generated_columns,
&state)?;

let merge_final = &project.into_unoptimized_plan();
let write = state.create_physical_plan(merge_final).await?;
Expand Down

0 comments on commit 6efe64b

Please sign in to comment.