Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(snapshots): store BlockInfo in Resharding(CatchingUp) status #12651

Merged
merged 7 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ pub trait Database: Sync + Send {

/// If this is a test database, return a copy of the entire database.
/// Otherwise return None.
fn copy_if_test(&self) -> Option<Arc<dyn Database>> {
fn copy_if_test(&self, _columns_to_keep: Option<&[DBCol]>) -> Option<Arc<dyn Database>> {
None
}
}
Expand Down
7 changes: 6 additions & 1 deletion core/store/src/db/testdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,16 @@ impl Database for TestDB {
Ok(())
}

fn copy_if_test(&self) -> Option<Arc<dyn Database>> {
fn copy_if_test(&self, columns_to_keep: Option<&[DBCol]>) -> Option<Arc<dyn Database>> {
let mut copy = Self::default();
{
let mut db = copy.db.write().unwrap();
for (col, map) in self.db.read().unwrap().iter() {
if let Some(keep) = columns_to_keep {
if !keep.contains(&col) {
continue;
}
}
let new_col = &mut db[col];
for (key, value) in map.iter() {
new_col.insert(key.clone(), value.clone());
Expand Down
55 changes: 28 additions & 27 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::flat::{
BlockInfo, FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus,
POISONED_LOCK_ERR,
};
use crate::{DBCol, StoreAdapter};
use crate::{DBCol, Store, StoreAdapter};
use near_primitives::block_header::BlockHeader;
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
Expand Down Expand Up @@ -40,6 +40,21 @@ pub struct FlatStorageManagerInner {
want_snapshot: Mutex<Option<BlockHeight>>,
}

/// Builds the `BlockInfo` (hash, previous hash, height) for the block `hash`
/// from the header stored in the `DBCol::BlockHeader` column of `store`.
///
/// # Errors
///
/// Returns `StorageError::StorageInconsistentState` when reading the column
/// fails, or when no header is stored under `hash`.
fn read_block_info(store: &Store, hash: &CryptoHash) -> Result<BlockInfo, StorageError> {
    let lookup = store.get_ser::<BlockHeader>(DBCol::BlockHeader, hash.as_ref());
    let block_header = match lookup {
        Ok(Some(found)) => found,
        // The read itself succeeded, but nothing is stored under this hash.
        Ok(None) => {
            return Err(StorageError::StorageInconsistentState(format!(
                "block header {} not found",
                hash
            )));
        }
        // The underlying read failed; report the cause in the message.
        Err(read_err) => {
            return Err(StorageError::StorageInconsistentState(format!(
                "could not read block header {}: {:?}",
                hash, read_err
            )));
        }
    };
    let block_hash = *block_header.hash();
    let prev_hash = *block_header.prev_hash();
    let height = block_header.height();
    Ok(BlockInfo { hash: block_hash, prev_hash, height })
}

impl FlatStorageManager {
pub fn new(store: FlatStoreAdapter) -> Self {
Self(Arc::new(FlatStorageManagerInner {
Expand Down Expand Up @@ -99,30 +114,15 @@ impl FlatStorageManager {
Ok(())
}

/// Reads the header of block `hash` from the `DBCol::BlockHeader` column of
/// this manager's own store and converts it into a `BlockInfo`.
///
/// # Errors
///
/// Returns `StorageError::StorageInconsistentState` if the header cannot be
/// read or is not present under `hash`.
fn read_block_info(&self, hash: &CryptoHash) -> Result<BlockInfo, StorageError> {
    let backing_store = self.0.store.store_ref();
    let maybe_header = backing_store
        .get_ser::<BlockHeader>(DBCol::BlockHeader, hash.as_ref())
        .map_err(|e| {
            StorageError::StorageInconsistentState(format!(
                "could not read block header {}: {:?}",
                hash, e
            ))
        })?;
    match maybe_header {
        Some(header) => Ok(BlockInfo {
            hash: *header.hash(),
            prev_hash: *header.prev_hash(),
            height: header.height(),
        }),
        // A missing header is reported as an inconsistency rather than a panic.
        None => Err(StorageError::StorageInconsistentState(format!(
            "block header {} not found",
            hash
        ))),
    }
}

/// Sets the status to `Ready` if it's currently `Resharding(CatchingUp)`
fn mark_flat_storage_ready(&self, shard_uid: ShardUId) -> Result<(), StorageError> {
/// At the moment, this function is only called on flat storages in snapshots,
/// so the `store` argument should be the main store that contains all data
/// so that we can look up any relevant block headers.
fn mark_flat_storage_ready(
&self,
store: &Store,
shard_uid: ShardUId,
) -> Result<(), StorageError> {
// Don't use Self::get_flat_storage_status() because there's no need to panic if this fails, since this is used
// during state snapshotting where an error isn't critical to node operation.
let status = self.0.store.get_flat_storage_status(shard_uid)?;
Expand All @@ -138,7 +138,7 @@ impl FlatStorageManager {
)))
}
};
let flat_head = self.read_block_info(&catchup_flat_head)?;
let flat_head = read_block_info(store, &catchup_flat_head)?;
let mut store_update = self.0.store.store_update();
store_update.set_flat_storage_status(
shard_uid,
Expand All @@ -154,9 +154,10 @@ impl FlatStorageManager {
// should now be considered `Ready` in the state snapshot, even if not in the main DB.
pub fn mark_ready_and_create_flat_storage(
&self,
store: &Store,
shard_uid: ShardUId,
) -> Result<(), StorageError> {
self.mark_flat_storage_ready(shard_uid)?;
self.mark_flat_storage_ready(store, shard_uid)?;
self.create_flat_storage_for_shard(shard_uid)
}

Expand Down Expand Up @@ -313,7 +314,7 @@ impl FlatStorageManager {
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
catchup_flat_head,
)) => {
let flat_head = self.read_block_info(&catchup_flat_head)?;
let flat_head = read_block_info(self.0.store.store_ref(), &catchup_flat_head)?;
if let Some(Some(min_height)) = ret {
ret = Some(Some(std::cmp::min(min_height, flat_head.height)));
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ pub fn checkpoint_hot_storage_and_cleanup_columns(
let _span =
tracing::info_span!(target: "state_snapshot", "checkpoint_hot_storage_and_cleanup_columns")
.entered();
if let Some(storage) = hot_store.storage.copy_if_test() {
if let Some(storage) = hot_store.storage.copy_if_test(columns_to_keep) {
return Ok(NodeStorage::new(storage));
}
let checkpoint_path = checkpoint_base_path.join("data");
Expand Down
20 changes: 13 additions & 7 deletions core/store/src/trie/state_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::config::StateSnapshotType;
use crate::flat::{FlatStorageManager, FlatStorageStatus};
use crate::Mode;
use crate::ShardTries;
use crate::Store;
use crate::StoreConfig;
use crate::{checkpoint_hot_storage_and_cleanup_columns, metrics, DBCol, NodeStorage};
use near_primitives::block::Block;
Expand Down Expand Up @@ -81,7 +82,8 @@ pub struct StateSnapshot {
impl StateSnapshot {
/// Creates an object and also creates flat storage for the given shards.
pub fn new(
store: TrieStoreAdapter,
store: &Store,
snapshot_store: TrieStoreAdapter,
prev_block_hash: CryptoHash,
flat_storage_manager: FlatStorageManager,
shard_indexes_and_uids: &[(ShardIndex, ShardUId)],
Expand All @@ -90,7 +92,9 @@ impl StateSnapshot {
tracing::debug!(target: "state_snapshot", ?shard_indexes_and_uids, ?prev_block_hash, "new StateSnapshot");
let mut included_shard_uids = vec![];
for &(shard_index, shard_uid) in shard_indexes_and_uids {
if let Err(err) = flat_storage_manager.mark_ready_and_create_flat_storage(shard_uid) {
if let Err(err) =
flat_storage_manager.mark_ready_and_create_flat_storage(store, shard_uid)
{
tracing::warn!(target: "state_snapshot", ?err, ?shard_uid, "Failed to create a flat storage for snapshot shard");
continue;
}
Expand Down Expand Up @@ -120,7 +124,7 @@ impl StateSnapshot {
}
}
}
Self { prev_block_hash, store, flat_storage_manager, included_shard_uids }
Self { prev_block_hash, store: snapshot_store, flat_storage_manager, included_shard_uids }
}

/// Returns the UIds for the shards included in the snapshot.
Expand Down Expand Up @@ -207,7 +211,7 @@ impl ShardTries {
let StateSnapshotConfig { home_dir, hot_store_path, state_snapshot_subdir, .. } =
self.state_snapshot_config();
let storage = checkpoint_hot_storage_and_cleanup_columns(
&self.store().store(),
self.store().store_ref(),
&Self::get_state_snapshot_base_dir(
&prev_block_hash,
home_dir,
Expand All @@ -218,13 +222,14 @@ impl ShardTries {
// Can't be cleaned up now because these columns are needed to `update_flat_head()`.
Some(STATE_SNAPSHOT_COLUMNS),
)?;
let store = storage.get_hot_store().trie_store();
let snapshot_store = storage.get_hot_store().trie_store();
// It is fine to create a separate FlatStorageManager, because
// it is used only for reading flat storage in the snapshot and
// doesn't introduce memory overhead.
let flat_storage_manager = FlatStorageManager::new(store.flat_store());
let flat_storage_manager = FlatStorageManager::new(snapshot_store.flat_store());
*state_snapshot_lock = Some(StateSnapshot::new(
store,
self.store().store_ref(),
snapshot_store,
prev_block_hash,
flat_storage_manager,
shard_indexes_and_uids,
Expand Down Expand Up @@ -354,6 +359,7 @@ impl ShardTries {
let shard_indexes_and_uids = get_shard_indexes_and_uids_fn(snapshot_hash)?;
let mut guard = self.state_snapshot().write().unwrap();
*guard = Some(StateSnapshot::new(
self.store().store_ref(),
store,
snapshot_hash,
flat_storage_manager,
Expand Down
Loading