Skip to content

Commit

Permalink
fix: reading single latest version in cdf
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 14, 2025
1 parent 6630851 commit fb92039
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub enum DeltaTableError {
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: i64 },

#[error("Invalid version start version {start} is greater than version {end}")]
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ pub async fn get_latest_version(
// This implies no files were fetched during list_offset so either the starting_version is the latest
// or starting_version is invalid, so we use current_version -1, and do one more try.
if empty_stream {
let obj_meta = object_store.head(&commit_uri_from_version(max_version)).await;
let obj_meta = object_store
.head(&commit_uri_from_version(max_version))
.await;
if obj_meta.is_err() {
return Box::pin(get_latest_version(log_store, -1)).await;
}
Expand All @@ -480,7 +482,6 @@ pub async fn get_latest_version(
Ok(version)
}


/// Default implementation for retrieving the earliest version
pub async fn get_earliest_version(
log_store: &dyn LogStore,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CdfLoadBuilder {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}
if start >= latest_version {
if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Expand Down
36 changes: 35 additions & 1 deletion python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,10 @@ def test_read_cdf_version_out_of_range():
with pytest.raises(DeltaError) as e:
dt.load_cdf(4).read_all().to_pydict()

assert "invalid table version" in str(e).lower()
assert (
"invalid version. start version 4 is greater than end version 3"
in str(e).lower()
)


def test_read_cdf_version_out_of_range_with_flag():
Expand All @@ -714,3 +717,34 @@ def test_read_timestamp_cdf_out_of_range_with_flag():
b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all()

assert len(b) == 0


def test_read_cdf_last_version(tmp_path):
data = pa.Table.from_pydict({"foo": [1, 2, 3]})

expected = pa.Table.from_pydict(
{
"foo": [1, 2, 3],
"_change_type": ["insert", "insert", "insert"],
"_commit_version": [0, 0, 0],
}
)

write_deltalake(
tmp_path,
data=data,
configuration={"delta.enableChangeDataFeed": "true"},
)

data = (
DeltaTable(tmp_path)
.load_cdf(
starting_version=0,
ending_version=0,
allow_out_of_range=False,
columns=["foo", "_change_type", "_commit_version"],
)
.read_all()
)

assert expected == data

0 comments on commit fb92039

Please sign in to comment.