From 91fdf22ab4d718774feb6502cc1dd0ada6f83e69 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 11 Dec 2024 13:22:40 -0800 Subject: [PATCH 1/4] feat(rust): execute_uncommitted for merge_insert expose in python refactor: make transaction marshalling easier cleanup fix tests fix path backward compatibility fix repr get changes back prototype start to define behavior with tests --- python/python/lance/fragment.pyi | 59 ++++ rust/lance-table/src/format/index.rs | 2 +- rust/lance/src/dataset/transaction.rs | 6 +- rust/lance/src/io/commit.rs | 1 + .../src/io/commit/conflict_resolution.rs | 310 ++++++++++++++++++ rust/lance/src/utils/test.rs | 2 +- 6 files changed, 375 insertions(+), 5 deletions(-) create mode 100644 python/python/lance/fragment.pyi create mode 100644 rust/lance/src/io/commit/conflict_resolution.rs diff --git a/python/python/lance/fragment.pyi b/python/python/lance/fragment.pyi new file mode 100644 index 0000000000..d3285e90b4 --- /dev/null +++ b/python/python/lance/fragment.pyi @@ -0,0 +1,59 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +from typing import Literal, Optional + +class DeletionFile: + """ + Metadata for a deletion file. + + The deletion file contains the row ids that are tombstoned. + + Attributes + ---------- + read_version : int + The read version of the deletion file. + id : int + A unique identifier for the deletion file, used to prevent collisions. + num_deleted_rows : int + The number of rows that are deleted. + file_type : str + The type of deletion file. "array" is used for small sets, while + "bitmap" is used for large sets. + """ + + read_version: int + id: int + num_deleted_rows: int + file_type: Literal["array", "bitmap"] + + def __init__( + read_version: int, + id: int, + file_type: Literal["array", "bitmap"], + num_deleted_rows: int, + ): ... + def asdict(self) -> dict: + """Get a dictionary representation of the deletion file.""" + ... 
+ def path(self, fragment_id: int, base_uri: Optional[str] = None) -> str: + """ + Get the path to the deletion file. + + Parameters + ---------- + fragment_id : int + The fragment id. + base_uri : str, optional + The base URI to use for the path. If not provided, a relative path + is returned. + + Returns + ------- + str + The path to the deletion file. + """ + ... + +class RowIdMeta: + pass diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index a2a48c3613..0cf1c6cebc 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -12,7 +12,7 @@ use super::pb; use lance_core::{Error, Result}; /// Index metadata -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Index { /// Unique ID across all dataset versions. pub uuid: Uuid, diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index e390ecddcc..4990fee887 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -96,7 +96,7 @@ pub enum BlobsOperation { } /// An operation on a dataset. -#[derive(Debug, Clone, DeepSizeOf)] +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub enum Operation { /// Adding new fragments to the dataset. The fragments contained within /// haven't yet been assigned a final ID. 
@@ -171,7 +171,7 @@ pub enum Operation { }, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct RewrittenIndex { pub old_id: Uuid, pub new_id: Uuid, @@ -183,7 +183,7 @@ impl DeepSizeOf for RewrittenIndex { } } -#[derive(Debug, Clone, DeepSizeOf)] +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub struct RewriteGroup { pub old_fragments: Vec, pub new_fragments: Vec, diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index e8e77e4b41..19ba11b24d 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -50,6 +50,7 @@ use crate::index::DatasetIndexInternalExt; use crate::session::Session; use crate::Dataset; +mod conflict_resolution; #[cfg(all(feature = "dynamodb", test))] mod dynamodb; #[cfg(test)] diff --git a/rust/lance/src/io/commit/conflict_resolution.rs b/rust/lance/src/io/commit/conflict_resolution.rs new file mode 100644 index 0000000000..ad76b728fa --- /dev/null +++ b/rust/lance/src/io/commit/conflict_resolution.rs @@ -0,0 +1,310 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use futures::future::BoxFuture; +use lance_core::utils::mask::RowIdMask; +use lance_io::object_store::ObjectStore; +use lance_table::format::Fragment; + +use crate::{io::commit::Transaction, session::Session, Dataset, Result}; + +async fn resolve_conflicts( + transaction: Transaction, + dataset: &Dataset, +) -> Result<(Transaction, Option>>)> { + // Maybe I should grab them in here? + // TODO: return cleanup task too? + // TODO: nice errors differentiate retry-able and non-retry-able conflicts + // TODO: get diff on deletions + todo!() +} + +/// Identify which rows have been deleted or moved by the transaction. 
+async fn build_diff( + transaction: &Transaction, + old_fragments: &[Fragment], + object_store: &ObjectStore, + session: &Session, +) -> Result { + todo!() +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use arrow_array::{BooleanArray, Int64Array, RecordBatch, RecordBatchIterator}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use lance_core::Error; + use lance_io::object_store::ObjectStoreParams; + use lance_table::io::{commit::RenameCommitHandler, deletion::read_deletion_file}; + use url::Url; + + use crate::{ + dataset::{ + transaction::Operation, InsertBuilder, MergeInsertBuilder, MergeStats, WriteParams, + }, + utils::test::{IoStats, IoTrackingStore}, + }; + + use super::*; + + struct IOTrackingDatasetFixture { + pub dataset: Arc, + pub io_stats: Arc>, + pub store: Arc, + } + + impl IOTrackingDatasetFixture { + pub async fn new(data: Vec) -> Self { + let store = Arc::new(object_store::memory::InMemory::new()); + let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + let store_params = ObjectStoreParams { + object_store_wrapper: Some(io_stats_wrapper), + object_store: Some((store.clone(), Url::parse("memory://test").unwrap())), + ..Default::default() + }; + + let dataset = InsertBuilder::new("memory://test") + .with_params(&WriteParams { + store_params: Some(store_params.clone()), + commit_handler: Some(Arc::new(RenameCommitHandler)), + ..Default::default() + }) + .execute(data) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + Self { + dataset, + io_stats, + store, + } + } + + pub fn reset_stats(&self) { + let mut io_stats = self.io_stats.lock().unwrap(); + io_stats.read_bytes = 0; + io_stats.write_bytes = 0; + io_stats.read_iops = 0; + io_stats.write_iops = 0; + } + + pub fn get_new_stats(&self) -> IoStats { + let stats = self.io_stats.lock().unwrap().clone(); + self.reset_stats(); + stats + } + } + + #[tokio::test] + async fn test_resolve_conflicts_noop() { + todo!("Test that append, and other 
non-conflicting ones just return the same thing") + } + + struct UpsertFixture { + pub io_fixture: IOTrackingDatasetFixture, + pub schema: Arc, + } + + impl UpsertFixture { + pub async fn new() -> Self { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("updated", DataType::Boolean, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(BooleanArray::from(vec![false, false, false])), + ], + ) + .unwrap(); + let fixture = IOTrackingDatasetFixture::new(vec![batch]).await; + + // do one upsert transaction + let slf = Self { + io_fixture: fixture, + schema, + }; + slf.do_upsert(vec![2]).await; + slf + } + + pub fn upsert_data(&self, ids: Vec) -> RecordBatch { + let nrows = ids.len(); + RecordBatch::try_new( + self.schema.clone(), + vec![ + Arc::new(Int64Array::from(ids)), + Arc::new(BooleanArray::from(vec![true; nrows])), + ], + ) + .unwrap() + } + + pub async fn do_upsert(&self, ids: Vec) -> MergeStats { + let batch = self.upsert_data(ids); + let reader = RecordBatchIterator::new(vec![Ok(batch)], self.schema.clone()); + MergeInsertBuilder::try_new(self.io_fixture.dataset.clone(), vec!["id".into()]) + .unwrap() + .try_build() + .unwrap() + .execute_reader(reader) + .await + .unwrap() + .1 + } + } + + #[tokio::test] + async fn test_resolve_upsert() { + let fixture = UpsertFixture::new().await; + let dataset = fixture.io_fixture.dataset.clone(); + + // check we get Ok() if we upsert a different row from original read version + let unique_rows = fixture.upsert_data(vec![3]); + let reader = RecordBatchIterator::new(vec![Ok(unique_rows)], fixture.schema.clone()); + let old_dataset = Arc::new(dataset.checkout_version(1).await.unwrap()); + let (transaction, stats) = MergeInsertBuilder::try_new(old_dataset, vec!["id".into()]) + .unwrap() + .try_build() + .unwrap() + .execute_uncommitted(reader) + .await + .unwrap(); +
assert_eq!(stats.num_updated_rows, 1); + assert_eq!(stats.num_inserted_rows, 0); + fixture.io_fixture.reset_stats(); + let (new_transaction, cleanup_task) = resolve_conflicts(transaction.clone(), &dataset) + .await + .unwrap(); + let io_stats = fixture.io_fixture.get_new_stats(); + // We should have everything in the session cache + assert_eq!(io_stats.read_bytes, 0); + assert_eq!(io_stats.read_iops, 0); + // We needed to write a new deletion file + assert_eq!(io_stats.write_iops, 1); + + // Transaction should be updated + // UUID should be re-used + assert_eq!(transaction.uuid, new_transaction.uuid); + assert_eq!(transaction.read_version, new_transaction.read_version); + + fn extract_updated_frags(op: &Operation) -> Vec { + match op { + Operation::Update { + updated_fragments, .. + } => updated_fragments.clone(), + _ => panic!("Expected update operation"), + } + } + let updated_frags = extract_updated_frags(&transaction.operation); + let new_updated_frags = extract_updated_frags(&new_transaction.operation); + assert_ne!(updated_frags, new_updated_frags); + assert_eq!(updated_frags.len(), 1); + assert_eq!(new_updated_frags.len(), 1); + // Data files should be unmodified + assert_eq!(updated_frags[0].files, new_updated_frags[0].files); + // Deletion file should be different + assert_ne!( + updated_frags[0].deletion_file, + new_updated_frags[0].deletion_file + ); + + // Only one should still exist. 
+ let deletion_vector = + read_deletion_file(&dataset.base, &updated_frags[0], dataset.object_store()) + .await + .unwrap() + .unwrap(); + assert_eq!( + deletion_vector.to_sorted_iter().collect::>(), + vec![2] + ); + + // Should have merged with existing deletion vector + let deletion_vector = + read_deletion_file(&dataset.base, &new_updated_frags[0], dataset.object_store()) + .await + .unwrap() + .unwrap(); + assert_eq!( + deletion_vector.to_sorted_iter().collect::>(), + vec![1, 2] + ); + + cleanup_task.unwrap().await.unwrap(); + let res = + read_deletion_file(&dataset.base, &new_updated_frags[0], dataset.object_store()).await; + assert!(matches!(res, Err(Error::NotFound { .. })), "{:?}", res); + } + + #[tokio::test] + async fn test_upsert_no_conflict() { + let fixture = UpsertFixture::new().await; + let dataset = fixture.io_fixture.dataset.clone(); + + // Check we get Ok() if we upsert a new row + let new_row = fixture.upsert_data(vec![4]); + let reader = RecordBatchIterator::new(vec![Ok(new_row)], fixture.schema.clone()); + let old_dataset = Arc::new(dataset.checkout_version(1).await.unwrap()); + let (transaction, stats) = MergeInsertBuilder::try_new(old_dataset, vec!["id".into()]) + .unwrap() + .try_build() + .unwrap() + .execute_uncommitted(reader) + .await + .unwrap(); + assert_eq!(stats.num_updated_rows, 0); + assert_eq!(stats.num_inserted_rows, 1); + fixture.io_fixture.reset_stats(); + let (new_transaction, cleanup_task) = resolve_conflicts(transaction.clone(), &dataset) + .await + .unwrap(); + let io_stats = fixture.io_fixture.get_new_stats(); + // We should have everything in the session cache + assert_eq!(io_stats.read_bytes, 0); + assert_eq!(io_stats.read_iops, 0); + // We didn't need to change any files because there are no conflicts + assert_eq!(io_stats.write_iops, 0); + + // Transaction should be left the same + assert_eq!(transaction.uuid, new_transaction.uuid); + assert_eq!(transaction.read_version, new_transaction.read_version); + 
assert_eq!(transaction.operation, new_transaction.operation); + + assert!(cleanup_task.is_none()); + } + + #[tokio::test] + async fn test_upsert_retry_error() { + let fixture = UpsertFixture::new().await; + let dataset = fixture.io_fixture.dataset.clone(); + + // We should get a retryable conflict error if we try to upsert the same + // row. + let unique_rows = fixture.upsert_data(vec![2]); + let reader = RecordBatchIterator::new(vec![Ok(unique_rows)], fixture.schema.clone()); + let old_dataset = Arc::new(dataset.checkout_version(1).await.unwrap()); + let (transaction, stats) = MergeInsertBuilder::try_new(old_dataset, vec!["id".into()]) + .unwrap() + .try_build() + .unwrap() + .execute_uncommitted(reader) + .await + .unwrap(); + assert_eq!(stats.num_updated_rows, 1); + assert_eq!(stats.num_inserted_rows, 0); + fixture.io_fixture.reset_stats(); + let err = resolve_conflicts(transaction.clone(), &dataset).await; + + if let Err(err) = err { + assert!(matches!(err, Error::CommitConflict { .. 
}), "{}", err); + } else { + panic!("Expected error"); + } + } +} diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 5f7ef481ff..a46b6d0673 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -267,7 +267,7 @@ fn field_structure(fragment: &Fragment) -> Vec> { .collect::>() } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct IoStats { pub read_iops: u64, pub read_bytes: u64, From fb56d237a9bee95cf70fb81d2c938dfc18c3f145 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 15 Jan 2025 14:54:49 -0800 Subject: [PATCH 2/4] cleanup --- python/python/lance/fragment.pyi | 59 -------------------------------- 1 file changed, 59 deletions(-) delete mode 100644 python/python/lance/fragment.pyi diff --git a/python/python/lance/fragment.pyi b/python/python/lance/fragment.pyi deleted file mode 100644 index d3285e90b4..0000000000 --- a/python/python/lance/fragment.pyi +++ /dev/null @@ -1,59 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright The Lance Authors - -from typing import Literal, Optional - -class DeletionFile: - """ - Metadata for a deletion file. - - The deletion file contains the row ids that are tombstoned. - - Attributes - ---------- - read_version : int - The read version of the deletion file. - id : int - A unique identifier for the deletion file, used to prevent collisions. - num_deleted_rows : int - The number of rows that are deleted. - file_type : str - The type of deletion file. "array" is used for small sets, while - "bitmap" is used for large sets. - """ - - read_version: int - id: int - num_deleted_rows: int - file_type: Literal["array", "bitmap"] - - def __init__( - read_version: int, - id: int, - file_type: Literal["array", "bitmap"], - num_deleted_rows: int, - ): ... - def asdict(self) -> dict: - """Get a dictionary representation of the deletion file.""" - ... 
- def path(self, fragment_id: int, base_uri: Optional[str] = None) -> str: - """ - Get the path to the deletion file. - - Parameters - ---------- - fragment_id : int - The fragment id. - base_uri : str, optional - The base URI to use for the path. If not provided, a relative path - is returned. - - Returns - ------- - str - The path to the deletion file. - """ - ... - -class RowIdMeta: - pass From c6d200d372581c92230d6dd7a773374e229f8414 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 15 Jan 2025 15:04:58 -0800 Subject: [PATCH 3/4] wip --- rust/lance/src/io/commit/conflict_resolution.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rust/lance/src/io/commit/conflict_resolution.rs b/rust/lance/src/io/commit/conflict_resolution.rs index ad76b728fa..2cd0e7e0d3 100644 --- a/rust/lance/src/io/commit/conflict_resolution.rs +++ b/rust/lance/src/io/commit/conflict_resolution.rs @@ -12,6 +12,14 @@ async fn resolve_conflicts( transaction: Transaction, dataset: &Dataset, ) -> Result<(Transaction, Option>>)> { + // Assume dataset is already in latest version. + let start = transaction.read_version + 1; + let end = dataset.manifest().version; + + let original_dataset = dataset.checkout_version(transaction.read_version).await?; + let old_fragments = original_dataset.fragments().as_slice(); + for version in start..=end {} + // Maybe I should grab them in here? // TODO: return cleanup task too? 
// TODO: nice errors differentiate retry-able and non-retry-able conflicts From 5b51935263b67e64c12e1dd17435af17e21548d1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 16 Jan 2025 13:47:26 -0800 Subject: [PATCH 4/4] wip: refactor conflicts_with --- rust/lance/src/dataset/transaction.rs | 352 ++++++++++++------ .../src/io/commit/conflict_resolution.rs | 19 +- 2 files changed, 253 insertions(+), 118 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 4990fee887..f5ff865577 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -353,123 +353,6 @@ impl Transaction { } } - /// Returns true if the transaction cannot be committed if the other - /// transaction is committed first. - pub fn conflicts_with(&self, other: &Self) -> bool { - // This assumes IsolationLevel is Snapshot Isolation, which is more - // permissive than Serializable. In particular, it allows a Delete - // transaction to succeed after a concurrent Append, even if the Append - // added rows that would be deleted. - match &self.operation { - Operation::Append { .. } => match &other.operation { - // Append is compatible with anything that doesn't change the schema - Operation::Append { .. } => false, - Operation::Rewrite { .. } => false, - Operation::CreateIndex { .. } => false, - Operation::Delete { .. } | Operation::Update { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Project { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Rewrite { .. } => match &other.operation { - // Rewrite is only compatible with operations that don't touch - // existing fragments. - // TODO: it could also be compatible with operations that update - // fragments we don't touch. - Operation::Append { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Update { .. 
} => { - // As long as they rewrite disjoint fragments they shouldn't conflict. - self.operation.modifies_same_ids(&other.operation) - } - Operation::Project { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - // Restore always succeeds - Operation::Restore { .. } => false, - // ReserveFragments is compatible with anything that doesn't reset the - // max fragment id. - Operation::ReserveFragments { .. } => matches!( - &other.operation, - Operation::Overwrite { .. } | Operation::Restore { .. } - ), - Operation::CreateIndex { .. } => match &other.operation { - Operation::Append { .. } => false, - // Indices are identified by UUIDs, so they shouldn't conflict. - Operation::CreateIndex { .. } => false, - // Although some of the rows we indexed may have been deleted / moved, - // row ids are still valid, so we allow this optimistically. - Operation::Delete { .. } | Operation::Update { .. } => false, - // Merge & reserve don't change row ids, so this should be fine. - Operation::Merge { .. } => false, - Operation::ReserveFragments { .. } => false, - // Rewrite likely changed many of the row ids, so our index is - // likely useless. It should be rebuilt. - // TODO: we could be smarter here and only invalidate the index - // if the rewrite changed more than X% of row ids. - Operation::Rewrite { .. } => true, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { - Operation::CreateIndex { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Update { .. } => { - // If we update the same fragments, we conflict. - self.operation.modifies_same_ids(&other.operation) - } - Operation::Project { .. } => false, - Operation::Append { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Overwrite { .. 
} => match &other.operation { - // Overwrite only conflicts with another operation modifying the same update config - Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { - self.operation.upsert_key_conflict(&other.operation) - } - _ => false, - }, - Operation::UpdateConfig { - schema_metadata, - field_metadata, - .. - } => match &other.operation { - Operation::Overwrite { .. } => { - // Updates to schema metadata or field metadata conflict with any kind - // of overwrite. - if schema_metadata.is_some() || field_metadata.is_some() { - true - } else { - self.operation.upsert_key_conflict(&other.operation) - } - } - Operation::UpdateConfig { .. } => { - self.operation.upsert_key_conflict(&other.operation) - | self.operation.modifies_same_metadata(&other.operation) - } - _ => false, - }, - // Merge changes the schema, but preserves row ids, so the only operations - // it's compatible with is CreateIndex, ReserveFragments, SetMetadata and DeleteMetadata. - Operation::Merge { .. } => !matches!( - &other.operation, - Operation::CreateIndex { .. } - | Operation::ReserveFragments { .. } - | Operation::UpdateConfig { .. } - ), - Operation::Project { .. } => match &other.operation { - // Project is compatible with anything that doesn't change the schema - Operation::CreateIndex { .. } => false, - Operation::Overwrite { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - } - } - fn fragments_with_ids<'a, T>( new_fragments: T, fragment_id: &'a mut u64, @@ -999,6 +882,241 @@ impl Transaction { } } +struct TransactionError { + candidate: Transaction, + other: Transaction, +} + +enum ConflictError { + /// Another transaction was committed concurrently that makes this operation invalid. + /// + /// For example, if when attempting to update a table, another transaction has + /// committed an overwrite, then the update is invalid. 
+ Irreconcilable(TransactionError), + /// Another transaction was committed concurrently that makes this transaction + /// invalid, but the operation is still valid. Retry the operation. + /// + /// For example, if when attempting to update a table, another transaction has + /// compacted the same fragments the update was modifying, this error will + /// occur. The rows being updated likely still exist; they just need to be + /// re-processed. + Retryable(TransactionError), + /// Another transaction was committed concurrently that *may* make this + /// transaction invalid. Additional checks need to be performed to determine + /// if the operation is valid. Call `resolve_conflicts`. + Potential(TransactionError), +} + +impl Transaction { + /// Returns `Ok(())` if this transaction can still be committed after the other + /// transaction is committed first, or the kind of conflict otherwise. + pub fn conflicts_with(&self, other: &Self) -> std::result::Result<(), ConflictError> { + // This assumes IsolationLevel is Snapshot Isolation, which is more + // permissive than Serializable. In particular, it allows a Delete + // transaction to succeed after a concurrent Append, even if the Append + // added rows that would be deleted. + match &self.operation { + Operation::Append { .. } => match &other.operation { + // Append is compatible with anything that doesn't change the schema + Operation::Append { .. } + | Operation::Rewrite { .. } + | Operation::CreateIndex { .. } + | Operation::Delete { .. } + | Operation::Update { .. } + | Operation::ReserveFragments { .. } + | Operation::Project { .. } + | Operation::UpdateConfig { .. } => Ok(()), + // Otherwise, if the schema is changed, it is irreconcilable. + // TODO: since we support inserting subschemas, we could make + // merge compatible. + _ => Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + }, + Operation::Rewrite { .. 
} => match &other.operation { + // Rewrite is only compatible with operations that don't touch + // existing fragments. + // TODO: it could also be compatible with operations that update + // fragments we don't touch. + Operation::Append { .. } + | Operation::ReserveFragments { .. } + | Operation::Project { .. } + | Operation::UpdateConfig { .. } => Ok(()), + Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Update { .. } => { + // As long as they rewrite disjoint fragments they shouldn't conflict. + if self.operation.modifies_same_ids(&other.operation) { + Err(ConflictError::Retryable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } else { + Ok(()) + } + } + _ => Err(ConflictError::Retryable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + }, + // Restore always succeeds + Operation::Restore { .. } => Ok(()), + // ReserveFragments is compatible with anything that doesn't reset the + // max fragment id. + Operation::ReserveFragments { .. } => match &other.operation { + Operation::Overwrite { .. } => { + Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } + Operation::Restore { .. } => Err(ConflictError::Retryable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + _ => Ok(()), + }, + Operation::CreateIndex { new_indices, .. } => match &other.operation { + Operation::Append { .. } => Ok(()), + // Indices are identified by UUIDs, so they shouldn't conflict. + Operation::CreateIndex { + new_indices: other_new_indices, + .. + } => { + // If fragment bitmaps overlap for same index name, we conflict. + todo!() + } + // Although some of the rows we indexed may have been deleted / moved, + // row ids are still valid, so we allow this optimistically. + Operation::Delete { .. } | Operation::Update { .. } => Ok(()), + // Merge & reserve don't change row ids, so this should be fine. + Operation::Merge { .. 
} => Ok(()), + Operation::ReserveFragments { .. } => Ok(()), + // Rewrite likely changed many of the row ids, so our index is + // likely useless. It should be rebuilt. + // TODO: we could be smarter here and only invalidate the index + // if the rewrite changed more than X% of row ids. + Operation::Rewrite { .. } => Err(ConflictError::Retryable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + Operation::UpdateConfig { .. } => Ok(()), + _ => Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + }, + Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { + Operation::CreateIndex { .. } + | Operation::ReserveFragments { .. } + | Operation::Project { .. } + | Operation::Append { .. } + | Operation::UpdateConfig { .. } => Ok(()), + Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Update { .. } => { + // If we update the same fragments, we conflict. + if self.operation.modifies_same_ids(&other.operation) { + todo!("we need to differentiate potential and definite conflict here"); + Err(ConflictError::Potential(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } else { + Ok(()) + } + } + _ => Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + }, + Operation::Overwrite { .. } => match &other.operation { + // Overwrite only conflicts with another operation modifying the same update config + Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { + if self.operation.upsert_key_conflict(&other.operation) { + Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } else { + Ok(()) + } + } + _ => Ok(()), + }, + Operation::UpdateConfig { + schema_metadata, + field_metadata, + .. + } => match &other.operation { + Operation::Overwrite { .. 
} => { + // Updates to schema metadata or field metadata conflict with any kind + // of overwrite. + if schema_metadata.is_some() + || field_metadata.is_some() + || self.operation.upsert_key_conflict(&other.operation) + { + Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } else { + Ok(()) + } + } + Operation::UpdateConfig { .. } => { + if self.operation.upsert_key_conflict(&other.operation) + || self.operation.modifies_same_metadata(&other.operation) + { + Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } else { + Ok(()) + } + } + _ => Ok(()), + }, + // Merge changes the schema, but preserves row ids, so the only operations + // it's compatible with is CreateIndex, ReserveFragments, SetMetadata and DeleteMetadata. + Operation::Merge { .. } => match &other.operation { + Operation::CreateIndex { .. } + | Operation::ReserveFragments { .. } + | Operation::UpdateConfig { .. } => Ok(()), + _ => Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + }, + Operation::Project { .. } => match &other.operation { + // Project is compatible with anything that doesn't change the schema + Operation::CreateIndex { .. } + | Operation::Append { .. } + | Operation::Delete { .. } + | Operation::Update { .. } => Ok(()), + Operation::UpdateConfig { + schema_metadata, + field_metadata, + .. 
+ } => { + if schema_metadata.is_some() || field_metadata.is_some() { + Err(ConflictError::Retryable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })) + } else { + Ok(()) + } + } + _ => Err(ConflictError::Irreconcilable(TransactionError { + candidate: self.clone(), + other: other.clone(), + })), + }, + } + } +} + impl TryFrom for Transaction { type Error = Error; diff --git a/rust/lance/src/io/commit/conflict_resolution.rs b/rust/lance/src/io/commit/conflict_resolution.rs index 2cd0e7e0d3..563de17ba8 100644 --- a/rust/lance/src/io/commit/conflict_resolution.rs +++ b/rust/lance/src/io/commit/conflict_resolution.rs @@ -18,7 +18,24 @@ async fn resolve_conflicts( let original_dataset = dataset.checkout_version(transaction.read_version).await?; let old_fragments = original_dataset.fragments().as_slice(); - for version in start..=end {} + let mut possible_conflicts: Vec<(u64, Transaction)> = Vec::new(); + for version in start..=end { + // Load each transaction + + // For each transaction, there are four possible outcomes: + // 1. Irreconcilable conflict. Example: overwrite happened. + // 2. No conflict. Example: append happened. (can ignore) + // 3. Retry-able conflict. Example: upsert deleted relevant fragment + // 4. Possible conflict. Example: upsert modified relevant fragment + + // TODO: modify .conflicts_with() to differentiate these four outcomes. + } + + // If there are no conflicts, we can just return the transaction as-is. + + // Possible conflict: + // * Either needs a rewrite + // * Or becomes retry-able conflict // Maybe I should grab them in here? // TODO: return cleanup task too?