Commit: Merge branch 'main' into main

rtyler authored Jan 12, 2025
2 parents 65c2951 + 1521a08 commit 0cd363d
Showing 17 changed files with 210 additions and 269 deletions.
Cargo.toml: 4 changes (3 additions, 1 deletion)

@@ -26,7 +26,7 @@ debug = true
 debug = "line-tables-only"

 [workspace.dependencies]
-delta_kernel = { version = "0.6.0", features = ["default-engine"] }
+delta_kernel = { version = "=0.6.0", features = ["default-engine"] }
 #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

 # arrow
@@ -59,6 +59,8 @@ datafusion-sql = { version = "44" }
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
 serde_json = "1"
+strum = { version = "*" }
+

 # "stdlib"
 bytes = { version = "1" }
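The only functional change in the first hunk is swapping a caret requirement for an exact pin: Cargo reads a bare "0.6.0" as ^0.6.0 and will happily resolve 0.6.1, while "=0.6.0" locks the kernel to exactly that release. A minimal sketch of the difference using the semver crate, which implements Cargo's matching rules (the crate usage here is illustrative, not part of this commit):

```rust
// Cargo interprets a bare version as a caret requirement; "=" makes it exact.
use semver::{Version, VersionReq};

fn main() {
    let caret = VersionReq::parse("0.6.0").unwrap(); // equivalent to ^0.6.0
    let exact = VersionReq::parse("=0.6.0").unwrap(); // exactly 0.6.0
    let newer = Version::parse("0.6.1").unwrap();

    assert!(caret.matches(&newer)); // a caret requirement accepts 0.6.x upgrades
    assert!(!exact.matches(&newer)); // the exact pin rejects them
}
```

Pinning is a reasonable choice for delta_kernel, whose 0.x releases routinely change public APIs.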
README.md: 24 changes (16 additions, 8 deletions)

@@ -143,6 +143,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
 | Microsoft OneLake | ![done] | ![done] | |
 | Google Cloud Storage | ![done] | ![done] | |
 | HDFS | ![done] | ![done] | |
+| LakeFS | ![done] | ![done] | Python: only the Rust engine writer is supported |

 ### Supported Operations

@@ -151,12 +152,19 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
 | Create | ![done] | ![done] | Create a new table |
 | Read | ![done] | ![done] | Read data from a table |
 | Vacuum | ![done] | ![done] | Remove unused files and log entries |
-| Delete - partitions | | ![done] | Delete a table partition |
 | Delete - predicates | ![done] | ![done] | Delete data based on a predicate |
 | Optimize - compaction | ![done] | ![done] | Harmonize the size of data files |
 | Optimize - Z-order | ![done] | ![done] | Place similar data into the same file |
 | Merge | ![done] | ![done] | Merge a target Delta table with source data |
 | Update | ![done] | ![done] | Update values in a table |
+| Add Column | ![done] | ![done] | Add new columns or nested fields |
+| Add Feature | ![done] | ![done] | Enable Delta table features |
+| Add Constraints | ![done] | ![done] | Set Delta constraints to verify data on write |
+| Drop Constraints | ![done] | ![done] | Remove Delta constraints |
+| Set Table Properties | ![done] | ![done] | Set Delta table properties |
+| Convert to Delta | ![done] | ![done] | Convert a Parquet table to a Delta table |
+| FS check | ![done] | ![done] | Remove corrupted files from a table |
+| Restore | ![done] | ![done] | Restore a table to a previous version |

 ### Protocol Support Level

@@ -166,17 +174,17 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
 | Version 2 | Column Invariants | ![done] |
 | Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
 | Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
-| Version 3 | CHECK constraints | [![semi-done]][check-constraints] |
-| Version 4 | Change Data Feed | |
+| Version 3 | CHECK constraints | [![done]][check-constraints] |
+| Version 4 | Change Data Feed | ![done] |
 | Version 4 | Generated Columns | |
 | Version 5 | Column Mapping | |
 | Version 6 | Identity Columns | |
-| Version 7 | Table Features | |
+| Version 7 | Table Features | ![done] |

-| Reader Version | Requirement | Status |
-| -------------- | ----------------------------------- | ------ |
-| Version 2 | Column Mapping | |
-| Version 3 | Table Features (requires reader V7) | |
+| Reader Version | Requirement | Status |
+| -------------- | ----------------------------------- | ------ |
+| Version 2 | Column Mapping | |
+| Version 3 | Table Features (requires reader V7) | ![done] |

 [datafusion]: https://github.com/apache/arrow-datafusion
 [ballista]: https://github.com/apache/arrow-ballista
crates/core/Cargo.toml: 1 change (1 addition, 0 deletions)

@@ -49,6 +49,7 @@ datafusion-functions-aggregate = { workspace = true, optional = true }
 # serde
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
+strum = { workspace = true }

 # "stdlib"
 bytes = { workspace = true }
crates/core/src/kernel/models/actions.rs: 259 changes (42 additions, 217 deletions)

@@ -10,6 +10,7 @@ use url::Url;
 use super::schema::StructType;
 use crate::kernel::{error::Error, DeltaResult};
 use crate::TableProperty;
+use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};

 /// Defines a file format used in table
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
@@ -183,18 +184,37 @@ impl Protocol {
         mut self,
         configuration: &HashMap<String, Option<String>>,
     ) -> Protocol {
+        fn parse_bool(value: &Option<String>) -> bool {
+            value
+                .as_ref()
+                .is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
+        }
+
         if self.min_writer_version >= 7 {
+            // TODO: move this in the future to use delta_kernel::table_properties
            let mut converted_writer_features = configuration
                 .iter()
-                .filter(|(_, value)| {
-                    value
-                        .as_ref()
-                        .is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
-                })
-                .collect::<HashMap<&String, &Option<String>>>()
-                .keys()
-                .map(|key| (*key).clone().into())
-                .filter(|v| !matches!(v, WriterFeatures::Other(_)))
+                .filter_map(|(key, value)| match key.as_str() {
+                    "delta.enableChangeDataFeed" if parse_bool(value) => {
+                        Some(WriterFeatures::ChangeDataFeed)
+                    }
+                    "delta.appendOnly" if parse_bool(value) => Some(WriterFeatures::AppendOnly),
+                    "delta.enableDeletionVectors" if parse_bool(value) => {
+                        Some(WriterFeatures::DeletionVectors)
+                    }
+                    "delta.enableRowTracking" if parse_bool(value) => {
+                        Some(WriterFeatures::RowTracking)
+                    }
+                    "delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => {
+                        Some(WriterFeatures::V2Checkpoint)
+                    }
+                    _ => None,
+                })
                 .collect::<HashSet<WriterFeatures>>();

            if configuration
@@ -215,13 +235,15 @@ impl Protocol {
        if self.min_reader_version >= 3 {
            let converted_reader_features = configuration
                .iter()
-                .filter(|(_, value)| {
-                    value
-                        .as_ref()
-                        .is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
+                .filter_map(|(key, value)| match key.as_str() {
+                    "delta.enableDeletionVectors" if parse_bool(value) => {
+                        Some(ReaderFeatures::DeletionVectors)
+                    }
+                    "delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => {
+                        Some(ReaderFeatures::V2Checkpoint)
+                    }
+                    _ => None,
                })
-                .map(|(key, _)| (*key).clone().into())
-                .filter(|v| !matches!(v, ReaderFeatures::Other(_)))
                .collect::<HashSet<ReaderFeatures>>();
            match self.reader_features {
                Some(mut features) => {
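This refactor replaces the old approach, which treated any table property with a boolean true value as a candidate feature name and then discarded whatever fell into the `Other` variant, with an explicit mapping from known `delta.*` property keys to kernel features. It also closes a gap: `delta.checkpointPolicy = "v2"` is not a boolean, so the old bool filter silently dropped it. A standalone sketch of the new shape, using a local stand-in enum since the real `ReaderFeatures` and `WriterFeatures` now come from delta_kernel:

```rust
// Sketch of the property-to-feature mapping; WriterFeature is a local stand-in
// for delta_kernel::table_features::WriterFeatures, not the real type.
use std::collections::{HashMap, HashSet};

#[derive(Debug, PartialEq, Eq, Hash)]
enum WriterFeature {
    AppendOnly,
    ChangeDataFeed,
    V2Checkpoint,
}

/// A property counts as enabled only if it parses as boolean `true`
/// (case-insensitive); `None`, garbage, and `"false"` are all ignored.
fn parse_bool(value: &Option<String>) -> bool {
    value
        .as_ref()
        .is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
}

fn writer_features(config: &HashMap<String, Option<String>>) -> HashSet<WriterFeature> {
    config
        .iter()
        .filter_map(|(key, value)| match key.as_str() {
            "delta.appendOnly" if parse_bool(value) => Some(WriterFeature::AppendOnly),
            "delta.enableChangeDataFeed" if parse_bool(value) => {
                Some(WriterFeature::ChangeDataFeed)
            }
            // Not a boolean flag: only the literal policy name "v2" counts.
            "delta.checkpointPolicy" if value.as_deref() == Some("v2") => {
                Some(WriterFeature::V2Checkpoint)
            }
            _ => None, // unknown keys and disabled flags simply drop out
        })
        .collect()
}

fn main() {
    let mut config = HashMap::new();
    config.insert("delta.appendOnly".to_string(), Some("TRUE".to_string()));
    config.insert("delta.enableChangeDataFeed".to_string(), Some("false".to_string()));
    config.insert("delta.unrelated".to_string(), Some("true".to_string()));
    // Only appendOnly survives: "TRUE" parses as true, "false" fails the guard,
    // and the unrelated key matches no arm.
    assert_eq!(writer_features(&config), HashSet::from([WriterFeature::AppendOnly]));
}
```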
Expand Down Expand Up @@ -459,225 +481,28 @@ impl fmt::Display for TableFeatures {
}
}

impl TableFeatures {
/// Convert table feature to respective reader or/and write feature
pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
let reader_feature = ReaderFeatures::try_from(self).ok();
let writer_feature = WriterFeatures::try_from(self).ok();
(reader_feature, writer_feature)
}
}

/// Features table readers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum ReaderFeatures {
/// Mapping of one column to another
ColumnMapping,
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// timestamps without timezone support
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// version 2 of checkpointing
V2Checkpoint,
/// If we do not match any other reader features
#[serde(untagged)]
Other(String),
}

impl From<&parquet::record::Field> for ReaderFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
parquet::record::Field::Str(feature) => match feature.as_str() {
"columnMapping" => ReaderFeatures::ColumnMapping,
"deletionVectors" | "delta.enableDeletionVectors" => {
ReaderFeatures::DeletionVectors
}
"timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
"v2Checkpoint" => ReaderFeatures::V2Checkpoint,
f => ReaderFeatures::Other(f.to_string()),
},
f => ReaderFeatures::Other(f.to_string()),
}
}
}

impl From<String> for ReaderFeatures {
fn from(value: String) -> Self {
value.as_str().into()
}
}

impl From<&str> for ReaderFeatures {
fn from(value: &str) -> Self {
match value {
"columnMapping" => ReaderFeatures::ColumnMapping,
"deletionVectors" => ReaderFeatures::DeletionVectors,
"timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
"v2Checkpoint" => ReaderFeatures::V2Checkpoint,
f => ReaderFeatures::Other(f.to_string()),
}
}
}

impl AsRef<str> for ReaderFeatures {
fn as_ref(&self) -> &str {
match self {
ReaderFeatures::ColumnMapping => "columnMapping",
ReaderFeatures::DeletionVectors => "deletionVectors",
ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
ReaderFeatures::V2Checkpoint => "v2Checkpoint",
ReaderFeatures::Other(f) => f,
}
}
}

impl fmt::Display for ReaderFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
}
}

impl TryFrom<&TableFeatures> for ReaderFeatures {
type Error = String;
type Error = strum::ParseError;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match ReaderFeatures::from(value.as_ref()) {
ReaderFeatures::Other(_) => {
Err(format!("Table feature {} is not a reader feature", value))
}
value => Ok(value),
}
}
}

-/// Features table writers can support as well as let users know
-/// what is supported
-#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
-#[serde(rename_all = "camelCase")]
-pub enum WriterFeatures {
-    /// Append Only Tables
-    AppendOnly,
-    /// Table invariants
-    Invariants,
-    /// Check constraints on columns
-    CheckConstraints,
-    /// CDF on a table
-    ChangeDataFeed,
-    /// Columns with generated values
-    GeneratedColumns,
-    /// Mapping of one column to another
-    ColumnMapping,
-    /// ID Columns
-    IdentityColumns,
-    /// Deletion vectors for merge, update, delete
-    DeletionVectors,
-    /// Row tracking on tables
-    RowTracking,
-    /// timestamps without timezone support
-    #[serde(rename = "timestampNtz")]
-    TimestampWithoutTimezone,
-    /// domain specific metadata
-    DomainMetadata,
-    /// version 2 of checkpointing
-    V2Checkpoint,
-    /// Iceberg compatibility support
-    IcebergCompatV1,
-    /// If we do not match any other reader features
-    #[serde(untagged)]
-    Other(String),
-}
-
-impl From<String> for WriterFeatures {
-    fn from(value: String) -> Self {
-        value.as_str().into()
-    }
-}
-
-impl From<&str> for WriterFeatures {
-    fn from(value: &str) -> Self {
-        match value {
-            "appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly,
-            "invariants" => WriterFeatures::Invariants,
-            "checkConstraints" => WriterFeatures::CheckConstraints,
-            "changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed,
-            "generatedColumns" => WriterFeatures::GeneratedColumns,
-            "columnMapping" => WriterFeatures::ColumnMapping,
-            "identityColumns" => WriterFeatures::IdentityColumns,
-            "deletionVectors" | "delta.enableDeletionVectors" => WriterFeatures::DeletionVectors,
-            "rowTracking" | "delta.enableRowTracking" => WriterFeatures::RowTracking,
-            "timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
-            "domainMetadata" => WriterFeatures::DomainMetadata,
-            "v2Checkpoint" => WriterFeatures::V2Checkpoint,
-            "icebergCompatV1" => WriterFeatures::IcebergCompatV1,
-            f => WriterFeatures::Other(f.to_string()),
-        }
-    }
-}
-
-impl AsRef<str> for WriterFeatures {
-    fn as_ref(&self) -> &str {
-        match self {
-            WriterFeatures::AppendOnly => "appendOnly",
-            WriterFeatures::Invariants => "invariants",
-            WriterFeatures::CheckConstraints => "checkConstraints",
-            WriterFeatures::ChangeDataFeed => "changeDataFeed",
-            WriterFeatures::GeneratedColumns => "generatedColumns",
-            WriterFeatures::ColumnMapping => "columnMapping",
-            WriterFeatures::IdentityColumns => "identityColumns",
-            WriterFeatures::DeletionVectors => "deletionVectors",
-            WriterFeatures::RowTracking => "rowTracking",
-            WriterFeatures::TimestampWithoutTimezone => "timestampNtz",
-            WriterFeatures::DomainMetadata => "domainMetadata",
-            WriterFeatures::V2Checkpoint => "v2Checkpoint",
-            WriterFeatures::IcebergCompatV1 => "icebergCompatV1",
-            WriterFeatures::Other(f) => f,
-        }
-    }
-}
-
-impl fmt::Display for WriterFeatures {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}", self.as_ref())
+        ReaderFeatures::try_from(value.as_ref())
     }
 }

 impl TryFrom<&TableFeatures> for WriterFeatures {
-    type Error = String;
+    type Error = strum::ParseError;

     fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
-        match WriterFeatures::from(value.as_ref()) {
-            WriterFeatures::Other(_) => {
-                Err(format!("Table feature {} is not a writer feature", value))
-            }
-            value => Ok(value),
-        }
+        WriterFeatures::try_from(value.as_ref())
     }
 }
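With the enums sourced from delta_kernel, both TryFrom impls reduce to the kernel's strum-derived string parsing, so an unknown name surfaces as strum::ParseError instead of a hand-rolled String. Roughly what a caller sees (a sketch that assumes the kernel enums' TryFrom<&str> impls, the same ones the delegation above relies on):

```rust
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};

fn main() {
    // "deletionVectors" names both a reader and a writer feature...
    assert!(ReaderFeatures::try_from("deletionVectors").is_ok());
    assert!(WriterFeatures::try_from("deletionVectors").is_ok());

    // ...while "appendOnly" is writer-only, so the reader-side parse
    // comes back as Err(strum::ParseError::VariantNotFound).
    assert!(ReaderFeatures::try_from("appendOnly").is_err());
    assert!(WriterFeatures::try_from("appendOnly").is_ok());
}
```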

-impl From<&parquet::record::Field> for WriterFeatures {
-    fn from(value: &parquet::record::Field) -> Self {
-        match value {
-            parquet::record::Field::Str(feature) => match feature.as_str() {
-                "appendOnly" => WriterFeatures::AppendOnly,
-                "invariants" => WriterFeatures::Invariants,
-                "checkConstraints" => WriterFeatures::CheckConstraints,
-                "changeDataFeed" => WriterFeatures::ChangeDataFeed,
-                "generatedColumns" => WriterFeatures::GeneratedColumns,
-                "columnMapping" => WriterFeatures::ColumnMapping,
-                "identityColumns" => WriterFeatures::IdentityColumns,
-                "deletionVectors" => WriterFeatures::DeletionVectors,
-                "rowTracking" => WriterFeatures::RowTracking,
-                "timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
-                "domainMetadata" => WriterFeatures::DomainMetadata,
-                "v2Checkpoint" => WriterFeatures::V2Checkpoint,
-                "icebergCompatV1" => WriterFeatures::IcebergCompatV1,
-                f => WriterFeatures::Other(f.to_string()),
-            },
-            f => WriterFeatures::Other(f.to_string()),
-        }
+impl TableFeatures {
+    /// Convert a table feature to its respective reader and/or writer feature
+    pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
+        let reader_feature = ReaderFeatures::try_from(self).ok();
+        let writer_feature = WriterFeatures::try_from(self).ok();
+        (reader_feature, writer_feature)
     }
 }
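The relocated TableFeatures helper composes the two fallible conversions, yielding None on whichever side does not define the feature. A sketch of its behavior; the crate path and variant names here are assumed for illustration, not taken from this diff:

```rust
use deltalake_core::kernel::TableFeatures; // assumed import path

fn main() {
    // Deletion vectors exist on both sides of the protocol...
    let (read, write) = TableFeatures::DeletionVectors.to_reader_writer_features();
    assert!(read.is_some() && write.is_some());

    // ...while append-only is writer-only, so the reader half is None.
    let (read, write) = TableFeatures::AppendOnly.to_reader_writer_features();
    assert!(read.is_none() && write.is_some());
}
```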

