Skip to content

Commit

Permalink
fix: reading single latest version in cdf
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 14, 2025
1 parent 6630851 commit d160dcc
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 45 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub enum DeltaTableError {
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: i64 },

#[error("Invalid version start version {start} is greater than version {end}")]
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ pub async fn get_latest_version(
// This implies no files were fetched during list_offset so either the starting_version is the latest
// or starting_version is invalid, so we use current_version -1, and do one more try.
if empty_stream {
let obj_meta = object_store.head(&commit_uri_from_version(max_version)).await;
let obj_meta = object_store
.head(&commit_uri_from_version(max_version))
.await;
if obj_meta.is_err() {
return Box::pin(get_latest_version(log_store, -1)).await;
}
Expand All @@ -480,7 +482,6 @@ pub async fn get_latest_version(
Ok(version)
}


/// Default implementation for retrieving the earliest version
pub async fn get_earliest_version(
log_store: &dyn LogStore,
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CdfLoadBuilder {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}
if start >= latest_version {
if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Expand Down Expand Up @@ -671,10 +671,10 @@ pub(crate) mod tests {
.await;

assert!(table.is_err());
assert!(matches!(
table.unwrap_err(),
DeltaTableError::InvalidVersion { .. }
));
assert!(table
.unwrap_err()
.to_string()
.contains("Invalid version. Start version 5 is greater than end version 4"));

Ok(())
}
Expand Down
37 changes: 29 additions & 8 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,12 @@ mod tests {

#[tokio::test]
async fn test_buffer_len_includes_unflushed_row_group() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand All @@ -515,9 +518,12 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_no_partition() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand All @@ -528,9 +534,12 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_single_partition() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand Down Expand Up @@ -613,9 +622,11 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_multiple_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string(), "id".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand All @@ -631,9 +642,11 @@ mod tests {

#[tokio::test]
async fn test_write_no_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand All @@ -643,9 +656,11 @@ mod tests {

#[tokio::test]
async fn test_write_multiple_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string(), "id".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand Down Expand Up @@ -714,9 +729,12 @@ mod tests {

#[tokio::test]
async fn test_write_mismatched_schema() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

// Write the first batch with the first schema to the table
Expand Down Expand Up @@ -895,9 +913,12 @@ mod tests {

#[tokio::test]
async fn test_schema_evolution_column_type_mismatch() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let mut table = create_initialized_table(&partition_cols).await;
let mut table = create_initialized_table(table_path, &partition_cols).await;

let mut writer = RecordBatchWriter::for_table(&table).unwrap();

Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,10 @@ pub fn create_bare_table() -> DeltaTable {
.unwrap()
}

pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
pub async fn create_initialized_table(table_path: &str, partition_cols: &[String]) -> DeltaTable {
let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_location(table_path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().cloned())
Expand Down
45 changes: 26 additions & 19 deletions crates/core/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ use std::fs;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;

#[derive(Debug)]
struct Context {
pub tmp_dir: TempDir,
pub table: DeltaTable,
}

async fn setup_test() -> Result<Context, Box<dyn Error>> {
async fn setup_test(table_uri: &str) -> Result<Context, Box<dyn Error>> {
let columns = vec![
StructField::new(
"id".to_string(),
Expand All @@ -34,9 +32,6 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
true,
),
];

let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let table = DeltaOps::try_from_uri(table_uri)
.await?
.create()
Expand Down Expand Up @@ -65,7 +60,7 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
.await
.unwrap();

Ok(Context { tmp_dir, table })
Ok(Context { table })
}

fn get_record_batch() -> RecordBatch {
Expand Down Expand Up @@ -95,13 +90,16 @@ fn get_record_batch() -> RecordBatch {

#[tokio::test]
async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();

let context = setup_test(table_uri).await?;
let table = context.table;
let result = DeltaOps(table).restore().with_version_to_restore(1).await?;
assert_eq!(result.1.num_restored_file, 1);
assert_eq!(result.1.num_removed_file, 2);
assert_eq!(result.0.snapshot()?.version(), 4);
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();

let mut table = DeltaOps::try_from_uri(table_uri).await?;
table.0.load_version(1).await?;
let curr_files = table.0.snapshot()?.file_paths_iter().collect_vec();
Expand All @@ -118,7 +116,9 @@ async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let version = 1;

Expand All @@ -142,7 +142,9 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let history = table.history(Some(10)).await?;
let timestamp = history.get(1).unwrap().timestamp.unwrap();
Expand All @@ -157,7 +159,6 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
assert!(result.is_err());

// version too large
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();
let ops = DeltaOps::try_from_uri(table_uri).await?;
let result = ops.restore().with_version_to_restore(5).await;
assert!(result.is_err());
Expand All @@ -166,10 +167,12 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;

for file in context.table.snapshot()?.log_data() {
let p = context.tmp_dir.path().join(file.path().as_ref());
let p = tmp_dir.path().join(file.path().as_ref());
fs::remove_file(p).unwrap();
}

Expand All @@ -179,7 +182,7 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
.all_tombstones(context.table.object_store().clone())
.await?
{
let p = context.tmp_dir.path().join(file.clone().path);
let p = tmp_dir.path().join(file.clone().path);
fs::remove_file(p).unwrap();
}

Expand All @@ -193,10 +196,12 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;

for file in context.table.snapshot()?.log_data() {
let p = context.tmp_dir.path().join(file.path().as_ref());
let p = tmp_dir.path().join(file.path().as_ref());
fs::remove_file(p).unwrap();
}

Expand All @@ -206,7 +211,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
.all_tombstones(context.table.object_store().clone())
.await?
{
let p = context.tmp_dir.path().join(file.clone().path);
let p = tmp_dir.path().join(file.clone().path);
fs::remove_file(p).unwrap();
}

Expand All @@ -221,7 +226,9 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_transaction_conflict() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let mut table = context.table;
table.load_version(2).await?;

Expand Down
10 changes: 6 additions & 4 deletions crates/core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,9 +1184,11 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {
}

mod date_partitions {
use tempfile::TempDir;

use super::*;

async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
async fn setup_test(table_uri: &str) -> Result<DeltaTable, Box<dyn Error>> {
let columns = vec![
StructField::new(
"id".to_owned(),
Expand All @@ -1200,8 +1202,6 @@ mod date_partitions {
),
];

let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let dt = DeltaOps::try_from_uri(table_uri)
.await?
.create()
Expand Down Expand Up @@ -1238,7 +1238,9 @@ mod date_partitions {
#[tokio::test]
async fn test_issue_1445_date_partition() -> Result<()> {
let ctx = SessionContext::new();
let mut dt = setup_test().await.unwrap();
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let mut dt = setup_test(table_uri).await.unwrap();
let mut writer = RecordBatchWriter::for_table(&dt)?;
write(
&mut writer,
Expand Down
Loading

0 comments on commit d160dcc

Please sign in to comment.