Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: generated columns #3123

Merged
merged 7 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTy
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::table::{Constraint, GeneratedColumn};
use crate::{open_table, open_table_with_storage_options, DeltaTable};

pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down Expand Up @@ -1159,6 +1159,7 @@ pub(crate) async fn execute_plan_to_batch(
pub struct DeltaDataChecker {
constraints: Vec<Constraint>,
invariants: Vec<Invariant>,
generated_columns: Vec<GeneratedColumn>,
non_nullable_columns: Vec<String>,
ctx: SessionContext,
}
Expand All @@ -1169,6 +1170,7 @@ impl DeltaDataChecker {
Self {
invariants: vec![],
constraints: vec![],
generated_columns: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1179,6 +1181,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints: vec![],
generated_columns: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1189,6 +1192,18 @@ impl DeltaDataChecker {
Self {
constraints,
invariants: vec![],
generated_columns: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
}

/// Create a new DeltaDataChecker with a specified set of generated columns
pub fn new_with_generated_columns(generated_columns: Vec<GeneratedColumn>) -> Self {
Self {
constraints: vec![],
invariants: vec![],
generated_columns,
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1209,6 +1224,10 @@ impl DeltaDataChecker {
/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
let invariants = snapshot.schema().get_invariants().unwrap_or_default();
let generated_columns = snapshot
.schema()
.get_generated_columns()
.unwrap_or_default();
let constraints = snapshot.table_config().get_constraints();
let non_nullable_columns = snapshot
.schema()
Expand All @@ -1224,6 +1243,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints,
generated_columns,
non_nullable_columns,
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1236,7 +1256,9 @@ impl DeltaDataChecker {
pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.check_nullability(record_batch)?;
self.enforce_checks(record_batch, &self.invariants).await?;
self.enforce_checks(record_batch, &self.constraints).await
self.enforce_checks(record_batch, &self.constraints).await?;
self.enforce_checks(record_batch, &self.generated_columns)
.await
}

/// Return true if all the nullability checks are valid
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/kernel/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ pub enum Error {
line: String,
},

/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in generation expression, line=`{line}`, err=`{json_err}`")]
InvalidGenerationExpressionJson {
/// JSON error details returned when parsing the generation expression JSON.
json_err: serde_json::error::Error,
/// Generation expression.
line: String,
},

#[error("Table metadata is invalid: {0}")]
MetadataError(String),

Expand Down
101 changes: 90 additions & 11 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use std::str::FromStr;

use delta_kernel::schema::{DataType, StructField};
use maplit::hashset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use url::Url;

use super::schema::StructType;
use super::StructTypeExt;
use crate::kernel::{error::Error, DeltaResult};
use crate::TableProperty;
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};
Expand Down Expand Up @@ -115,6 +117,19 @@ impl Metadata {
}
}

/// checks if table contains timestamp_ntz in any field including nested fields.
pub fn contains_timestampntz<'a>(mut fields: impl Iterator<Item = &'a StructField>) -> bool {
fn _check_type(dtype: &DataType) -> bool {
match dtype {
&DataType::TIMESTAMP_NTZ => true,
DataType::Array(inner) => _check_type(inner.element_type()),
DataType::Struct(inner) => inner.fields().any(|f| _check_type(f.data_type())),
_ => false,
}
}
fields.any(|f| _check_type(f.data_type()))
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
/// Defines a protocol action
Expand Down Expand Up @@ -146,8 +161,8 @@ impl Protocol {
}
}

/// set the reader features in the protocol action, automatically bumps min_reader_version
pub fn with_reader_features(
/// Append the reader features in the protocol action, automatically bumps min_reader_version
pub fn append_reader_features(
mut self,
reader_features: impl IntoIterator<Item = impl Into<ReaderFeatures>>,
) -> Self {
Expand All @@ -156,14 +171,20 @@ impl Protocol {
.map(Into::into)
.collect::<HashSet<_>>();
if !all_reader_features.is_empty() {
self.min_reader_version = 3
self.min_reader_version = 3;
match self.reader_features {
Some(mut features) => {
features.extend(all_reader_features);
self.reader_features = Some(features);
}
None => self.reader_features = Some(all_reader_features),
};
}
self.reader_features = Some(all_reader_features);
self
}

/// set the writer features in the protocol action, automatically bumps min_writer_version
pub fn with_writer_features(
/// Append the writer features in the protocol action, automatically bumps min_writer_version
pub fn append_writer_features(
mut self,
writer_features: impl IntoIterator<Item = impl Into<WriterFeatures>>,
) -> Self {
Expand All @@ -172,9 +193,16 @@ impl Protocol {
.map(|c| c.into())
.collect::<HashSet<_>>();
if !all_writer_feautures.is_empty() {
self.min_writer_version = 7
self.min_writer_version = 7;

match self.writer_features {
Some(mut features) => {
features.extend(all_writer_feautures);
self.writer_features = Some(features);
}
None => self.writer_features = Some(all_writer_feautures),
};
}
self.writer_features = Some(all_writer_feautures);
self
}

Expand Down Expand Up @@ -255,6 +283,32 @@ impl Protocol {
}
self
}

/// Will apply the column metadata to the protocol by either bumping the version or setting
/// features
pub fn apply_column_metadata_to_protocol(
mut self,
schema: &StructType,
) -> DeltaResult<Protocol> {
let generated_cols = schema.get_generated_columns()?;
let invariants = schema.get_invariants()?;
let contains_timestamp_ntz = self.contains_timestampntz(schema.fields());

if contains_timestamp_ntz {
self = self.enable_timestamp_ntz()
}

if !generated_cols.is_empty() {
self = self.enable_generated_columns()
}

if !invariants.is_empty() {
self = self.enable_invariants()
}

Ok(self)
}

/// Will apply the properties to the protocol by either bumping the version or setting
/// features
pub fn apply_properties_to_protocol(
Expand Down Expand Up @@ -391,10 +445,35 @@ impl Protocol {
}
Ok(self)
}

/// checks if table contains timestamp_ntz in any field including nested fields.
fn contains_timestampntz<'a>(&self, fields: impl Iterator<Item = &'a StructField>) -> bool {
contains_timestampntz(fields)
}

/// Enable timestamp_ntz in the protocol
pub fn enable_timestamp_ntz(mut self) -> Protocol {
self = self.with_reader_features(vec![ReaderFeatures::TimestampWithoutTimezone]);
self = self.with_writer_features(vec![WriterFeatures::TimestampWithoutTimezone]);
fn enable_timestamp_ntz(mut self) -> Self {
self = self.append_reader_features([ReaderFeatures::TimestampWithoutTimezone]);
self = self.append_writer_features([WriterFeatures::TimestampWithoutTimezone]);
self
}

/// Enabled generated columns
fn enable_generated_columns(mut self) -> Self {
if self.min_writer_version < 4 {
self.min_writer_version = 4;
}
if self.min_writer_version >= 7 {
self = self.append_writer_features([WriterFeatures::GeneratedColumns]);
}
self
}

/// Enabled generated columns
fn enable_invariants(mut self) -> Self {
if self.min_writer_version >= 7 {
self = self.append_writer_features([WriterFeatures::Invariants]);
}
self
}
}
Expand Down
94 changes: 94 additions & 0 deletions crates/core/src/kernel/models/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde_json::Value;

use crate::kernel::error::Error;
use crate::kernel::DataCheck;
use crate::table::GeneratedColumn;

/// Type alias for a top level schema
pub type Schema = StructType;
Expand Down Expand Up @@ -49,9 +50,59 @@ impl DataCheck for Invariant {
pub trait StructTypeExt {
/// Get all invariants in the schemas
fn get_invariants(&self) -> Result<Vec<Invariant>, Error>;

/// Get all generated column expressions
fn get_generated_columns(&self) -> Result<Vec<GeneratedColumn>, Error>;
}

impl StructTypeExt for StructType {
/// Get all get_generated_columns in the schemas
fn get_generated_columns(&self) -> Result<Vec<GeneratedColumn>, Error> {
let mut remaining_fields: Vec<(String, StructField)> = self
.fields()
.map(|field| (field.name.clone(), field.clone()))
.collect();
let mut generated_cols: Vec<GeneratedColumn> = Vec::new();

while let Some((field_path, field)) = remaining_fields.pop() {
if let Some(MetadataValue::String(generated_col_string)) = field
.metadata
.get(ColumnMetadataKey::GenerationExpression.as_ref())
{
let json: Value = serde_json::from_str(generated_col_string).map_err(|e| {
Error::InvalidGenerationExpressionJson {
json_err: e,
line: generated_col_string.to_string(),
}
})?;
match json {
Value::String(sql) => generated_cols.push(GeneratedColumn::new(
&field_path,
&sql,
field.data_type(),
)),
Value::Number(sql) => generated_cols.push(GeneratedColumn::new(
&field_path,
&format!("{}", sql),
field.data_type(),
)),
Value::Bool(sql) => generated_cols.push(GeneratedColumn::new(
&field_path,
&format!("{}", sql),
field.data_type(),
)),
Value::Array(sql) => generated_cols.push(GeneratedColumn::new(
&field_path,
&format!("{:?}", sql),
field.data_type(),
)),
_ => (), // Other types not sure what to do then
};
}
}
Ok(generated_cols)
}

/// Get all invariants in the schemas
fn get_invariants(&self) -> Result<Vec<Invariant>, Error> {
let mut remaining_fields: Vec<(String, StructField)> = self
Expand Down Expand Up @@ -131,6 +182,49 @@ mod tests {
use serde_json;
use serde_json::json;

#[test]
fn test_get_generated_columns() {
let schema: StructType = serde_json::from_value(json!(
{
"type":"struct",
"fields":[
{"name":"id","type":"integer","nullable":true,"metadata":{}},
{"name":"gc","type":"integer","nullable":true,"metadata":{}}]
}
))
.unwrap();
let cols = schema.get_generated_columns().unwrap();
assert_eq!(cols.len(), 0);

let schema: StructType = serde_json::from_value(json!(
{
"type":"struct",
"fields":[
{"name":"id","type":"integer","nullable":true,"metadata":{}},
{"name":"gc","type":"integer","nullable":true,"metadata":{"delta.generationExpression":"\"5\""}}]
}
)).unwrap();
let cols = schema.get_generated_columns().unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].data_type, DataType::INTEGER);
assert_eq!(
cols[0].validation_expr,
"gc = 5 OR (gc IS NULL AND 5 IS NULL)"
);

let schema: StructType = serde_json::from_value(json!(
{
"type":"struct",
"fields":[
{"name":"id","type":"integer","nullable":true,"metadata":{}},
{"name":"gc","type":"integer","nullable":true,"metadata":{"delta.generationExpression":"\"5\""}},
{"name":"id2","type":"integer","nullable":true,"metadata":{"delta.generationExpression":"\"id * 10\""}},]
}
)).unwrap();
let cols = schema.get_generated_columns().unwrap();
assert_eq!(cols.len(), 2);
}

#[test]
fn test_get_invariants() {
let schema: StructType = serde_json::from_value(json!({
Expand Down
Loading
Loading