diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index edabaa91c0..c826e57bde 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -185,7 +185,7 @@ pub struct OptimizeBuilder<'a> { optimize_type: OptimizeType, min_commit_interval: Option, /// Indicates whether the writes should be committed - commit_writes: bool + commit_writes: bool, } impl<'a> OptimizeBuilder<'a> { @@ -273,17 +273,15 @@ impl<'a> OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Commit writes after processing - pub async fn commit_writes( - self, - commit_info: CommitContext - ) -> DeltaResult { + pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult { commit( - self.log_store.as_ref(), - &commit_info.actions, - commit_info.operation.clone().into(), - commit_info.snapshot.as_ref(), - commit_info.app_metadata.clone() - ).await?; + self.log_store.as_ref(), + &commit_info.actions, + commit_info.operation.clone().into(), + commit_info.snapshot.as_ref(), + commit_info.app_metadata.clone(), + ) + .await?; let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot); @@ -324,7 +322,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { this.max_spill_size, this.min_commit_interval, this.app_metadata, - this.commit_writes + this.commit_writes, ) .await?; @@ -657,9 +655,8 @@ impl MergePlan { max_spill_size: usize, min_commit_interval: Option, app_metadata: Option>, - commit_writes: bool + commit_writes: bool, ) -> Result<(Metrics, Option), DeltaTableError> { - let operations = std::mem::take(&mut self.operations); let stream = match operations { OptimizeOperations::Compact(bins) => futures::stream::iter(bins) @@ -794,7 +791,6 @@ impl MergePlan { table.update().await?; if commit_writes { - debug!("committing {} actions", actions.len()); //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). @@ -806,12 +802,10 @@ impl MergePlan { Some(app_metadata.clone()), ) .await?; - } else { // Save the actions buffer so the client can commit later total_actions.extend(actions) } - } if end { @@ -820,12 +814,12 @@ impl MergePlan { } let commit_context = if !commit_writes { - Some(CommitContext{ - actions: total_actions, - app_metadata, - snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once - operation: self.task_parameters.input_parameters.clone() - }) + Some(CommitContext { + actions: total_actions, + app_metadata, + snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once + operation: self.task_parameters.input_parameters.clone(), + }) } else { None }; @@ -838,7 +832,6 @@ impl MergePlan { total_metrics.files_removed.min = 0; } - Ok((total_metrics, commit_context)) } } diff --git a/crates/core/tests/command_optimize.rs b/crates/core/tests/command_optimize.rs index 1df93d2227..36b911d71b 100644 --- a/crates/core/tests/command_optimize.rs +++ b/crates/core/tests/command_optimize.rs @@ -240,7 +240,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box let version = dt.version(); assert_eq!(dt.get_files_count(), 5); - let optimize = DeltaOps(dt).optimize().with_target_size(2_000_000).with_commit_writes(false); + let optimize = DeltaOps(dt) + .optimize() + .with_target_size(2_000_000) + .with_commit_writes(false); let (dt, metrics, commit_context) = optimize.await?; // Still have same version, and file count, @@ -255,9 +258,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box let commit_info = dt.history(None).await?; assert_eq!(commit_info.len(), 6); - let dt_commit = DeltaOps(dt).optimize().commit_writes( - commit_context.unwrap() - ).await?; + let dt_commit = DeltaOps(dt) + .optimize() + .commit_writes(commit_context.unwrap()) + .await?; assert_eq!(version + 1, dt_commit.version()); @@ -309,7 +313,10 @@ async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box Result<(), Box Result<(), Box> { 20, Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added None, - true + true, ) .await?; assert_eq!(metrics.num_files_added, 2); diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 8f239066fe..4e1c9bf456 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -59,9 +59,7 @@ class RawDeltaTable: enforce_retention_duration: bool, custom_metadata: Optional[Dict[str, str]], ) -> List[str]: ... - def commit_optimize( - self - ) -> None: ... + def commit_optimize(self) -> None: ... def compact_optimize( self, partition_filters: Optional[FilterType], @@ -70,7 +68,7 @@ class RawDeltaTable: min_commit_interval: Optional[int], writer_properties: Optional[Dict[str, Optional[str]]], custom_metadata: Optional[Dict[str, str]], - commit_writes: bool + commit_writes: bool, ) -> str: ... def z_order_optimize( self, diff --git a/python/deltalake/table.py b/python/deltalake/table.py index c159bf6f41..8417db415d 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1857,9 +1857,7 @@ def __call__( return self.compact(partition_filters, target_size, max_concurrent_tasks) - def commit( - self - ) -> Dict[str, Any]: + def commit(self) -> None: self.table._table.commit_optimize() def compact( @@ -1870,7 +1868,7 @@ def compact( min_commit_interval: Optional[Union[int, timedelta]] = None, writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, - commit_writes: bool = True + commit_writes: bool = True, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -1924,7 +1922,7 @@ def compact( min_commit_interval, writer_properties._to_dict() if writer_properties else None, custom_metadata, - commit_writes + commit_writes, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/src/lib.rs b/python/src/lib.rs index 3b63dde917..7e45e625fd 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -366,14 +366,14 @@ impl RawDeltaTable { .with_max_concurrent_tasks(num_cpus::get()); if self._commit.is_none() { - return Err(PyValueError::new_err("Cannot commit optimization, nothing to commit.")) + return Err(PyValueError::new_err( + PyValueError::new_err("Cannot commit optimization, nothing to commit." + )); } - let table = rt()?.block_on( - cmd.commit_writes( - self._commit.take().unwrap() - ) - ).map_err(PythonError::from)?; + let table = rt()? + .block_on(cmd.commit_writes(self._commit.take().unwrap())) + .map_err(PythonError::from)?; self._table.state = table.state; self._commit = None; @@ -431,8 +431,9 @@ impl RawDeltaTable { cmd = cmd.with_metadata(json_metadata); }; - let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default()) - .map_err(PythonError::from)?; + let converted_filters = + convert_partition_filters(partition_filters.unwrap_or_default()) + .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); let (table, metrics, commit_info) = rt()? diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index 8419d495da..bdfb337836 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -79,6 +79,7 @@ def test_z_order_optimize( assert dt.version() == old_version + 1 assert len(dt.file_uris()) == 1 + def test_optimize_deferred_write( tmp_path: pathlib.Path, sample_data: pa.Table, @@ -104,6 +105,7 @@ def test_optimize_deferred_write( last_action = dt.history(1)[0] assert last_action["operation"] == "OPTIMIZE" + def test_optimize_min_commit_interval( tmp_path: pathlib.Path, sample_data: pa.Table,