Skip to content
This repository has been archived by the owner on Feb 5, 2025. It is now read-only.

Commit

Permalink
Merge pull request #741 from EspressoSystems/ab/split-aggregate-tx
Browse files Browse the repository at this point in the history
separate transactions when updating aggregates
  • Loading branch information
imabdulbasit authored Nov 25, 2024
2 parents 970d3c3 + e677c5d commit d17f0d6
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 53 deletions.
34 changes: 23 additions & 11 deletions src/data_source/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ use super::{
notifier::Notifier,
storage::{
pruning::{PruneStorage, PrunedHeightStorage},
AggregatesStorage, AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage,
MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage, UpdateAvailabilityStorage,
Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
UpdateAvailabilityStorage,
},
Transaction, VersionedDataSource,
};
Expand Down Expand Up @@ -1359,24 +1360,30 @@ where
#[tracing::instrument(skip_all)]
async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
loop {
let start = loop {
let prev_aggregate = loop {
let mut tx = match self.read().await {
Ok(tx) => tx,
Err(err) => {
tracing::error!("unable to start aggregator: {err:#}");
tracing::error!("unable to open read tx: {err:#}");
sleep(Duration::from_secs(5)).await;
continue;
}
};
match tx.aggregates_height().await {
Ok(height) => break height,
match tx.load_prev_aggregate().await {
Ok(agg) => break agg,
Err(err) => {
tracing::error!("unable to load aggregator height: {err:#}");
tracing::error!("unable to load previous aggregate: {err:#}");
sleep(Duration::from_secs(5)).await;
continue;
}
};
}
};

let (start, mut prev_aggregate) = match prev_aggregate {
Some(aggregate) => (aggregate.height as usize + 1, aggregate),
None => (0, Aggregate::default()),
};

tracing::info!(start, "starting aggregator");
metrics.height.set(start);

Expand All @@ -1402,12 +1409,17 @@ where
loop {
let res = async {
let mut tx = self.write().await.context("opening transaction")?;
tx.update_aggregates(&chunk).await?;
tx.commit().await.context("committing transaction")
let aggregate =
tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
tx.commit().await.context("committing transaction")?;
prev_aggregate = aggregate;
anyhow::Result::<_>::Ok(())
}
.await;
match res {
Ok(()) => break,
Ok(()) => {
break;
}
Err(err) => {
tracing::warn!(
num_blocks,
Expand Down
15 changes: 14 additions & 1 deletion src/data_source/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,21 @@ pub trait NodeStorage<Types: NodeType> {
async fn sync_status(&mut self) -> QueryResult<SyncStatus>;
}

/// Cumulative aggregate statistics as of a particular block height.
///
/// The SQL backend reads and writes these three values as one row of its `aggregate` table,
/// keyed by `height`.
#[derive(Clone, Debug, Default)]
pub struct Aggregate {
/// Block height this aggregate corresponds to; the aggregator resumes from `height + 1`.
pub height: i64,
/// Running total of transactions, summed cumulatively across blocks.
pub num_transactions: i64,
/// Running total of payload size. NOTE(review): presumably measured in bytes -- confirm.
pub payload_size: i64,
}

/// Read-only access to stored aggregate statistics.
pub trait AggregatesStorage {
/// The block height for which aggregate statistics are currently available.
fn aggregates_height(&mut self) -> impl Future<Output = anyhow::Result<usize>> + Send;

/// Load the most recently stored [`Aggregate`], if there is one.
///
/// Yields `Ok(None)` when no aggregate is available (the SQL backend does so when its
/// `aggregate` table has no rows), in which case the aggregator starts from height 0.
fn load_prev_aggregate(
&mut self,
) -> impl Future<Output = anyhow::Result<Option<Aggregate>>> + Send;
}

pub trait UpdateAggregatesStorage<Types>
Expand All @@ -236,8 +248,9 @@ where
/// Update aggregate statistics based on a new block.
fn update_aggregates(
&mut self,
aggregate: Aggregate,
blocks: &[PayloadMetadata<Types>],
) -> impl Future<Output = anyhow::Result<()>> + Send;
) -> impl Future<Output = anyhow::Result<Aggregate>> + Send;
}

/// An interface for querying Data and Statistics from the HotShot Blockchain.
Expand Down
15 changes: 12 additions & 3 deletions src/data_source/storage/fail_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use super::{
pruning::{PruneStorage, PrunedHeightStorage, PrunerCfg, PrunerConfig},
AggregatesStorage, AvailabilityStorage, NodeStorage, UpdateAggregatesStorage,
Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, UpdateAggregatesStorage,
UpdateAvailabilityStorage,
};
use crate::{
Expand Down Expand Up @@ -537,15 +537,24 @@ where
self.maybe_fail_read(FailableAction::Any).await?;
self.inner.aggregates_height().await
}

/// Load the previous aggregate from the wrapped storage.
///
/// `maybe_fail_read` runs first, so this can return an error before the inner storage is
/// ever consulted.
async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
self.maybe_fail_read(FailableAction::Any).await?;
self.inner.load_prev_aggregate().await
}
}

impl<T, Types> UpdateAggregatesStorage<Types> for Transaction<T>
where
Types: NodeType,
T: UpdateAggregatesStorage<Types> + Send + Sync,
{
async fn update_aggregates(&mut self, blocks: &[PayloadMetadata<Types>]) -> anyhow::Result<()> {
async fn update_aggregates(
&mut self,
prev: Aggregate,
blocks: &[PayloadMetadata<Types>],
) -> anyhow::Result<Aggregate> {
self.maybe_fail_write(FailableAction::Any).await?;
self.inner.update_aggregates(blocks).await
self.inner.update_aggregates(prev, blocks).await
}
}
13 changes: 9 additions & 4 deletions src/data_source/storage/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use super::{
ledger_log::{Iter, LedgerLog},
pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
UpdateAvailabilityStorage, VidCommonMetadata,
Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
};

use crate::{
Expand Down Expand Up @@ -786,6 +786,10 @@ impl<T: Revert + Send> AggregatesStorage for Transaction<T> {
/// Always reports an aggregates height of 0.
// NOTE(review): presumably the file-system backend does not maintain aggregates -- confirm.
async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
Ok(0)
}

/// Always reports that no previous aggregate exists.
// NOTE(review): presumably the file-system backend does not maintain aggregates -- confirm.
async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
Ok(None)
}
}

impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
Expand All @@ -794,9 +798,10 @@ where
{
async fn update_aggregates(
&mut self,
_prev: Aggregate,
_blocks: &[PayloadMetadata<Types>],
) -> anyhow::Result<()> {
Ok(())
) -> anyhow::Result<Aggregate> {
Ok(Aggregate::default())
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/data_source/storage/no_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use super::{
pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
UpdateAvailabilityStorage, VidCommonMetadata,
Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
};
use crate::{
availability::{
Expand Down Expand Up @@ -287,6 +287,10 @@ impl<'a> AggregatesStorage for Transaction<'a> {
/// Always fails with a mock read error; this implementation never returns a height.
async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
bail!("no_storage mock read error")
}

/// Always fails with a mock read error; this implementation never returns an aggregate.
async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
bail!("no_storage mock read error")
}
}

impl<'a, Types> UpdateAggregatesStorage<Types> for Transaction<'a>
Expand All @@ -295,9 +299,10 @@ where
{
async fn update_aggregates(
&mut self,
_prev: Aggregate,
_blocks: &[PayloadMetadata<Types>],
) -> anyhow::Result<()> {
Ok(())
) -> anyhow::Result<Aggregate> {
Ok(Aggregate::default())
}
}

Expand Down
9 changes: 1 addition & 8 deletions src/data_source/storage/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,14 +917,7 @@ pub mod testing {

pub fn config(&self) -> Config {
#[cfg(feature = "embedded-db")]
let mut cfg: Config = {
let db_path = self.db_path.to_string_lossy();
let path = format!("sqlite:{db_path}");
sqlx::sqlite::SqliteConnectOptions::from_str(&path)
.expect("invalid db path")
.create_if_missing(true)
.into()
};
let mut cfg = Config::default().db_path(self.db_path.clone());

#[cfg(not(feature = "embedded-db"))]
let mut cfg = Config::default()
Expand Down
57 changes: 37 additions & 20 deletions src/data_source/storage/sql/queries/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{
};
use crate::{
data_source::storage::{
AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
},
node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
types::HeightIndexed,
Expand Down Expand Up @@ -286,28 +286,35 @@ impl<Mode: TransactionMode> AggregatesStorage for Transaction<Mode> {
.await?;
Ok(height as usize)
}

/// Load the aggregate with the greatest height from the `aggregate` table.
///
/// Returns `Ok(None)` when the query yields no row, and propagates any database error.
async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
    // Descending order plus a limit of one selects the highest-height row, if any.
    let latest_row: Option<(i64, i64, i64)> = query_as(
        "SELECT height, num_transactions, payload_size FROM aggregate ORDER BY height DESC limit 1",
    )
    .fetch_optional(self.as_mut())
    .await?;

    let Some((height, num_transactions, payload_size)) = latest_row else {
        return Ok(None);
    };

    Ok(Some(Aggregate {
        height,
        num_transactions,
        payload_size,
    }))
}
}

impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write> {
async fn update_aggregates(&mut self, blocks: &[PayloadMetadata<Types>]) -> anyhow::Result<()> {
// Get the cumulative statistics up to the block before this chunk.
async fn update_aggregates(
&mut self,
prev: Aggregate,
blocks: &[PayloadMetadata<Types>],
) -> anyhow::Result<Aggregate> {
let height = blocks[0].height();
let (prev_tx_count, prev_size) = if height == 0 {
(0, 0)
} else {
let (tx_count, size): (i64, i64) =
query_as("SELECT num_transactions, payload_size FROM aggregate WHERE height = $1")
.bind((height - 1) as i64)
.fetch_one(self.as_mut())
.await
.map_err(|err| {
anyhow::Error::new(err).context(format!(
"cannot update aggregates for block {height} because previous block is missing"
))
})?;
(tx_count as u64, size as u64)
};

let (prev_tx_count, prev_size) = (
u64::try_from(prev.num_transactions)?,
u64::try_from(prev.payload_size)?,
);
// Cumulatively sum up new statistics for each block in this chunk.
let rows = blocks
.iter()
Expand All @@ -329,14 +336,24 @@ impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write> {
},
)
.collect::<anyhow::Result<Vec<_>>>()?;
let last_aggregate = rows.last().cloned();

self.upsert(
"aggregate",
["height", "num_transactions", "payload_size"],
["height"],
rows,
)
.await
.await?;

let (height, num_transactions, payload_size) =
last_aggregate.ok_or_else(|| anyhow!("no row"))?;

Ok(Aggregate {
height,
num_transactions,
payload_size,
})
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/fetching/provider/query_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,19 +1124,25 @@ mod test {
assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
data_source.as_ref().pass().await;

// It is possible that the fetch above completes, notifies the subscriber,
// and the fetch below quickly subscribes and gets notified by the same loop.
// We add a delay here to avoid this situation.
// This is not a bug, as being notified after subscribing is fine.
sleep(Duration::from_secs(1)).await;

// We can get the same leaf again, this will again trigger an active fetch since storage
// failed the first time.
tracing::info!("fetch with write success");
let fetch = data_source.get_leaf(1).await;
assert!(fetch.is_pending());
assert_eq!(leaves[0], fetch.await);

sleep(Duration::from_secs(1)).await;

// Finally, we should have the leaf locally and not need to fetch it.
tracing::info!("retrieve from storage");
let fetch = data_source.get_leaf(1).await;
assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());

drop(db);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down

0 comments on commit d17f0d6

Please sign in to comment.