Skip to content
This repository has been archived by the owner on Feb 5, 2025. It is now read-only.

Commit

Permalink
Merge pull request #618 from EspressoSystems/jb/locking
Browse files Browse the repository at this point in the history
Fine tune locking to minimize critical sections
  • Loading branch information
jbearer authored Jun 5, 2024
2 parents 9c11647 + 710907c commit 4ffa7d0
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 197 deletions.
158 changes: 76 additions & 82 deletions src/availability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,31 +140,28 @@ where
let timeout = options.fetch_timeout;

api.with_version("0.0.1".parse().unwrap())
.get("get_leaf", move |req, state| {
.at("get_leaf", move |req, state| {
async move {
let id = match req.opt_integer_param("height")? {
Some(height) => LeafId::Number(height),
None => LeafId::Hash(req.blob_param("hash")?),
};
state
.get_leaf(id)
.await
.with_timeout(timeout)
.await
.context(FetchLeafSnafu {
resource: id.to_string(),
})
let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
resource: id.to_string(),
})
}
.boxed()
})?
.get("get_leaf_range", move |req, state| {
.at("get_leaf_range", move |req, state| {
async move {
let from = req.integer_param::<_, usize>("from")?;
let until = req.integer_param("until")?;

state
.get_leaf_range(from..until)
.await
let leaves = state
.read(|state| state.get_leaf_range(from..until).boxed())
.await;
leaves
.enumerate()
.then(|(index, fetch)| async move {
fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
Expand All @@ -188,7 +185,7 @@ where
.try_flatten_stream()
.boxed()
})?
.get("get_header", move |req, state| {
.at("get_header", move |req, state| {
async move {
let id = if let Some(height) = req.opt_integer_param("height")? {
BlockId::Number(height)
Expand All @@ -197,9 +194,8 @@ where
} else {
BlockId::PayloadHash(req.blob_param("payload-hash")?)
};
Ok(state
.get_block(id)
.await
let fetch = state.read(|state| state.get_block(id).boxed()).await;
Ok(fetch
.with_timeout(timeout)
.await
.context(FetchBlockSnafu {
Expand All @@ -210,14 +206,15 @@ where
}
.boxed()
})?
.get("get_header_range", move |req, state| {
.at("get_header_range", move |req, state| {
async move {
let from = req.integer_param::<_, usize>("from")?;
let until = req.integer_param::<_, usize>("until")?;

state
.get_block_range(from..until)
.await
let headers = state
.read(|state| state.get_block_range(from..until).boxed())
.await;
headers
.enumerate()
.then(|(index, fetch)| async move {
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
Expand Down Expand Up @@ -248,7 +245,7 @@ where
.try_flatten_stream()
.boxed()
})?
.get("get_block", move |req, state| {
.at("get_block", move |req, state| {
async move {
let id = if let Some(height) = req.opt_integer_param("height")? {
BlockId::Number(height)
Expand All @@ -257,25 +254,22 @@ where
} else {
BlockId::PayloadHash(req.blob_param("payload-hash")?)
};
state
.get_block(id)
.await
.with_timeout(timeout)
.await
.context(FetchBlockSnafu {
resource: id.to_string(),
})
let fetch = state.read(|state| state.get_block(id).boxed()).await;
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
resource: id.to_string(),
})
}
.boxed()
})?
.get("get_block_range", move |req, state| {
.at("get_block_range", move |req, state| {
async move {
let from = req.integer_param::<_, usize>("from")?;
let until = req.integer_param("until")?;

state
.get_block_range(from..until)
.await
let blocks = state
.read(|state| state.get_block_range(from..until).boxed())
.await;
blocks
.enumerate()
.then(|(index, fetch)| async move {
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
Expand All @@ -299,7 +293,7 @@ where
.try_flatten_stream()
.boxed()
})?
.get("get_payload", move |req, state| {
.at("get_payload", move |req, state| {
async move {
let id = if let Some(height) = req.opt_integer_param("height")? {
BlockId::Number(height)
Expand All @@ -308,25 +302,22 @@ where
} else {
BlockId::Hash(req.blob_param("block-hash")?)
};
state
.get_payload(id)
.await
.with_timeout(timeout)
.await
.context(FetchBlockSnafu {
resource: id.to_string(),
})
let fetch = state.read(|state| state.get_payload(id).boxed()).await;
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
resource: id.to_string(),
})
}
.boxed()
})?
.get("get_payload_range", move |req, state| {
.at("get_payload_range", move |req, state| {
async move {
let from = req.integer_param::<_, usize>("from")?;
let until = req.integer_param("until")?;

state
.get_payload_range(from..until)
.await
let payloads = state
.read(|state| state.get_payload_range(from..until).boxed())
.await;
payloads
.enumerate()
.then(|(index, fetch)| async move {
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
Expand All @@ -350,7 +341,7 @@ where
.try_flatten_stream()
.boxed()
})?
.get("get_vid_common", move |req, state| {
.at("get_vid_common", move |req, state| {
async move {
let id = if let Some(height) = req.opt_integer_param("height")? {
BlockId::Number(height)
Expand All @@ -359,14 +350,10 @@ where
} else {
BlockId::PayloadHash(req.blob_param("payload-hash")?)
};
state
.get_vid_common(id)
.await
.with_timeout(timeout)
.await
.context(FetchBlockSnafu {
resource: id.to_string(),
})
let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
resource: id.to_string(),
})
}
.boxed()
})?
Expand All @@ -382,27 +369,28 @@ where
.try_flatten_stream()
.boxed()
})?
.get("get_transaction", move |req, state| {
.at("get_transaction", move |req, state| {
async move {
match req.opt_blob_param("hash")? {
Some(hash) => state
.get_transaction(hash)
.await
.with_timeout(timeout)
.await
.context(FetchTransactionSnafu {
resource: hash.to_string(),
}),
None => {
let height: u64 = req.integer_param("height")?;
let block = state
.get_block(height as usize)
.await
Some(hash) => {
let fetch = state
.read(|state| state.get_transaction(hash).boxed())
.await;
fetch
.with_timeout(timeout)
.await
.context(FetchBlockSnafu {
resource: height.to_string(),
})?;
.context(FetchTransactionSnafu {
resource: hash.to_string(),
})
}
None => {
let height: u64 = req.integer_param("height")?;
let fetch = state
.read(|state| state.get_block(height as usize).boxed())
.await;
let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
resource: height.to_string(),
})?;
let i: u64 = req.integer_param("index")?;
let index = block
.payload()
Expand All @@ -415,13 +403,12 @@ where
}
.boxed()
})?
.get("get_block_summary", move |req, state| {
.at("get_block_summary", move |req, state| {
async move {
let id: usize = req.integer_param("height")?;

state
.get_block(id)
.await
let fetch = state.read(|state| state.get_block(id).boxed()).await;
fetch
.with_timeout(timeout)
.await
.context(FetchBlockSnafu {
Expand All @@ -431,14 +418,15 @@ where
}
.boxed()
})?
.get("get_block_summary_range", move |req, state| {
.at("get_block_summary_range", move |req, state| {
async move {
let from: usize = req.integer_param("from")?;
let until: usize = req.integer_param("until")?;

let result: Vec<BlockSummaryQueryData<Types>> = state
.get_block_range(from..until)
.await
let blocks = state
.read(|state| state.get_block_range(from..until).boxed())
.await;
let result: Vec<BlockSummaryQueryData<Types>> = blocks
.enumerate()
.then(|(index, fetch)| async move {
fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
Expand Down Expand Up @@ -762,7 +750,13 @@ mod test {
setup_test();

// Create the consensus network.
let mut network = MockNetwork::<D>::init().await;
let mut network = MockNetwork::<D>::init_with_config(|cfg| {
// Make the rate of empty block production slower than the API fetching timeout.
// Otherwise, we will produce new blocks faster than we can fetch them (particularly in
// the no-storage case, where fetching is quite slow) and the test will never finish.
cfg.builder_timeout = fetch_timeout + Duration::from_millis(500);
})
.await;
network.start().await;

// Start the web server.
Expand Down
12 changes: 6 additions & 6 deletions src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,14 +634,14 @@ pub mod node_tests {
.collect::<Vec<_>>();

// At first, the node is fully synced.
assert!(ds.sync_status().await.unwrap().is_fully_synced());
assert!(ds.sync_status().await.await.unwrap().is_fully_synced());

// Insert a leaf without the corresponding block or VID info, make sure we detect that the
// block and VID info are missing.
ds.insert_leaf(leaves[0].clone()).await.unwrap();
ds.commit().await.unwrap();
assert_eq!(
ds.sync_status().await.unwrap(),
ds.sync_status().await.await.unwrap(),
SyncStatus {
missing_blocks: 1,
missing_vid_common: 1,
Expand All @@ -656,7 +656,7 @@ pub mod node_tests {
ds.insert_leaf(leaves[2].clone()).await.unwrap();
ds.commit().await.unwrap();
assert_eq!(
ds.sync_status().await.unwrap(),
ds.sync_status().await.await.unwrap(),
SyncStatus {
missing_blocks: 3,
missing_vid_common: 3,
Expand All @@ -670,7 +670,7 @@ pub mod node_tests {
ds.insert_vid(vid[0].0.clone(), None).await.unwrap();
ds.commit().await.unwrap();
assert_eq!(
ds.sync_status().await.unwrap(),
ds.sync_status().await.await.unwrap(),
SyncStatus {
missing_blocks: 3,
missing_vid_common: 2,
Expand Down Expand Up @@ -715,12 +715,12 @@ pub mod node_tests {
missing_vid_shares: expected_missing,
pruned_height: None,
};
assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
assert_eq!(ds.sync_status().await.await.unwrap(), expected_sync_status);

// If we re-insert one of the VID entries without a share, it should not overwrite the share
// that we already have; that is, `insert_vid` should be monotonic.
ds.insert_vid(vid[0].0.clone(), None).await.unwrap();
assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
assert_eq!(ds.sync_status().await.await.unwrap(), expected_sync_status);
}

#[async_std::test]
Expand Down
3 changes: 2 additions & 1 deletion src/data_source/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
Header, Payload, QueryResult, Transaction, VidShare,
};
use async_trait::async_trait;
use futures::future::BoxFuture;
use hotshot_types::traits::node_implementation::NodeType;
use jf_merkle_tree::prelude::MerkleProof;
use std::ops::RangeBounds;
Expand Down Expand Up @@ -255,7 +256,7 @@ where
{
self.data_source.vid_share(id).await
}
async fn sync_status(&self) -> QueryResult<SyncStatus> {
async fn sync_status(&self) -> BoxFuture<'static, QueryResult<SyncStatus>> {
self.data_source.sync_status().await
}
async fn get_header_window(
Expand Down
2 changes: 1 addition & 1 deletion src/data_source/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ where
self.storage().await.vid_share(id).await
}

async fn sync_status(&self) -> QueryResult<SyncStatus> {
async fn sync_status(&self) -> BoxFuture<'static, QueryResult<SyncStatus>> {
self.storage().await.sync_status().await
}

Expand Down
Loading

0 comments on commit 4ffa7d0

Please sign in to comment.