From 64d0bb5055942c14d80aa1cacadfd9c247e8541a Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Mon, 6 Jan 2025 19:46:22 +0100
Subject: [PATCH] test: integration tests lakefs

---
 .github/workflows/python_build.yml      |   3 +-
 crates/core/src/protocol/checkpoints.rs |  10 ++-
 python/tests/test_lakefs.py             | 101 +++++++++++++++++-------
 3 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index a76cc1f472..00f3f8c56f 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -77,7 +77,7 @@ jobs:
         run: make setup-dat
 
       - name: Run tests
-        run: uv run pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules
+        run: uv run pytest -m '((s3 or azure or lakefs) and integration) or not integration and not benchmark' --doctest-modules
 
       - name: Test without pandas
         run: |
@@ -109,6 +109,7 @@ jobs:
       - name: Run tests
         run: make test-pyspark
 
+
   multi-python-running:
     name: Running with Python ${{ matrix.python-version }}
     runs-on: ubuntu-latest
diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs
index 0cb8bbca80..f85127b772 100644
--- a/crates/core/src/protocol/checkpoints.rs
+++ b/crates/core/src/protocol/checkpoints.rs
@@ -87,7 +87,10 @@ impl From<CheckpointError> for ProtocolError {
 pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;
 
 /// Creates checkpoint at current table version
-pub async fn create_checkpoint(table: &DeltaTable, operation_id: Option<Uuid>) -> Result<(), ProtocolError> {
+pub async fn create_checkpoint(
+    table: &DeltaTable,
+    operation_id: Option<Uuid>,
+) -> Result<(), ProtocolError> {
     create_checkpoint_for(
         table.version(),
         table.snapshot().map_err(|_| ProtocolError::NoMetaData)?,
@@ -100,7 +103,10 @@ pub async fn create_checkpoint(table: &DeltaTable, operation_id: Option<Uuid>) -
 
 /// Delete expires log files before given version from table. The table log retention is based on
 /// the `logRetentionDuration` property of the Delta Table, 30 days by default.
-pub async fn cleanup_metadata(table: &DeltaTable, operation_id: Option<Uuid>) -> Result<usize, ProtocolError> {
+pub async fn cleanup_metadata(
+    table: &DeltaTable,
+    operation_id: Option<Uuid>,
+) -> Result<usize, ProtocolError> {
     let log_retention_timestamp = Utc::now().timestamp_millis()
         - table
             .snapshot()
diff --git a/python/tests/test_lakefs.py b/python/tests/test_lakefs.py
index b1d33ed960..19678b9e8c 100644
--- a/python/tests/test_lakefs.py
+++ b/python/tests/test_lakefs.py
@@ -1,6 +1,7 @@
 import os
 import uuid
 from datetime import timedelta
+from typing import TYPE_CHECKING
 
 import pyarrow as pa
 import pytest
@@ -12,6 +13,18 @@
 from deltalake.writer import write_deltalake
 from tests.test_alter import _sort_fields
 
+if TYPE_CHECKING:
+    import lakefs as lakefs
+
+
+@pytest.fixture
+def lakefs_client():
+    import lakefs
+
+    return lakefs.Client(
+        username="LAKEFSID", password="LAKEFSKEY", host="http://127.0.0.1:8000"
+    )
+
 
 @pytest.fixture
 def lakefs_path() -> str:
@@ -28,6 +41,7 @@ def lakefs_storage_options():
     }
 
 
+@pytest.mark.lakefs
 def test_create(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options):
     dt = DeltaTable.create(
         lakefs_path,
@@ -75,6 +89,7 @@ def test_create(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options)
     assert last_action["operation"] == "CREATE OR REPLACE TABLE"
 
 
+@pytest.mark.lakefs
 def test_delete(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options):
     write_deltalake(
         lakefs_path,
@@ -98,8 +113,7 @@ def test_delete(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options)
     assert len(dt.files()) == 0
 
 
-# TODO: figure out how to run multiple commit operations :S
-@pytest.mark.skip
+@pytest.mark.lakefs
 def test_optimize_min_commit_interval(
     lakefs_path: str, sample_data: pa.Table, lakefs_storage_options
 ):
@@ -139,8 +153,8 @@ def test_optimize_min_commit_interval(
     assert dt.version() == old_version + 5
 
 
+@pytest.mark.lakefs
 def test_optimize(lakefs_path: str, sample_data: pa.Table, lakefs_storage_options):
-    print(lakefs_path)
     write_deltalake(
         lakefs_path,
         sample_data,
@@ -176,9 +190,12 @@ def test_optimize(lakefs_path: str, sample_data: pa.Table, lakefs_storage_option
     assert dt.version() == old_version + 1
 
 
-# TODO: delete file from LakeFS and commit deletion
-@pytest.mark.skip
-def test_repair_with_dry_run(lakefs_path, sample_data, lakefs_storage_options):
+@pytest.mark.lakefs
+def test_repair_wo_dry_run(
+    lakefs_path, sample_data, lakefs_storage_options, lakefs_client: "lakefs.Client"
+):
+    import lakefs
+
     write_deltalake(
         lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
     )
@@ -186,27 +203,12 @@ def test_repair_with_dry_run(lakefs_path, sample_data, lakefs_storage_options):
         lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
     )
     dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
-    os.remove(dt.file_uris()[0])
-
-    metrics = dt.repair(dry_run=True)
-    last_action = dt.history(1)[0]
-
-    assert len(metrics["files_removed"]) == 1
-    assert metrics["dry_run"] is True
-    assert last_action["operation"] == "WRITE"
-
-
-# TODO: delete file from LakeFS and commit deletion
-@pytest.mark.skip
-def test_repair_wo_dry_run(lakefs_path, sample_data, lakefs_storage_options):
-    write_deltalake(
-        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
-    )
-    write_deltalake(
-        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
-    )
-    dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
-    os.remove(dt.file_uris()[0])
+
+    branch = lakefs.Branch(
+        repository_id="bronze", branch_id="main", client=lakefs_client
+    )
+    branch.object(dt.file_uris()[0].replace("lakefs://bronze/main/", "")).delete()
+    branch.commit("remove commit file for test")
 
     commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
     metrics = dt.repair(dry_run=False, commit_properties=commit_properties)
@@ -218,6 +220,7 @@ def test_repair_wo_dry_run(lakefs_path, sample_data, lakefs_storage_options):
     assert last_action["userName"] == "John Doe"
 
 
+@pytest.mark.lakefs
 def test_add_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
     write_deltalake(lakefs_path, sample_table, storage_options=lakefs_storage_options)
 
@@ -253,6 +256,7 @@ def test_add_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_opti
     )
 
 
+@pytest.mark.lakefs
def test_drop_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
     write_deltalake(lakefs_path, sample_table, storage_options=lakefs_storage_options)
 
@@ -265,6 +269,7 @@ def test_drop_constraint(lakefs_path, sample_table: pa.Table, lakefs_storage_opt
     assert dt.version() == 2
 
 
+@pytest.mark.lakefs
 def test_set_table_properties(
     lakefs_path, sample_table: pa.Table, lakefs_storage_options
 ):
@@ -284,6 +289,7 @@ def test_set_table_properties(
     assert protocol.min_writer_version == 4
 
 
+@pytest.mark.lakefs
 def test_add_feautres(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
     write_deltalake(lakefs_path, sample_table, storage_options=lakefs_storage_options)
     dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
@@ -309,6 +315,7 @@ def test_add_feautres(lakefs_path, sample_table: pa.Table, lakefs_storage_option
     )  # type: ignore
 
 
+@pytest.mark.lakefs
 def test_merge(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
     write_deltalake(
         lakefs_path, sample_table, mode="append", storage_options=lakefs_storage_options
     )
@@ -349,6 +356,7 @@ def test_merge(lakefs_path, sample_table: pa.Table, lakefs_storage_options):
     assert result == expected
 
 
+@pytest.mark.lakefs
 def test_restore(
     lakefs_path,
     sample_data: pa.Table,
@@ -374,6 +382,7 @@ def test_restore(
     assert dt.version() == old_version + 1
 
 
+@pytest.mark.lakefs
 def test_add_column(lakefs_path, sample_data: pa.Table, lakefs_storage_options):
     write_deltalake(
         lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
@@ -393,6 +402,7 @@ def test_add_column(lakefs_path, sample_data: pa.Table, lakefs_storage_options):
         [*current_fields, *new_fields_to_add]
     )
 
+
 @pytest.fixture()
 def sample_table_update():
     nrows = 5
@@ -407,9 +417,14 @@ def sample_table_update():
         }
     )
 
+
+@pytest.mark.lakefs
 def test_update(lakefs_path, sample_table_update: pa.Table, lakefs_storage_options):
     write_deltalake(
-        lakefs_path, sample_table_update, mode="append", storage_options=lakefs_storage_options
+        lakefs_path,
+        sample_table_update,
+        mode="append",
+        storage_options=lakefs_storage_options,
     )
 
     dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)
@@ -437,4 +452,36 @@ def test_update(lakefs_path, sample_table_update: pa.Table, lakefs_storage_optio
     assert last_action["operation"] == "UPDATE"
     assert last_action["userName"] == "John Doe"
 
-    assert result == expected
\ No newline at end of file
+    assert result == expected
+
+
+@pytest.mark.lakefs
+def test_checkpoint(sample_data: pa.Table, lakefs_storage_options, lakefs_client):
+    import lakefs
+
+    table = str(uuid.uuid4())
+    tmp_table_path = os.path.join("lakefs://bronze/main", table)
+    last_checkpoint_path = os.path.join(table, "_delta_log", "_last_checkpoint")
+    checkpoint_path = os.path.join(
+        table, "_delta_log", "00000000000000000000.checkpoint.parquet"
+    )
+
+    branch = lakefs.Branch(
+        repository_id="bronze", branch_id="main", client=lakefs_client
+    )
+
+    # TODO: Include binary after fixing issue "Json error: binary type is not supported"
+    sample_data = sample_data.drop(["binary"])
+    write_deltalake(
+        str(tmp_table_path), sample_data, storage_options=lakefs_storage_options
+    )
+
+    assert not branch.object(last_checkpoint_path).exists()
+
+    delta_table = DeltaTable(
+        str(tmp_table_path), storage_options=lakefs_storage_options
+    )
+    delta_table.create_checkpoint()
+
+    assert branch.object(checkpoint_path).exists()
+    assert branch.object(last_checkpoint_path).exists()
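
Note on running the new suite locally: the fixtures above assume a lakeFS server
at http://127.0.0.1:8000 with credentials LAKEFSID/LAKEFSKEY and a "bronze"
repository whose default branch is "main" (tables are addressed as
lakefs://bronze/main/<uuid>). A minimal bootstrap sketch using the same `lakefs`
SDK the tests import is below; the repository-creation call and the local://
storage namespace are illustrative assumptions, not part of this patch:

    # Sketch: prepare the lakeFS repository the tests above expect.
    # Assumes a lakeFS server is already running on 127.0.0.1:8000
    # with the LAKEFSID/LAKEFSKEY credentials used by the fixtures.
    import lakefs

    client = lakefs.Client(
        username="LAKEFSID", password="LAKEFSKEY", host="http://127.0.0.1:8000"
    )
    # Tables are written to lakefs://bronze/main/<uuid>, so create the
    # "bronze" repository (default branch "main") if it does not exist yet.
    lakefs.Repository(repository_id="bronze", client=client).create(
        storage_namespace="local://bronze", exist_ok=True
    )

Once the repository exists, `uv run pytest -m lakefs python/tests/test_lakefs.py`
selects exactly the tests marked here, mirroring the marker expression added to
python_build.yml.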