Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: load cdf latest version #3218

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub enum DeltaTableError {
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: i64 },

#[error("Invalid version start version {start} is greater than version {end}")]
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
Expand Down
23 changes: 16 additions & 7 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,8 @@ pub async fn get_latest_version(
) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint
-1
}
Err(e) => {
return Err(DeltaTableError::from(e));
}
Err(ProtocolError::CheckpointNotFound) => -1, // no checkpoint
Err(e) => return Err(DeltaTableError::from(e)),
};

debug!("latest checkpoint version: {version_start}");
Expand All @@ -451,6 +446,7 @@ pub async fn get_latest_version(
let offset_path = commit_uri_from_version(max_version);
let object_store = log_store.object_store(None);
let mut files = object_store.list_with_offset(prefix, &offset_path);
let mut empty_stream = true;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
Expand All @@ -461,15 +457,28 @@ pub async fn get_latest_version(
// self.version_timestamp
// .insert(log_version, obj_meta.last_modified.timestamp());
}
empty_stream = false;
}

if max_version < 0 {
return Err(DeltaTableError::not_a_table(log_store.root_uri()));
}

// This implies no files were fetched during list_offset so either the starting_version is the latest
// or starting_version is invalid, so we use current_version -1, and do one more try.
if empty_stream {
let obj_meta = object_store
.head(&commit_uri_from_version(max_version))
.await;
if obj_meta.is_err() {
return Box::pin(get_latest_version(log_store, -1)).await;
}
}

Ok::<i64, DeltaTableError>(max_version)
}
.await?;

Ok(version)
}

Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CdfLoadBuilder {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}
if start >= latest_version {
if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Expand Down Expand Up @@ -671,10 +671,10 @@ pub(crate) mod tests {
.await;

assert!(table.is_err());
assert!(matches!(
table.unwrap_err(),
DeltaTableError::InvalidVersion { .. }
));
assert!(table
.unwrap_err()
.to_string()
.contains("Invalid version. Start version 5 is greater than end version 4"));

Ok(())
}
Expand Down
37 changes: 29 additions & 8 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,12 @@ mod tests {

#[tokio::test]
async fn test_buffer_len_includes_unflushed_row_group() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand All @@ -515,9 +518,12 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_no_partition() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand All @@ -528,9 +534,12 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_single_partition() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand Down Expand Up @@ -613,9 +622,11 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_multiple_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string(), "id".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand All @@ -631,9 +642,11 @@ mod tests {

#[tokio::test]
async fn test_write_no_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand All @@ -643,9 +656,11 @@ mod tests {

#[tokio::test]
async fn test_write_multiple_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string(), "id".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand Down Expand Up @@ -714,9 +729,12 @@ mod tests {

#[tokio::test]
async fn test_write_mismatched_schema() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

// Write the first batch with the first schema to the table
Expand Down Expand Up @@ -895,9 +913,12 @@ mod tests {

#[tokio::test]
async fn test_schema_evolution_column_type_mismatch() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let mut table = create_initialized_table(&partition_cols).await;
let mut table = create_initialized_table(table_path, &partition_cols).await;

let mut writer = RecordBatchWriter::for_table(&table).unwrap();

Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,10 @@ pub fn create_bare_table() -> DeltaTable {
.unwrap()
}

pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
pub async fn create_initialized_table(table_path: &str, partition_cols: &[String]) -> DeltaTable {
let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_location(table_path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().cloned())
Expand Down
45 changes: 26 additions & 19 deletions crates/core/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ use std::fs;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;

#[derive(Debug)]
struct Context {
pub tmp_dir: TempDir,
pub table: DeltaTable,
}

async fn setup_test() -> Result<Context, Box<dyn Error>> {
async fn setup_test(table_uri: &str) -> Result<Context, Box<dyn Error>> {
let columns = vec![
StructField::new(
"id".to_string(),
Expand All @@ -34,9 +32,6 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
true,
),
];

let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let table = DeltaOps::try_from_uri(table_uri)
.await?
.create()
Expand Down Expand Up @@ -65,7 +60,7 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
.await
.unwrap();

Ok(Context { tmp_dir, table })
Ok(Context { table })
}

fn get_record_batch() -> RecordBatch {
Expand Down Expand Up @@ -95,13 +90,16 @@ fn get_record_batch() -> RecordBatch {

#[tokio::test]
async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();

let context = setup_test(table_uri).await?;
let table = context.table;
let result = DeltaOps(table).restore().with_version_to_restore(1).await?;
assert_eq!(result.1.num_restored_file, 1);
assert_eq!(result.1.num_removed_file, 2);
assert_eq!(result.0.snapshot()?.version(), 4);
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();

let mut table = DeltaOps::try_from_uri(table_uri).await?;
table.0.load_version(1).await?;
let curr_files = table.0.snapshot()?.file_paths_iter().collect_vec();
Expand All @@ -118,7 +116,9 @@ async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let version = 1;

Expand All @@ -142,7 +142,9 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let history = table.history(Some(10)).await?;
let timestamp = history.get(1).unwrap().timestamp.unwrap();
Expand All @@ -157,7 +159,6 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
assert!(result.is_err());

// version too large
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();
let ops = DeltaOps::try_from_uri(table_uri).await?;
let result = ops.restore().with_version_to_restore(5).await;
assert!(result.is_err());
Expand All @@ -166,10 +167,12 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;

for file in context.table.snapshot()?.log_data() {
let p = context.tmp_dir.path().join(file.path().as_ref());
let p = tmp_dir.path().join(file.path().as_ref());
fs::remove_file(p).unwrap();
}

Expand All @@ -179,7 +182,7 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
.all_tombstones(context.table.object_store().clone())
.await?
{
let p = context.tmp_dir.path().join(file.clone().path);
let p = tmp_dir.path().join(file.clone().path);
fs::remove_file(p).unwrap();
}

Expand All @@ -193,10 +196,12 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;

for file in context.table.snapshot()?.log_data() {
let p = context.tmp_dir.path().join(file.path().as_ref());
let p = tmp_dir.path().join(file.path().as_ref());
fs::remove_file(p).unwrap();
}

Expand All @@ -206,7 +211,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
.all_tombstones(context.table.object_store().clone())
.await?
{
let p = context.tmp_dir.path().join(file.clone().path);
let p = tmp_dir.path().join(file.clone().path);
fs::remove_file(p).unwrap();
}

Expand All @@ -221,7 +226,9 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_transaction_conflict() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let mut table = context.table;
table.load_version(2).await?;

Expand Down
10 changes: 6 additions & 4 deletions crates/core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,9 +1184,11 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {
}

mod date_partitions {
use tempfile::TempDir;

use super::*;

async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
async fn setup_test(table_uri: &str) -> Result<DeltaTable, Box<dyn Error>> {
let columns = vec![
StructField::new(
"id".to_owned(),
Expand All @@ -1200,8 +1202,6 @@ mod date_partitions {
),
];

let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let dt = DeltaOps::try_from_uri(table_uri)
.await?
.create()
Expand Down Expand Up @@ -1238,7 +1238,9 @@ mod date_partitions {
#[tokio::test]
async fn test_issue_1445_date_partition() -> Result<()> {
let ctx = SessionContext::new();
let mut dt = setup_test().await.unwrap();
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let mut dt = setup_test(table_uri).await.unwrap();
let mut writer = RecordBatchWriter::for_table(&dt)?;
write(
&mut writer,
Expand Down
Loading
Loading