From 4b1c050e2d6767db550ca9e2f3e5da627884ca99 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 15 Dec 2024 13:52:33 +0100 Subject: [PATCH 1/2] fix: raise during checkpoint creation when table is loaded with without_files --- crates/core/src/operations/transaction/mod.rs | 6 +++ crates/core/src/protocol/checkpoints.rs | 6 +++ python/tests/test_checkpoint.py | 40 +++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 85fba6cfd9..227e9ea63a 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -82,6 +82,7 @@ use futures::future::BoxFuture; use object_store::path::Path; use object_store::Error as ObjectStoreError; use serde_json::Value; +use tracing::warn; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; @@ -702,6 +703,11 @@ impl<'a> PostCommit<'a> { log_store: &LogStoreRef, version: i64, ) -> DeltaResult<()> { + if !table_state.load_config().require_files { + warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."); + return Ok(()) + } + let checkpoint_interval = table_state.config().checkpoint_interval() as i64; if ((version + 1) % checkpoint_interval) == 0 { create_checkpoint_for(version, table_state, log_store.as_ref()).await? 
diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 3419d80587..0a9a7f036f 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -145,6 +145,12 @@ pub async fn create_checkpoint_for( state: &DeltaTableState, log_store: &dyn LogStore, ) -> Result<(), ProtocolError> { + if !state.load_config().require_files { + return Err(ProtocolError::Generic( + "Table has not yet been initialized with files, therefore creating a checkpoint is not possible.".to_string() + )); + } + if version != state.version() { error!( "create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded", diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 309a1f3663..2bbc04fac5 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -9,6 +9,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.exceptions import DeltaError from deltalake.table import PostCommitHookProperties @@ -32,6 +33,45 @@ def test_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table): assert checkpoint_path.exists() +def test_checkpoint_without_files(tmp_path: pathlib.Path, sample_data: pa.Table): + tmp_table_path = tmp_path / "path" / "to" / "table" + checkpoint_path = tmp_table_path / "_delta_log" / "_last_checkpoint" + last_checkpoint_path = ( + tmp_table_path / "_delta_log" / "00000000000000000000.checkpoint.parquet" + ) + + # TODO: Include binary after fixing issue "Json error: binary type is not supported" + sample_data = sample_data.drop(["binary"]) + write_deltalake( + str(tmp_table_path), + sample_data, + configuration={"delta.checkpointInterval": "2"}, + ) + + assert not checkpoint_path.exists() + + delta_table = DeltaTable(str(tmp_table_path), without_files=True) + with pytest.raises( + DeltaError, + match="Table has not yet been initialized with files, therefore 
creating a checkpoint is not possible.", + ): + delta_table.create_checkpoint() + + for i in range(3): + write_deltalake(delta_table, sample_data, mode="append") + + assert not checkpoint_path.exists() + + delta_table = DeltaTable(str(tmp_table_path), without_files=False) + delta_table.create_checkpoint() + + assert checkpoint_path.exists() + last_checkpoint_path = ( + tmp_table_path / "_delta_log" / "00000000000000000003.checkpoint.parquet" + ) + assert last_checkpoint_path.exists() + + def setup_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): tmp_table_path = tmp_path / "path" / "to" / "table" first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json" From 13f6644376922267580602500696642dcdf21f09 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 15 Dec 2024 13:54:49 +0100 Subject: [PATCH 2/2] chore: fmt --- crates/core/src/operations/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 227e9ea63a..88b28a8627 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -705,7 +705,7 @@ impl<'a> PostCommit<'a> { ) -> DeltaResult<()> { if !table_state.load_config().require_files { warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."); - return Ok(()) + return Ok(()); } let checkpoint_interval = table_state.config().checkpoint_interval() as i64;