Skip to content

Commit

Permalink
fix: reading single latest version in cdf
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 14, 2025
1 parent a8ceac9 commit c47ab43
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub enum DeltaTableError {
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: i64 },

#[error("Invalid version start version {start} is greater than version {end}")]
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CdfLoadBuilder {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}
if start >= latest_version {
if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Expand Down
36 changes: 35 additions & 1 deletion python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,10 @@ def test_read_cdf_version_out_of_range():
with pytest.raises(DeltaError) as e:
dt.load_cdf(4).read_all().to_pydict()

assert "invalid table version" in str(e).lower()
assert (
"invalid version. start version 4 is greater than end version 3"
in str(e).lower()
)


def test_read_cdf_version_out_of_range_with_flag():
Expand All @@ -714,3 +717,34 @@ def test_read_timestamp_cdf_out_of_range_with_flag():
b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all()

assert len(b) == 0


def test_read_cdf_last_version(tmp_path):
data = pa.Table.from_pydict({"foo": [1, 2, 3]})

expected = pa.Table.from_pydict(
{
"foo": [1, 2, 3],
"_change_type": ["insert", "insert", "insert"],
"_commit_version": [0, 0, 0],
}
)

write_deltalake(
tmp_path,
data=data,
configuration={"delta.enableChangeDataFeed": "true"},
)

data = (
DeltaTable(tmp_path)
.load_cdf(
starting_version=0,
ending_version=0,
allow_out_of_range=False,
columns=["foo", "_change_type", "_commit_version"],
)
.read_all()
)

assert expected == data

0 comments on commit c47ab43

Please sign in to comment.