diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 11bab3e771..7906dd6bef 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -28,6 +28,7 @@ lazy_static! { static ref CHECKPOINT_FILE_PATTERN: Regex = Regex::new(r"\d+\.checkpoint(\.\d+\.\d+)?\.parquet").unwrap(); static ref DELTA_FILE_PATTERN: Regex = Regex::new(r"^\d+\.json$").unwrap(); + static ref CRC_FILE_PATTERN: Regex = Regex::new(r"^(\.\d+(\.crc|\.json)|\d+)\.crc$").unwrap(); pub(super) static ref TOMBSTONE_SCHEMA: StructType = StructType::new(vec![ActionType::Remove.schema_field().clone(),]); } @@ -61,6 +62,12 @@ pub(crate) trait PathExt { .map(|name| DELTA_FILE_PATTERN.captures(name).is_some()) .unwrap_or(false) } + + fn is_crc_file(&self) -> bool { + self.filename() + .map(|name| CRC_FILE_PATTERN.captures(name).is_some()) + .unwrap() + } } impl PathExt for Path { diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 74b9c56bc4..d4914158d9 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -265,20 +265,23 @@ pub trait LogStore: Send + Sync + AsAny { /// Check if the location is a delta table location async fn is_delta_table_location(&self) -> DeltaResult { - // TODO We should really be using HEAD here, but this fails in windows tests let object_store = self.object_store(None); let mut stream = object_store.list(Some(self.log_path())); - if let Some(res) = stream.next().await { + while let Some(res) = stream.next().await { match res { Ok(meta) => { - Ok(meta.location.is_commit_file() || meta.location.is_checkpoint_file()) + // crc files are valid files according to the protocol + if meta.location.is_crc_file() { + continue; + } + return Ok(meta.location.is_commit_file() || meta.location.is_checkpoint_file()); } - Err(ObjectStoreError::NotFound { .. }) => Ok(false), - Err(err) => Err(err)?, + Err(ObjectStoreError::NotFound { .. }) => return Ok(false), + Err(err) => return Err(err.into()), } - } else { - Ok(false) } + + Ok(false) } #[cfg(feature = "datafusion")] @@ -679,6 +682,69 @@ mod tests { .await .expect("Failed to identify table")); } + + #[tokio::test] + async fn test_is_location_a_table_crc() { + use object_store::path::Path; + use object_store::{PutOptions, PutPayload}; + let location = Url::parse("memory://table").unwrap(); + let store = + logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + assert!(!store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + + // Save .crc files to the transaction log directory (all 3 formats) + let payload = PutPayload::from_static(b"test"); + + let _put = store + .object_store(None) + .put_opts( + &Path::from("_delta_log/.0.crc.crc"), + payload.clone(), + PutOptions::default(), + ) + .await + .expect("Failed to put"); + + let _put = store + .object_store(None) + .put_opts( + &Path::from("_delta_log/.0.json.crc"), + payload.clone(), + PutOptions::default(), + ) + .await + .expect("Failed to put"); + + let _put = store + .object_store(None) + .put_opts( + &Path::from("_delta_log/0.crc"), + payload.clone(), + PutOptions::default(), + ) + .await + .expect("Failed to put"); + + // Now add a commit + let _put = store + .object_store(None) + .put_opts( + &Path::from("_delta_log/0.json"), + payload.clone(), + PutOptions::default(), + ) + .await + .expect("Failed to put"); + + // The table should be considered a delta table + assert!(store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + } } #[cfg(feature = "datafusion")]