Skip to content

Commit

Permalink
MutTxId::update: consider deleted commited row when replacing tx row (
Browse files Browse the repository at this point in the history
#2302)

Signed-off-by: Mazdak Farrokhzad <[email protected]>
Co-authored-by: Phoebe Goldman <[email protected]>
  • Loading branch information
Centril and gefjon authored Feb 24, 2025
1 parent d92a3f5 commit 92c8e95
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 5 deletions.
80 changes: 80 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,7 @@ mod tests {
use core::{fmt, mem};
use itertools::Itertools;
use pretty_assertions::{assert_eq, assert_matches};
use spacetimedb_execution::Datastore;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::st_var::StVarValue;
Expand Down Expand Up @@ -2283,6 +2284,85 @@ mod tests {
Ok(())
}

#[test]
fn test_update_brings_back_deleted_commit_row_repro_2296() -> ResultTest<()> {
// Get us a datastore and tx.
let datastore = get_datastore()?;
let mut tx = begin_mut_tx(&datastore);

// Create the table. The minimal repro is a two column table with a unique constraint.
let table_id = TableId::SENTINEL;
let col = |pos: usize| ColumnSchema {
table_id,
col_pos: pos.into(),
col_name: format!("c{pos}").into(),
col_type: AlgebraicType::U32,
};
let table_schema = TableSchema::new(
table_id,
"Foo".into(),
[col(0), col(1)].into(),
vec![IndexSchema {
table_id,
index_id: IndexId::SENTINEL,
index_name: "index".into(),
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: 0.into() }),
}],
vec![ConstraintSchema {
table_id,
constraint_id: ConstraintId::SENTINEL,
constraint_name: "constraint".into(),
data: ConstraintData::Unique(UniqueConstraintData { columns: 0.into() }),
}],
vec![],
StTableType::User,
StAccess::Public,
None,
None,
);
let table_id = datastore.create_table_mut_tx(&mut tx, table_schema)?;
let index_id = datastore.index_id_from_name_mut_tx(&tx, "index")?.unwrap();
let find_row_by_key = |tx: &MutTxId, key: u32| {
tx.index_scan_point(table_id, index_id, &key.into())
.unwrap()
.map(|row| row.pointer())
.collect::<Vec<_>>()
};

// Insert `Foo { c0: 0, c1: 0 }`.
const KEY: u32 = 0;
const DATA: u32 = 0;
let row = &product![KEY, DATA];
let row_prime = &product![KEY, DATA + 1];
insert(&datastore, &mut tx, table_id, row)?;

// It's important for the test that the row is committed.
commit(&datastore, tx)?;

// Start a new transaction where we:
let mut tx = begin_mut_tx(&datastore);
// 1. delete the row.
let row_to_del = find_row_by_key(&tx, KEY);
datastore.delete_mut_tx(&mut tx, table_id, row_to_del.iter().copied());
// 2. insert a new row with the same key as the one we deleted but different extra field.
// We should now have a committed row `row` marked as deleted and a row `row_prime`.
// These share `KEY`.
insert(&datastore, &mut tx, table_id, row_prime)?;
// 3. update `row_prime` -> `row`.
// Because `row` exists in the committed state but was marked as deleted,
// it should be undeleted, without a new row being added to the tx state.
let (_, row_ref) = update(&datastore, &mut tx, table_id, index_id, row)?;
assert_eq!([row_ref.pointer()], &*row_to_del);
assert_eq!(row_to_del, find_row_by_key(&tx, KEY));

// Commit the transaction.
// We expect the transaction to be a noop.
let tx_data = commit(&datastore, tx)?;
assert_eq!(tx_data.inserts().count(), 0);
assert_eq!(tx_data.deletes().count(), 0);
Ok(())
}

#[test]
fn test_update_no_such_index() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table_with_indices([].into(), [].into())?;
Expand Down
52 changes: 47 additions & 5 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,8 @@ impl MutTxId {
if let (_, Some(commit_ptr)) =
unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, tx_row_ptr, tx_row_hash) }
{
// (insert_undelete)
// -----------------------------------------------------
// If `row` was already present in the committed state,
// either this is a set-semantic duplicate,
// or the row is marked as deleted, so we will undelete it
Expand Down Expand Up @@ -1547,6 +1549,8 @@ impl MutTxId {
// In the former case, the index must not have been removed in the transaction.
// As it's unlikely that an index was added in this transaction,
// we begin by checking the committed state.
let commit_blob_store = &self.committed_state_write_lock.blob_store;
let mut old_commit_del_ptr = None;
let err = 'failed_rev_ins: {
let tx_row_ptr = if tx_removed_index {
break 'failed_rev_ins IndexError::NotFound(index_id).into();
Expand All @@ -1561,7 +1565,13 @@ impl MutTxId {
.committed_state_write_lock
.get_index_by_id_with_table(table_id, index_id)
.and_then(|index| find_old_row(tx_row_ref, index).0.map(|ptr| (index, ptr)))
.filter(|(_, ptr)| !del_table.contains(*ptr))
.filter(|&(_, ptr)| {
// Was committed row previously deleted in this TX?
let deleted = del_table.contains(ptr);
// If so, remember it in case it was identical to the new row.
old_commit_del_ptr = deleted.then_some(ptr);
!deleted
})
{
// 1. Ensure the index is unique.
// 2. Ensure the new row doesn't violate any other committed state unique indices.
Expand All @@ -1581,7 +1591,6 @@ impl MutTxId {
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } {
// SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3.
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
let commit_blob_store = &self.committed_state_write_lock.blob_store;
// SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2.
let old_row_ref = unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) };
return Ok((cols_to_gen, old_row_ref, update_flags));
Expand Down Expand Up @@ -1638,7 +1647,6 @@ impl MutTxId {
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } {
// SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3.
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
let commit_blob_store = &self.committed_state_write_lock.blob_store;
// SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2.
let old_row_ref =
unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) };
Expand All @@ -1665,8 +1673,42 @@ impl MutTxId {
// SAFETY: `self.is_row_present(tx_row_ptr)` and `self.is_row_present(old_ptr)` both hold
// as we've deleted neither.
// In particular, the `write_gen_val_to_col` call does not remove the row.
unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }
.map_err(IndexError::UniqueConstraintViolation)?
let tx_row_ptr =
unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }
.map_err(IndexError::UniqueConstraintViolation)?;

if let Some(old_commit_del_ptr) = old_commit_del_ptr {
let commit_table =
commit_table.expect("previously found a row in `commit_table`, so there should be one");
// If we have an identical deleted row in the committed state,
// we need to undeleted it, just like in `Self::insert`.
// The same note (`insert_undelete`) there re. MVCC applies here as well.
//
// SAFETY:
// 1. `tx_table` is derived from `commit_table` so they have the same layouts.
// 2. `old_commit_del_ptr` was found in an index of `commit_table`.
// 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid.
if unsafe { Table::eq_row_in_page(commit_table, old_commit_del_ptr, tx_table, tx_row_ptr) }
{
// It is important that we `confirm_update` first,
// as we must ensure that undeleting the row causes no tx state conflict.
tx_table
.delete(tx_blob_store, tx_row_ptr, |_| ())
.expect("Failed to delete a row we just inserted");

// Undelete.
del_table.remove(old_commit_del_ptr);

// Return the undeleted committed state row.
// SAFETY: `commit_table.is_row_present(old_commit_del_ptr)` holds.
let old_row_ref = unsafe {
commit_table.get_row_ref_unchecked(commit_blob_store, old_commit_del_ptr)
};
return Ok((cols_to_gen, old_row_ref, update_flags));
}
}

tx_row_ptr
}
_ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", old_ptr),
}
Expand Down

2 comments on commit 92c8e95

@github-actions
Copy link

@github-actions github-actions bot commented on 92c8e95 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

@github-actions
Copy link

@github-actions github-actions bot commented on 92c8e95 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.