Skip to content

Commit

Permalink
[release/v1.0.0-rc3-hotfixes]: Don't create indexes during bootstrapp…
Browse files Browse the repository at this point in the history
…ing; wait until after replay (#2161)
  • Loading branch information
gefjon authored and bfops committed Feb 4, 2025
1 parent 965ec2b commit 9135d46
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 38 deletions.
5 changes: 4 additions & 1 deletion crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "Implementation of the SpacetimeDB commitlog."
[features]
default = ["serde"]
# Enable types + impls useful for testing
test = []
test = ["dep:env_logger"]

[dependencies]
bitflags.workspace = true
Expand All @@ -25,6 +25,9 @@ spacetimedb-sats.workspace = true
tempfile.workspace = true
thiserror.workspace = true

# For the 'test' feature
env_logger = { workspace = true, optional = true }

[dev-dependencies]
env_logger.workspace = true
once_cell.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub use crate::{
pub mod error;
pub mod payload;

#[cfg(test)]
mod tests;
#[cfg(any(test, feature = "test"))]
pub mod tests;

/// [`Commitlog`] options.
#[derive(Clone, Copy, Debug)]
Expand Down
2 changes: 2 additions & 0 deletions crates/commitlog/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#[cfg(test)]
mod bitflip;
#[cfg(test)]
mod partial;

pub mod helpers;
14 changes: 14 additions & 0 deletions crates/commitlog/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ where
total_txs
}

/// Put the `txes` into `log`.
///
/// Each TX from `txes` will be placed in its own commit within `log`.
pub fn fill_log_with<R, T>(log: &mut commitlog::Generic<R, T>, txes: impl IntoIterator<Item = T>)
where
R: Repo,
T: Debug + Encode,
{
for tx in txes {
log.append(tx).unwrap();
log.commit().unwrap();
}
}

pub fn enable_logging() {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Trace)
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ test = []
[dev-dependencies]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
spacetimedb-sats = { path = "../sats", features = ["proptest"] }
spacetimedb-commitlog = { workspace = true, features = ["test"] }

criterion.workspace = true
# Also as dev-dependencies for use in _this_ crate's tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,8 @@ impl CommittedState {
.get_mut(&table_id)
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
let blob_store = &mut self.blob_store;
let skip_index_update = true;
table
.delete_equal_row(blob_store, rel, skip_index_update)
.delete_equal_row(blob_store, rel)
.map_err(TableError::Insert)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
Ok(())
Expand All @@ -334,7 +333,7 @@ impl CommittedState {
row: &ProductValue,
) -> Result<()> {
let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema);
table.insert_for_replay(blob_store, row).map_err(TableError::Insert)?;
table.insert(blob_store, row).map_err(TableError::Insert)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ impl Locking {
// The database tables are now initialized with the correct data.
// Now we have to build our in memory structures.
commit_state.build_sequence_state(&mut datastore.sequence_state.lock())?;
commit_state.build_indexes()?;
// We don't want to build indexes here; we'll build those later,
// in `rebuild_state_after_replay`.
// We actively do not want indexes to exist during replay,
// as they break replaying TX 0.

log::trace!("DATABASE:BOOTSTRAPPING SYSTEM TABLES DONE");
Ok(datastore)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ impl From<StModuleRow> for ProductValue {
/// identity | address
/// -----------------------------------------------------------------------------------------+--------------------------------------------------------
/// (__identity_bytes = 0x7452047061ea2502003412941d85a42f89b0702588b823ab55fc4f12e9ea8363) | (__address_bytes = 0x6bdea3ab517f5857dc9b1b5fe99e1b14)
#[derive(Clone, Debug, Eq, PartialEq, SpacetimeType)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct StClientRow {
pub(crate) identity: IdentityViaU256,
Expand Down
146 changes: 142 additions & 4 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,7 @@ pub mod tests_utils {

pub struct TempReplicaDir(ReplicaDir);
impl TempReplicaDir {
fn new() -> io::Result<Self> {
pub fn new() -> io::Result<Self> {
let dir = TempDir::with_prefix("stdb_test")?;
Ok(Self(ReplicaDir::from_path_unchecked(dir.into_path())))
}
Expand Down Expand Up @@ -1388,6 +1388,28 @@ pub mod tests_utils {
})
}

/// Create a [`TestDB`] which stores data in a local commitlog,
/// initialized with pre-existing data from `history`.
///
/// [`TestHistory::from_txes`] is an easy-ish way to construct a non-empty [`History`].
///
/// `expected_num_clients` is the expected size of the `connected_clients` return
/// from [`RelationalDB::open`] after replaying `history`.
/// Opening with an empty history, or one that does not insert into `st_client`,
/// should result in this number being 0.
pub fn in_memory_with_history(
history: impl durability::History<TxData = Txdata>,
expected_num_clients: usize,
) -> Result<Self, DBError> {
let dir = TempReplicaDir::new()?;
let db = Self::open_db(&dir, history, None, None, expected_num_clients)?;
Ok(Self {
db,
durable: None,
tmp_dir: dir,
})
}

/// Re-open the database, after ensuring that all data has been flushed
/// to disk (if the database was created via [`Self::durable`]).
pub fn reopen(self) -> Result<Self, DBError> {
Expand Down Expand Up @@ -1468,7 +1490,7 @@ pub mod tests_utils {
}

fn in_memory_internal(root: &ReplicaDir) -> Result<RelationalDB, DBError> {
Self::open_db(root, EmptyHistory::new(), None, None)
Self::open_db(root, EmptyHistory::new(), None, None, 0)
}

fn durable_internal(
Expand All @@ -1479,7 +1501,7 @@ pub mod tests_utils {
let history = local.clone();
let durability = local.clone() as Arc<dyn Durability<TxData = Txdata>>;
let snapshot_repo = open_snapshot_repo(root.snapshots(), Identity::ZERO, 0)?;
let db = Self::open_db(root, history, Some((durability, disk_size_fn)), Some(snapshot_repo))?;
let db = Self::open_db(root, history, Some((durability, disk_size_fn)), Some(snapshot_repo), 0)?;

Ok((db, local))
}
Expand All @@ -1489,6 +1511,7 @@ pub mod tests_utils {
history: impl durability::History<TxData = Txdata>,
durability: Option<(Arc<dyn Durability<TxData = Txdata>>, DiskSizeFn)>,
snapshot_repo: Option<Arc<SnapshotRepository>>,
expected_num_clients: usize,
) -> Result<RelationalDB, DBError> {
let (db, connected_clients) = RelationalDB::open(
root,
Expand All @@ -1498,7 +1521,7 @@ pub mod tests_utils {
durability,
snapshot_repo,
)?;
debug_assert!(connected_clients.is_empty());
assert_eq!(connected_clients.len(), expected_num_clients);
let db = db.with_row_count(Self::row_count_fn());
db.with_auto_commit(Workload::Internal, |tx| {
db.set_initialized(tx, HostType::Wasm, Program::empty())
Expand Down Expand Up @@ -1530,6 +1553,43 @@ pub mod tests_utils {
let gen_cols = row_ref.project(&gen_cols).unwrap();
Ok((gen_cols, row_ref))
}

/// An in-memory commitlog used for tests that want to replay a known history.
pub struct TestHistory(commitlog::commitlog::Generic<commitlog::repo::Memory, Txdata>);

impl durability::History for TestHistory {
type TxData = Txdata;
fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
where
D: commitlog::Decoder,
D::Error: From<commitlog::error::Traversal>,
{
self.0.fold_transactions_from(offset, decoder)
}
fn transactions_from<'a, D>(
&self,
offset: TxOffset,
decoder: &'a D,
) -> impl Iterator<Item = Result<commitlog::Transaction<Self::TxData>, D::Error>>
where
D: commitlog::Decoder<Record = Self::TxData>,
D::Error: From<commitlog::error::Traversal>,
Self::TxData: 'a,
{
self.0.transactions_from(offset, decoder)
}
fn max_tx_offset(&self) -> Option<TxOffset> {
self.0.max_committed_offset()
}
}

impl TestHistory {
pub fn from_txes(txes: impl IntoIterator<Item = Txdata>) -> Self {
let mut log = commitlog::tests::helpers::mem_log::<Txdata>(32);
commitlog::tests::helpers::fill_log_with(&mut log, txes);
Self(log)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1563,6 +1623,7 @@ mod tests {
use spacetimedb_schema::schema::RowLevelSecuritySchema;
use spacetimedb_table::read_column::ReadColumn;
use spacetimedb_table::table::RowRef;
use tests::tests_utils::TestHistory;

fn my_table(col_type: AlgebraicType) -> TableSchema {
table("MyTable", ProductType::from([("my_col", col_type)]), |builder| builder)
Expand Down Expand Up @@ -2429,4 +2490,81 @@ mod tests {
assert_eq!(reducer_timestamp, timestamp);
}
}

/// This tests that we are able to correctly replay mutations to system tables,
/// in this case specifically `st_client`.
///
/// [SpacetimeDB PR #2161](https://github.com/clockworklabs/SpacetimeDB/pull/2161)
/// fixed a bug where replaying deletes to `st_client` would fail due to an unpopulated index.
#[test]
fn replay_delete_from_st_client() {
use crate::db::datastore::system_tables::{StClientRow, ST_CLIENT_ID};

let row_0 = StClientRow {
identity: Identity::ZERO.into(),
address: Address::ZERO.into(),
};
let row_1 = StClientRow {
identity: Identity::ZERO.into(),
address: Address::from_u128(1).into(),
};

let history = TestHistory::from_txes([
// TX 0: insert row 0
Txdata {
inputs: None,
outputs: None,
mutations: Some(txdata::Mutations {
inserts: Box::new([txdata::Ops {
table_id: ST_CLIENT_ID,
rowdata: Arc::new([row_0.into()]),
}]),
deletes: Box::new([]),
truncates: Box::new([]),
}),
},
// TX 1: delete row 0
Txdata {
inputs: None,
outputs: None,
mutations: Some(txdata::Mutations {
inserts: Box::new([]),
deletes: Box::new([txdata::Ops {
table_id: ST_CLIENT_ID,
rowdata: Arc::new([row_0.into()]),
}]),
truncates: Box::new([]),
}),
},
// TX 2: insert row 1
Txdata {
inputs: None,
outputs: None,
mutations: Some(txdata::Mutations {
inserts: Box::new([txdata::Ops {
table_id: ST_CLIENT_ID,
rowdata: Arc::new([row_1.into()]),
}]),
deletes: Box::new([]),
truncates: Box::new([]),
}),
},
]);

// We expect 1 client, since we left `row_1` in there.
let stdb = TestDB::in_memory_with_history(history, /* expected_num_clients: */ 1).unwrap();

let read_tx = stdb.begin_tx(Workload::ForTests);

// Read all of st_client, assert that there's only one row, and that said row is `row_1`.
let present_rows: Vec<StClientRow> = stdb
.iter(&read_tx, ST_CLIENT_ID)
.unwrap()
.map(|row_ref| row_ref.try_into().unwrap())
.collect();
assert_eq!(present_rows.len(), 1);
assert_eq!(present_rows[0], row_1);

stdb.release_tx(read_tx);
}
}
28 changes: 2 additions & 26 deletions crates/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,26 +280,6 @@ impl Table {
Ok((hash, row_ref))
}

/// Insert a `row` into this table during replay.
///
/// NOTE: This method skips index updating. Use `insert` to insert a row with index updating.
pub fn insert_for_replay(
&mut self,
blob_store: &mut dyn BlobStore,
row: &ProductValue,
) -> Result<(Option<RowHash>, RowPointer), InsertError> {
// Insert the `row`. There should be no errors
let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?;
let row_ptr = row_ref.pointer();

// SAFETY: We just inserted the row, so `self.is_row_present(row_ptr)` holds.
let row_hash = unsafe { self.insert_into_pointer_map(blob_store, row_ptr) }?;

self.update_statistics_added_row(blob_bytes);

Ok((row_hash, row_ptr))
}

/// Physically inserts `row` into the page
/// without inserting it logically into the pointer map.
///
Expand Down Expand Up @@ -871,7 +851,6 @@ impl Table {
&mut self,
blob_store: &mut dyn BlobStore,
row: &ProductValue,
skip_index_update: bool,
) -> Result<Option<RowPointer>, InsertError> {
// Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row.
// This must avoid consulting and inserting to the pointer map,
Expand All @@ -894,12 +873,9 @@ impl Table {

// If an equal row was present, delete it.
if let Some(existing_row_ptr) = existing_row_ptr {
let blob_bytes_deleted = if skip_index_update {
// SAFETY: `find_same_row` ensures that the pointer is valid.
unsafe { self.delete_internal(blob_store, existing_row_ptr) }
} else {
let blob_bytes_deleted = unsafe {
// SAFETY: `find_same_row` ensures that the pointer is valid.
unsafe { self.delete_unchecked(blob_store, existing_row_ptr) }
self.delete_unchecked(blob_store, existing_row_ptr)
};
self.update_statistics_deleted_row(blob_bytes_deleted);
}
Expand Down

0 comments on commit 9135d46

Please sign in to comment.