Commit

test: integration tests lakefs
ion-elgreco committed Jan 6, 2025
1 parent 523d86f commit 64d0bb5
Showing 3 changed files with 84 additions and 30 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/python_build.yml
@@ -77,7 +77,7 @@ jobs:
run: make setup-dat

- name: Run tests
-run: uv run pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules
+run: uv run pytest -m '((s3 or azure or lakefs) and integration) or not integration and not benchmark' --doctest-modules

- name: Test without pandas
run: |
@@ -109,6 +109,7 @@ jobs:
- name: Run tests
run: make test-pyspark


multi-python-running:
name: Running with Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
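Note on the workflow change: the marker expression now also selects lakeFS tests, so they run in the same CI job as the S3 and Azure integration suites. A test is picked up only if it carries both the lakefs and integration markers. A minimal sketch of such a test follows (the test name and body are hypothetical; this assumes the integration marker is applied the same way as in the existing s3/azure suites, e.g. per-test or module-wide via pytestmark):

import pytest

# Hypothetical example: selected by
# '((s3 or azure or lakefs) and integration) or not integration and not benchmark'
@pytest.mark.lakefs
@pytest.mark.integration
def test_lakefs_smoke():
    assert True  # placeholder body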
10 changes: 8 additions & 2 deletions crates/core/src/protocol/checkpoints.rs
@@ -87,7 +87,10 @@ impl From<Utf8Error> for ProtocolError {
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

/// Creates checkpoint at current table version
-pub async fn create_checkpoint(table: &DeltaTable, operation_id: Option<Uuid>) -> Result<(), ProtocolError> {
+pub async fn create_checkpoint(
+    table: &DeltaTable,
+    operation_id: Option<Uuid>,
+) -> Result<(), ProtocolError> {
create_checkpoint_for(
table.version(),
table.snapshot().map_err(|_| ProtocolError::NoMetaData)?,
@@ -100,7 +103,10 @@ pub async fn create_checkpoint(table: &DeltaTable, operation_id: Option<Uuid>) -

/// Delete expired log files before the given version from the table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
-pub async fn cleanup_metadata(table: &DeltaTable, operation_id: Option<Uuid>) -> Result<usize, ProtocolError> {
+pub async fn cleanup_metadata(
+    table: &DeltaTable,
+    operation_id: Option<Uuid>,
+) -> Result<usize, ProtocolError> {
let log_retention_timestamp = Utc::now().timestamp_millis()
- table
.snapshot()
101 changes: 74 additions & 27 deletions python/tests/test_lakefs.py
@@ -1,6 +1,7 @@
import os
import uuid
from datetime import timedelta
+from typing import TYPE_CHECKING

import pyarrow as pa
import pytest
@@ -12,6 +13,18 @@
from deltalake.writer import write_deltalake
from tests.test_alter import _sort_fields

+if TYPE_CHECKING:
+    import lakefs as lakefs
+
+
+@pytest.fixture
+def lakefs_client():
+    import lakefs
+
+    return lakefs.Client(
+        username="LAKEFSID", password="LAKEFSKEY", host="http://127.0.0.1:8000"
+    )


@pytest.fixture
def lakefs_path() -> str:
@@ -28,6 +41,7 @@ def lakefs_storage_options():
}


+@pytest.mark.lakefs
def test_create(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options):
dt = DeltaTable.create(
lakefs_path,
@@ -75,6 +89,7 @@ def test_create(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options)
assert last_action["operation"] == "CREATE OR REPLACE TABLE"


+@pytest.mark.lakefs
def test_delete(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options):
write_deltalake(
lakefs_path,
@@ -98,8 +113,7 @@ def test_delete(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options)
assert len(dt.files()) == 0


-# TODO: figure out how to run multiple commit operations :S
-@pytest.mark.skip
+@pytest.mark.lakefs
def test_optimize_min_commit_interval(
lakefs_path: str, sample_data: pa.Table, lakefs_storage_options
):
@@ -139,8 +153,8 @@ def test_optimize_min_commit_interval(
assert dt.version() == old_version + 5


+@pytest.mark.lakefs
def test_optimize(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options):
-    print(lakefs_path)
write_deltalake(
lakefs_path,
sample_data,
@@ -176,37 +190,25 @@ def test_optimize(lakefs_path: str, sample_data: pa.Table, lakefs_storage_option
assert dt.version() == old_version + 1


-# TODO: delete file from LakeFS and commit deletion
-@pytest.mark.skip
-def test_repair_with_dry_run(lakefs_path, sample_data, lakefs_storage_options):
+@pytest.mark.lakefs
+def test_repair_wo_dry_run(
+    lakefs_path, sample_data, lakefs_storage_options, lakefs_client: "lakefs.Client"
+):
+    import lakefs
+
    write_deltalake(
        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
    )
-    write_deltalake(
-        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
-    )
-    dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
-    os.remove(dt.file_uris()[0])
-
-    metrics = dt.repair(dry_run=True)
-    last_action = dt.history(1)[0]
-
-    assert len(metrics["files_removed"]) == 1
-    assert metrics["dry_run"] is True
-    assert last_action["operation"] == "WRITE"
-
-
-# TODO: delete file from LakeFS and commit deletion
-@pytest.mark.skip
-def test_repair_wo_dry_run(lakefs_path, sample_data, lakefs_storage_options):
-    write_deltalake(
-        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
+    branch = lakefs.Branch(
+        repository_id="bronze", branch_id="main", client=lakefs_client
    )
    write_deltalake(
        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
    )
    dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
-    os.remove(dt.file_uris()[0])
+    branch.object(dt.file_uris()[0].replace("lakefs://bronze/main/", "")).delete()
+    branch.commit("remove commit file for test")

commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
metrics = dt.repair(dry_run=False, commit_properties=commit_properties)
@@ -218,6 +220,7 @@ def test_repair_wo_dry_run(lakefs_path, sample_data, lakefs_storage_options):
assert last_action["userName"] == "John Doe"


+@pytest.mark.lakefs
def test_add_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
write_deltalake(lakefs_path, sample_table, storage_options=lakefs_storage_options)

@@ -253,6 +256,7 @@ def test_add_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_opti
)


+@pytest.mark.lakefs
def test_drop_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
write_deltalake(lakefs_path, sample_table, storage_options=lakefs_storage_options)

@@ -265,6 +269,7 @@ def test_drop_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_opt
assert dt.version() == 2


+@pytest.mark.lakefs
def test_set_table_properties(
lakefs_path, sample_table: pa.Table, lakefs_storage_options
):
@@ -284,6 +289,7 @@ def test_set_table_properties(
assert protocol.min_writer_version == 4


+@pytest.mark.lakefs
def test_add_feautres(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
write_deltalake(lakefs_path, sample_table, storage_options=lakefs_storage_options)
dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
@@ -309,6 +315,7 @@ def test_add_feautres(lakefs_path, sample_table: pa.Table, lakefs_storage_option
) # type: ignore


+@pytest.mark.lakefs
def test_merge(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
write_deltalake(
lakefs_path, sample_table, mode="append", storage_options=lakefs_storage_options
@@ -349,6 +356,7 @@ def test_merge(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
assert result == expected


+@pytest.mark.lakefs
def test_restore(
lakefs_path,
sample_data: pa.Table,
@@ -374,6 +382,7 @@ def test_restore(
assert dt.version() == old_version + 1


+@pytest.mark.lakefs
def test_add_column(lakefs_path, sample_data: pa.Table, lakefs_storage_options):
write_deltalake(
lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
@@ -393,6 +402,7 @@ def test_add_column(lakefs_path, sample_data: pa.Table, lakefs_storage_options):
[*current_fields, *new_fields_to_add]
)


@pytest.fixture()
def sample_table_update():
nrows = 5
@@ -407,9 +417,14 @@ def sample_table_update():
}
)


+@pytest.mark.lakefs
def test_update(lakefs_path, sample_table_update: pa.Table, lakefs_storage_options):
write_deltalake(
-        lakefs_path, sample_table_update, mode="append", storage_options=lakefs_storage_options
+        lakefs_path,
+        sample_table_update,
+        mode="append",
+        storage_options=lakefs_storage_options,
)

dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
@@ -437,4 +452,36 @@ def test_update(lakefs_path, sample_table_update: pa.Table, lakefs_storage_optio

assert last_action["operation"] == "UPDATE"
assert last_action["userName"] == "John Doe"
-    assert result == expected
\ No newline at end of file
+    assert result == expected
+
+
+@pytest.mark.lakefs
+def test_checkpoint(sample_data: pa.Table, lakefs_storage_options, lakefs_client):
+    import lakefs
+
+    table = str(uuid.uuid4())
+    tmp_table_path = os.path.join("lakefs://bronze/main", table)
+    checkpoint_path = os.path.join(table, "_delta_log", "_last_checkpoint")
+    last_checkpoint_path = os.path.join(
+        table, "_delta_log", "00000000000000000000.checkpoint.parquet"
+    )
+
+    branch = lakefs.Branch(
+        repository_id="bronze", branch_id="main", client=lakefs_client
+    )
+
+    # TODO: Include binary after fixing issue "Json error: binary type is not supported"
+    sample_data = sample_data.drop(["binary"])
+    write_deltalake(
+        str(tmp_table_path), sample_data, storage_options=lakefs_storage_options
+    )
+
+    assert not branch.object(checkpoint_path).exists()
+
+    delta_table = DeltaTable(
+        str(tmp_table_path), storage_options=lakefs_storage_options
+    )
+    delta_table.create_checkpoint()
+
+    assert branch.object(last_checkpoint_path).exists()
+    assert branch.object(checkpoint_path).exists()
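
Note on the repair test: with lakeFS, table files are objects in a repository rather than local paths, so os.remove cannot delete them. A deletion goes through the lakeFS API and only becomes visible once committed, which is why test_repair_wo_dry_run now uses branch.object(...).delete() followed by branch.commit(...). A minimal sketch of that pattern, using only calls that appear in the tests above (repository "bronze" and branch "main" come from the fixtures; the helper name is hypothetical):

import lakefs

def delete_and_commit(client: "lakefs.Client", file_uri: str) -> None:
    # Strip the lakefs://<repo>/<branch>/ prefix to get the object key.
    branch = lakefs.Branch(repository_id="bronze", branch_id="main", client=client)
    key = file_uri.replace("lakefs://bronze/main/", "")
    branch.object(key).delete()
    # lakeFS deletions are staged until committed; readers see them only after this.
    branch.commit("remove file for test")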
