Skip to content

Commit

Permalink
[release/v1.0.0-rc3-hotfixes]: Fix unique index + MutTxId::insert n…
Browse files Browse the repository at this point in the history
…ot un-deleteting (#2156)
  • Loading branch information
Centril authored and bfops committed Feb 4, 2025
1 parent 9135d46 commit fa16c3a
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 43 deletions.
90 changes: 90 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,15 @@ mod tests {
}
}

// TODO(centril): find-replace all occurrences of body.
/// Test helper: begins a mutable transaction on `datastore`
/// with `Serializable` isolation and the `ForTests` workload.
fn begin_mut_tx(datastore: &Locking) -> MutTxId {
    datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests)
}

/// Test helper: commits `tx` on `datastore` and unwraps the resulting
/// [`TxData`], panicking if the commit produced none.
fn commit(datastore: &Locking, tx: MutTxId) -> ResultTest<TxData> {
    let tx_data = datastore
        .commit_mut_tx(tx)?
        .expect("commit should produce `TxData`");
    Ok(tx_data)
}

#[rustfmt::skip]
fn basic_table_schema_cols() -> [ColRow<'static>; 3] {
let table = FIRST_NON_SYSTEM_ID;
Expand Down Expand Up @@ -2061,6 +2070,87 @@ mod tests {
Ok(())
}

#[test]
/// Regression test for issue #2134: with a unique index present,
/// a delete-then-reinsert of a committed row must "un-delete" the row
/// (leaving an empty `TxData`) rather than re-inserting it into the tx state.
fn test_regression_2134() -> ResultTest<()> {
    // Get us a datastore and tx.
    let datastore = get_datastore()?;
    let mut tx = begin_mut_tx(&datastore);

    // Create the table. The minimal repro is a one column table with a unique constraint.
    let table_id = TableId::SENTINEL;
    let table_schema = TableSchema::new(
        table_id,
        "Foo".into(),
        vec![ColumnSchema {
            table_id,
            col_pos: 0.into(),
            col_name: "id".into(),
            col_type: AlgebraicType::I32,
        }],
        vec![IndexSchema {
            table_id,
            index_id: IndexId::SENTINEL,
            index_name: "btree".into(),
            index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: 0.into() }),
        }],
        vec![ConstraintSchema {
            table_id,
            constraint_id: ConstraintId::SENTINEL,
            constraint_name: "constraint".into(),
            data: ConstraintData::Unique(UniqueConstraintData {
                columns: col_list![0].into(),
            }),
        }],
        vec![],
        StTableType::User,
        StAccess::Public,
        None,
        None,
    );
    let table_id = datastore.create_table_mut_tx(&mut tx, table_schema)?;
    commit(&datastore, tx)?;

    // A "reducer" that deletes and then inserts the same row.
    let row = &product![1];
    let update = |datastore| -> ResultTest<_> {
        let mut tx = begin_mut_tx(datastore);
        // Delete the row.
        let deleted = tx.delete_by_row_value(table_id, row)?;
        // Insert it again.
        insert(datastore, &mut tx, table_id, row)?;
        let tx_data = commit(datastore, tx)?;
        Ok((deleted, tx_data))
    };

    // In two separate transactions, we update a row to itself.
    // What should happen is that the row is added to the committed state the first time,
    // as the delete does nothing.
    //
    // The second time however,
    // the delete should first mark the committed row as deleted in the delete tables,
    // and then it should remove it from the delete tables upon insertion,
    // rather than actually inserting it in the tx state.
    // So the second transaction should be observationally a no-op.
    // There was a bug in the datastore that did not respect this in the presence of a unique index.
    let (deleted_1, tx_data_1) = update(&datastore)?;
    let (deleted_2, tx_data_2) = update(&datastore)?;

    // In the first tx, the row is not deleted, but it is inserted, so we end up with the row committed.
    assert!(!deleted_1);
    assert_eq!(tx_data_1.deletes().count(), 0);
    assert_eq!(tx_data_1.inserts().collect_vec(), [(&table_id, &[row.clone()].into())]);

    // In the second tx, the row is deleted from the commit state,
    // by marking it in the delete tables.
    // Then, when inserting, it is un-deleted by un-marking.
    // This sequence results in an empty tx-data.
    assert!(deleted_2);
    assert_eq!(tx_data_2.deletes().count(), 0);
    assert_eq!(tx_data_2.inserts().collect_vec(), []);

    Ok(())
}

#[test]
/// Test that two read-only TXes can operate concurrently without deadlock or blocking,
/// and that both observe correct results for a simple table scan.
Expand Down
14 changes: 3 additions & 11 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,15 +1287,9 @@ impl MutTxId {
// - `commit_table` and `tx_table` use the same schema
// because `tx_table` is derived from `commit_table`.
// - `tx_row_ptr` is correct per (PC.INS.1).
if let (_, Some(commit_ptr)) = unsafe {
Table::find_same_row_via_pointer_map(
commit_table,
tx_table,
tx_blob_store,
tx_row_ptr,
tx_row_hash,
)
} {
if let (_, Some(commit_ptr)) =
unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, tx_row_ptr, tx_row_hash) }
{
// If `row` was already present in the committed state,
// either this is a set-semantic duplicate,
// or the row is marked as deleted, so we will undelete it
Expand Down Expand Up @@ -1464,8 +1458,6 @@ impl MutTxId {
})
});

debug_assert_ne!(to_delete, Some(ptr));

// Remove the temporary entry from the insert tables.
// Do this before actually deleting to drop the borrows on the tables.
// SAFETY: `ptr` is valid because we just inserted it and haven't deleted it since.
Expand Down
84 changes: 54 additions & 30 deletions crates/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,24 +530,31 @@ impl Table {
///
/// # Safety
///
/// `needle_table.is_row_present(needle_ptr)` must hold.
/// - `target_table` and `needle_table` must have the same `row_layout`.
/// - `needle_table.is_row_present(needle_ptr)` must hold.
unsafe fn find_same_row_via_unique_index(
    target_table: &Table,
    needle_table: &Table,
    needle_bs: &dyn BlobStore,
    needle_ptr: RowPointer,
) -> Option<RowPointer> {
    // Pick the first unique index in iteration order; any unique index suffices
    // to narrow the candidates for this key down to 0 or 1 rows.
    let (cols, target_index) = target_table
        .indexes
        .iter()
        .find(|(_, idx)| idx.is_unique())
        .expect("there should be at least one unique index");
    // Project the needle row to the columns of the index, and then seek.
    // As this is a unique index, there are 0-1 rows for this key.
    let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
    let key = needle_row.project(&cols).expect("needle row should be valid");
    // The index only covers `cols`, so a key hit does not imply the whole
    // row matches: confirm full-row equality before reporting the pointer.
    target_index.seek(&key).next().filter(|&target_ptr| {
        // SAFETY:
        // - Caller promised that the row layouts were the same.
        // - We know `target_ptr` exists, as it was in `target_index`, belonging to `target_table`.
        // - Caller promised that `needle_ptr` is valid for `needle_table`.
        unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
    })
}

/// Insert the row identified by `ptr` into the table's [`PointerMap`],
Expand Down Expand Up @@ -645,36 +652,52 @@ impl Table {
});

// Scan all the row pointers with `row_hash` in the `target_table`.
let row_ptr = target_table
.pointers_for(row_hash)
.iter()
.copied()
.find(|committed_ptr| {
let (committed_page, committed_offset) = target_table.inner.page_and_offset(*committed_ptr);
let (tx_page, tx_offset) = needle_table.inner.page_and_offset(needle_ptr);

// SAFETY:
// Our invariants mean `tx_ptr` is valid, so `tx_page` and `tx_offset` are both valid.
// `committed_ptr` is in `committed_table.pointer_map`,
// so it must be valid and therefore `committed_page` and `committed_offset` are valid.
// Our invariants mean `committed_table.row_layout` applies to both tables.
// Moreover was `committed_table.inner.static_layout`
// derived from `committed_table.row_layout`.
unsafe {
eq_row_in_page(
committed_page,
tx_page,
committed_offset,
tx_offset,
&target_table.inner.row_layout,
target_table.static_layout(),
)
}
});
let row_ptr = target_table.pointers_for(row_hash).iter().copied().find(|&target_ptr| {
// SAFETY:
// - Caller promised that the row layouts were the same.
// - We know `target_ptr` exists, as it was found in a pointer map.
// - Caller promised that `needle_ptr` is valid for `needle_table`.
unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
});

(row_hash, row_ptr)
}

/// Returns whether the row `target_ptr` in `target_table`
/// is exactly equal to the row `needle_ptr` in `needle_table`.
///
/// # Safety
///
/// - `target_table` and `needle_table` must have the same `row_layout`.
/// - `target_table.is_row_present(target_ptr)`.
/// - `needle_table.is_row_present(needle_ptr)`.
unsafe fn eq_row_in_page(
    target_table: &Table,
    target_ptr: RowPointer,
    needle_table: &Table,
    needle_ptr: RowPointer,
) -> bool {
    // Resolve each pointer to its backing page and byte offset.
    let (target_page, target_offset) = target_table.inner.page_and_offset(target_ptr);
    let (needle_page, needle_offset) = needle_table.inner.page_and_offset(needle_ptr);

    // SAFETY:
    // - Caller promised that `target_ptr` is valid, so `target_page` and `target_offset` are both valid.
    // - Caller promised that `needle_ptr` is valid, so `needle_page` and `needle_offset` are both valid.
    // - Caller promised that the layouts of `target_table` and `needle_table` are the same,
    //   so `target_table` applies to both.
    //   Moreover `(x: Table).inner.static_layout` is always derived from `x.row_layout`.
    unsafe {
        eq_row_in_page(
            target_page,
            needle_page,
            target_offset,
            needle_offset,
            &target_table.inner.row_layout,
            target_table.static_layout(),
        )
    }
}

/// Searches `target_table` for a row equal to `needle_table[needle_ptr]`,
/// and returns the [`RowPointer`] to that row in `target_table`, if it exists.
///
Expand Down Expand Up @@ -703,6 +726,7 @@ impl Table {
} else {
(
row_hash,
// SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
// SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
unsafe { Self::find_same_row_via_unique_index(target_table, needle_table, needle_bs, needle_ptr) },
)
Expand Down
4 changes: 2 additions & 2 deletions smoketests/tests/auto_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class AddTableAutoMigration(Smoketest):
pub struct Book {
isbn: String,
}
#[spacetimedb::reducer]
pub fn add_book(ctx: &ReducerContext, isbn: String) {
ctx.db.book().insert(Book { isbn });
Expand Down Expand Up @@ -111,8 +111,8 @@ def test_add_table_auto_migration(self):
"""\
sql
------------------------
"SELECT * FROM book"
"SELECT * FROM person"
"SELECT * FROM book"
""",
)

Expand Down

0 comments on commit fa16c3a

Please sign in to comment.