Skip to content

Commit

Permalink
fix: get_latest_version returning incorrect version
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 14, 2025
1 parent cf5f38a commit a8ceac9
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,8 @@ pub async fn get_latest_version(
) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint
-1
}
Err(e) => {
return Err(DeltaTableError::from(e));
}
Err(ProtocolError::CheckpointNotFound) => -1, // no checkpoint
Err(e) => return Err(DeltaTableError::from(e)),
};

debug!("latest checkpoint version: {version_start}");
Expand All @@ -451,6 +446,7 @@ pub async fn get_latest_version(
let offset_path = commit_uri_from_version(max_version);
let object_store = log_store.object_store(None);
let mut files = object_store.list_with_offset(prefix, &offset_path);
let mut empty_stream = true;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
Expand All @@ -461,18 +457,30 @@ pub async fn get_latest_version(
// self.version_timestamp
// .insert(log_version, obj_meta.last_modified.timestamp());
}
empty_stream = false;
}

if max_version < 0 {
return Err(DeltaTableError::not_a_table(log_store.root_uri()));
}

// This implies no files were fetched during list_offset so either the starting_version is the latest
// or starting_version is invalid, so we use current_version -1, and do one more try.
if empty_stream {
let obj_meta = object_store.head(&commit_uri_from_version(max_version)).await;
if obj_meta.is_err() {
return Box::pin(get_latest_version(log_store, -1)).await;
}
}

Ok::<i64, DeltaTableError>(max_version)
}
.await?;

Ok(version)
}


/// Default implementation for retrieving the earliest version
pub async fn get_earliest_version(
log_store: &dyn LogStore,
Expand Down

0 comments on commit a8ceac9

Please sign in to comment.