From 04b15ff18f257f5b435b304dc2c4b43a58930986 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Sat, 23 Nov 2024 00:31:24 +0500
Subject: [PATCH 1/7] separate transactions for aggregate height and upsert in
 update_aggregates()

---
 src/data_source/fetching.rs                 | 15 ++++++--
 src/data_source/storage.rs                  | 12 +++++++
 src/data_source/storage/fail_storage.rs     | 15 ++++++--
 src/data_source/storage/fs.rs               |  9 +++--
 src/data_source/storage/no_storage.rs       |  9 +++--
 src/data_source/storage/sql/queries/node.rs | 38 ++++++++++-----------
 6 files changed, 69 insertions(+), 29 deletions(-)

diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs
index 61deed94..c257a940 100644
--- a/src/data_source/fetching.rs
+++ b/src/data_source/fetching.rs
@@ -77,8 +77,9 @@ use super::{
     notifier::Notifier,
     storage::{
         pruning::{PruneStorage, PrunedHeightStorage},
-        AggregatesStorage, AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage,
-        MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage, UpdateAvailabilityStorage,
+        Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
+        MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
+        UpdateAvailabilityStorage,
     },
     Transaction, VersionedDataSource,
 };
@@ -1401,8 +1402,16 @@ where
         );
         loop {
             let res = async {
+                let start = chunk[0].height;
+                let aggregate = if start == 0 {
+                    Aggregate::default()
+                } else {
+                    let mut tx = self.read().await.context("opening transaction")?;
+                    tx.aggregate((start - 1).try_into()?).await?
+                };
+
                 let mut tx = self.write().await.context("opening transaction")?;
-                tx.update_aggregates(&chunk).await?;
+                tx.update_aggregates(aggregate, &chunk).await?;
                 tx.commit().await.context("committing transaction")
             }
             .await;
diff --git a/src/data_source/storage.rs b/src/data_source/storage.rs
index 49ce6181..ebb40ba7 100644
--- a/src/data_source/storage.rs
+++ b/src/data_source/storage.rs
@@ -81,6 +81,7 @@ use async_trait::async_trait;
 use futures::future::Future;
 use hotshot_types::traits::node_implementation::NodeType;
 use jf_merkle_tree::prelude::MerkleProof;
+use sqlx::prelude::FromRow;
 use std::ops::RangeBounds;
 use tagged_base64::TaggedBase64;
 
@@ -224,9 +225,19 @@ pub trait NodeStorage {
     async fn sync_status(&mut self) -> QueryResult<SyncStatus>;
 }
 
+#[derive(Clone, Debug, Default, FromRow)]
+pub struct Aggregate {
+    pub height: i64,
+    pub num_transactions: i64,
+    pub payload_size: i64,
+}
+
 pub trait AggregatesStorage {
     /// The block height for which aggregate statistics are currently available.
     fn aggregates_height(&mut self) -> impl Future<Output = anyhow::Result<usize>> + Send;
+
+    /// The aggregate table row at a specific block height
+    fn aggregate(&mut self, height: i64) -> impl Future<Output = anyhow::Result<Aggregate>> + Send;
 }
 
 pub trait UpdateAggregatesStorage<Types>
 where
     Types: NodeType,
 {
     /// Update aggregate statistics based on a new block.
     fn update_aggregates(
         &mut self,
+        aggregate: Aggregate,
         blocks: &[PayloadMetadata<Types>],
     ) -> impl Future<Output = anyhow::Result<()>> + Send;
 }
diff --git a/src/data_source/storage/fail_storage.rs b/src/data_source/storage/fail_storage.rs
index c52010c7..20775f18 100644
--- a/src/data_source/storage/fail_storage.rs
+++ b/src/data_source/storage/fail_storage.rs
@@ -14,7 +14,7 @@ use super::{
     pruning::{PruneStorage, PrunedHeightStorage, PrunerCfg, PrunerConfig},
-    AggregatesStorage, AvailabilityStorage, NodeStorage, UpdateAggregatesStorage,
+    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, UpdateAggregatesStorage,
     UpdateAvailabilityStorage,
 };
 use crate::{
@@ -537,6 +537,11 @@ where
         self.maybe_fail_read(FailableAction::Any).await?;
         self.inner.aggregates_height().await
     }
+
+    async fn aggregate(&mut self, height: i64) -> anyhow::Result<Aggregate> {
+        self.maybe_fail_read(FailableAction::Any).await?;
+        self.inner.aggregate(height).await
+    }
 }
 
 impl UpdateAggregatesStorage for Transaction
 where
     Types: NodeType,
     T: UpdateAggregatesStorage + Send + Sync,
 {
-    async fn update_aggregates(&mut self, blocks: &[PayloadMetadata]) -> anyhow::Result<()> {
+    async fn update_aggregates(
+        &mut self,
+        aggregate: Aggregate,
+        blocks: &[PayloadMetadata],
+    ) -> anyhow::Result<()> {
         self.maybe_fail_write(FailableAction::Any).await?;
-        self.inner.update_aggregates(blocks).await
+        self.inner.update_aggregates(aggregate, blocks).await
     }
 }
diff --git a/src/data_source/storage/fs.rs b/src/data_source/storage/fs.rs
index bfb1122d..3e328d61 100644
--- a/src/data_source/storage/fs.rs
+++ b/src/data_source/storage/fs.rs
@@ -15,8 +15,8 @@ use super::{
     ledger_log::{Iter, LedgerLog},
     pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
-    AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
-    UpdateAvailabilityStorage, VidCommonMetadata,
+    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
+    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
 };
 
 use crate::{
@@ -786,6 +786,10 @@ impl AggregatesStorage for Transaction {
     async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
         Ok(0)
     }
+
+    async fn aggregate(&mut self, _height: i64) -> anyhow::Result<Aggregate> {
+        Ok(Aggregate::default())
+    }
 }
 
 impl UpdateAggregatesStorage for Transaction
 where
 {
     async fn update_aggregates(
         &mut self,
+        _aggregate: Aggregate,
         _blocks: &[PayloadMetadata],
     ) -> anyhow::Result<()> {
         Ok(())
     }
 }
diff --git a/src/data_source/storage/no_storage.rs b/src/data_source/storage/no_storage.rs
index 3f67055d..74a8cd10 100644
--- a/src/data_source/storage/no_storage.rs
+++ b/src/data_source/storage/no_storage.rs
@@ -14,8 +14,8 @@ use super::{
     pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
-    AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
-    UpdateAvailabilityStorage, VidCommonMetadata,
+    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
+    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
 };
 use crate::{
     availability::{
@@ -287,6 +287,10 @@ impl<'a> AggregatesStorage for Transaction<'a> {
     async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
         bail!("no_storage mock read error")
     }
+
+    async fn aggregate(&mut self, _height: i64) -> anyhow::Result<Aggregate> {
+        bail!("no_storage mock read error")
+    }
 }
 
 impl<'a, Types> UpdateAggregatesStorage for Transaction<'a>
 where
 {
     async fn update_aggregates(
         &mut self,
+        _aggregate: Aggregate,
         _blocks: &[PayloadMetadata],
     ) -> anyhow::Result<()> {
         Ok(())
     }
 }
diff --git a/src/data_source/storage/sql/queries/node.rs b/src/data_source/storage/sql/queries/node.rs
index 70147e84..f38462f9 100644
--- a/src/data_source/storage/sql/queries/node.rs
+++ b/src/data_source/storage/sql/queries/node.rs
@@ -18,7 +18,7 @@ use super::{
 };
 use crate::{
     data_source::storage::{
-        AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
+        Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
     },
     node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
     types::HeightIndexed,
@@ -286,28 +286,28 @@ impl AggregatesStorage for Transaction {
         .await?;
         Ok(height as usize)
     }
+
+    async fn aggregate(&mut self, height: i64) -> anyhow::Result<Aggregate> {
+        let aggregate = query_as("SELECT * FROM aggregate WHERE height = $1")
+            .bind(height)
+            .fetch_one(self.as_mut())
+            .await?;
+
+        Ok(aggregate)
+    }
 }
 
 impl UpdateAggregatesStorage for Transaction {
-    async fn update_aggregates(&mut self, blocks: &[PayloadMetadata]) -> anyhow::Result<()> {
-        // Get the cumulative statistics up to the block before this chunk.
+    async fn update_aggregates(
+        &mut self,
+        aggregate: Aggregate,
+        blocks: &[PayloadMetadata],
+    ) -> anyhow::Result<()> {
         let height = blocks[0].height();
-        let (prev_tx_count, prev_size) = if height == 0 {
-            (0, 0)
-        } else {
-            let (tx_count, size): (i64, i64) =
-                query_as("SELECT num_transactions, payload_size FROM aggregate WHERE height = $1")
-                    .bind((height - 1) as i64)
-                    .fetch_one(self.as_mut())
-                    .await
-                    .map_err(|err| {
-                        anyhow::Error::new(err).context(format!(
-                            "cannot update aggregates for block {height} because previous block is missing"
-                        ))
-                    })?;
-            (tx_count as u64, size as u64)
-        };
-
+        let (prev_tx_count, prev_size) = (
+            aggregate.num_transactions as u64,
+            aggregate.payload_size as u64,
+        );
         // Cumulatively sum up new statistics for each block in this chunk.
         let rows = blocks
             .iter()

From 853a7227d4be062ca3aba579af45152a36ec61a6 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Sat, 23 Nov 2024 00:42:52 +0500
Subject: [PATCH 2/7] lint

---
 src/data_source/storage.rs                  |  3 +--
 src/data_source/storage/sql/queries/node.rs | 16 +++++++++++-----
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/src/data_source/storage.rs b/src/data_source/storage.rs
index ebb40ba7..53388dc1 100644
--- a/src/data_source/storage.rs
+++ b/src/data_source/storage.rs
@@ -81,7 +81,6 @@ use async_trait::async_trait;
 use futures::future::Future;
 use hotshot_types::traits::node_implementation::NodeType;
 use jf_merkle_tree::prelude::MerkleProof;
-use sqlx::prelude::FromRow;
 use std::ops::RangeBounds;
 use tagged_base64::TaggedBase64;
 
@@ -225,7 +224,7 @@ pub trait NodeStorage {
     async fn sync_status(&mut self) -> QueryResult<SyncStatus>;
 }
 
-#[derive(Clone, Debug, Default, FromRow)]
+#[derive(Clone, Debug, Default)]
 pub struct Aggregate {
     pub height: i64,
     pub num_transactions: i64,
diff --git a/src/data_source/storage/sql/queries/node.rs b/src/data_source/storage/sql/queries/node.rs
index f38462f9..64412ca0 100644
--- a/src/data_source/storage/sql/queries/node.rs
+++ b/src/data_source/storage/sql/queries/node.rs
@@ -288,12 +288,18 @@ impl AggregatesStorage for Transaction {
     }
 
     async fn aggregate(&mut self, height: i64) -> anyhow::Result<Aggregate> {
-        let aggregate = query_as("SELECT * FROM aggregate WHERE height = $1")
-            .bind(height)
-            .fetch_one(self.as_mut())
-            .await?;
+        let (height, num_transactions, payload_size) = query_as(
+            "SELECT height, num_transactions, payload_size FROM aggregate WHERE height = $1",
+        )
+        .bind(height)
+        .fetch_one(self.as_mut())
+        .await?;
 
-        Ok(aggregate)
+        Ok(Aggregate {
+            height,
+            num_transactions,
+            payload_size,
+        })
     }
 }

From 9800dc4cf9429721c55a59e4c7e20ca87eb50823 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Sat, 23 Nov 2024 01:46:41 +0500
Subject: [PATCH 3/7] set journal mode to wal for tmpdb

---
 src/data_source/storage/sql.rs         | 1 +
 src/fetching/provider/query_service.rs | 2 --
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/data_source/storage/sql.rs b/src/data_source/storage/sql.rs
index c38275db..1d5312d9 100644
--- a/src/data_source/storage/sql.rs
+++ b/src/data_source/storage/sql.rs
@@ -922,6 +922,7 @@ pub mod testing {
             let path = format!("sqlite:{db_path}");
             sqlx::sqlite::SqliteConnectOptions::from_str(&path)
                 .expect("invalid db path")
+                .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
                 .create_if_missing(true)
                 .into()
         };
diff --git a/src/fetching/provider/query_service.rs b/src/fetching/provider/query_service.rs
index 3b844542..e2135cbc 100644
--- a/src/fetching/provider/query_service.rs
+++ b/src/fetching/provider/query_service.rs
@@ -1135,8 +1135,6 @@ mod test {
         tracing::info!("retrieve from storage");
         let fetch = data_source.get_leaf(1).await;
         assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
-
-        drop(db);
     }
 
     #[tokio::test(flavor = "multi_thread")]

From c0d83a4b88760bde1d680b3d183ad59453ac7a64 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Sat, 23 Nov 2024 03:34:04 +0500
Subject: [PATCH 4/7] add some sleep

---
 src/data_source/storage/sql.rs         | 2 +-
 src/fetching/provider/query_service.rs | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/data_source/storage/sql.rs b/src/data_source/storage/sql.rs
index 1d5312d9..4a78b0a5 100644
--- a/src/data_source/storage/sql.rs
+++ b/src/data_source/storage/sql.rs
@@ -922,7 +922,7 @@ pub mod testing {
             let path = format!("sqlite:{db_path}");
             sqlx::sqlite::SqliteConnectOptions::from_str(&path)
                 .expect("invalid db path")
-                .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
+                .locking_mode(sqlx::sqlite::SqliteLockingMode::Exclusive)
                 .create_if_missing(true)
                 .into()
         };
diff --git a/src/fetching/provider/query_service.rs b/src/fetching/provider/query_service.rs
index e2135cbc..849104bb 100644
--- a/src/fetching/provider/query_service.rs
+++ b/src/fetching/provider/query_service.rs
@@ -1124,6 +1124,12 @@ mod test {
         assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
         data_source.as_ref().pass().await;
 
+        // It is possible that the fetch above completes, notifies the subscriber,
+        // and the fetch below quickly subscribes and gets notified by the same loop.
+        // We add a delay here to avoid this situation.
+        // This is not a bug, as being notified after subscribing is fine.
+        sleep(Duration::from_secs(1)).await;
+
         // We can get the same leaf again, this will again trigger an active fetch since storage
         // failed the first time.
         tracing::info!("fetch with write success");
         let fetch = data_source.get_leaf(1).await;
         assert!(fetch.is_pending());
         assert_eq!(leaves[0], fetch.await);
 
+        sleep(Duration::from_secs(1)).await;
+
         // Finally, we should have the leaf locally and not need to fetch it.
         tracing::info!("retrieve from storage");
         let fetch = data_source.get_leaf(1).await;

From b0ec6c1dc66fd86a99516b15e8d6cfd0abddb019 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Mon, 25 Nov 2024 06:35:37 +0500
Subject: [PATCH 5/7] save previous aggregate from update_aggregate

---
 src/data_source/fetching.rs                 | 37 +++++++++---------
 src/data_source/storage.rs                  |  8 ++--
 src/data_source/storage/fail_storage.rs     | 10 ++---
 src/data_source/storage/fs.rs               | 10 ++---
 src/data_source/storage/no_storage.rs       |  8 ++--
 src/data_source/storage/sql.rs              |  1 -
 src/data_source/storage/sql/queries/node.rs | 42 +++++++++++++--------
 7 files changed, 65 insertions(+), 51 deletions(-)

diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs
index c257a940..bac7ee28 100644
--- a/src/data_source/fetching.rs
+++ b/src/data_source/fetching.rs
@@ -1360,24 +1360,30 @@ where
     #[tracing::instrument(skip_all)]
     async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
         loop {
-            let start = loop {
+            let prev_aggregate = loop {
                 let mut tx = match self.read().await {
                     Ok(tx) => tx,
                     Err(err) => {
-                        tracing::error!("unable to start aggregator: {err:#}");
+                        tracing::error!("unable to open read tx: {err:#}");
                         sleep(Duration::from_secs(5)).await;
                         continue;
                     }
                 };
-                match tx.aggregates_height().await {
-                    Ok(height) => break height,
+                match tx.load_prev_aggregate().await {
+                    Ok(agg) => break agg,
                     Err(err) => {
-                        tracing::error!("unable to load aggregator height: {err:#}");
+                        tracing::error!("unable to load previous aggregate: {err:#}");
                         sleep(Duration::from_secs(5)).await;
                         continue;
                     }
-                };
+                }
             };
+
+            let (start, mut prev_aggregate) = match prev_aggregate {
+                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
+                None => (0, Aggregate::default()),
+            };
+
             tracing::info!(start, "starting aggregator");
             metrics.height.set(start);
 
@@ -1402,21 +1408,18 @@ where
         );
         loop {
             let res = async {
-                let start = chunk[0].height;
-                let aggregate = if start == 0 {
-                    Aggregate::default()
-                } else {
-                    let mut tx = self.read().await.context("opening transaction")?;
-                    tx.aggregate((start - 1).try_into()?).await?
-                };
-
-                let mut tx = self.write().await.context("opening transaction")?;
-                tx.update_aggregates(aggregate, &chunk).await?;
-                tx.commit().await.context("committing transaction")
+                let mut tx = self.write().await.context("opening transaction")?;
+                let aggregate =
+                    tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
+                tx.commit().await.context("committing transaction")?;
+                prev_aggregate = aggregate;
+                anyhow::Result::<_>::Ok(())
             }
             .await;
             match res {
-                Ok(()) => break,
+                Ok(()) => {
+                    break;
+                }
                 Err(err) => {
                     tracing::warn!(
                         num_blocks,
diff --git a/src/data_source/storage.rs b/src/data_source/storage.rs
index 53388dc1..ff8a72ff 100644
--- a/src/data_source/storage.rs
+++ b/src/data_source/storage.rs
@@ -235,8 +235,10 @@ pub trait AggregatesStorage {
     /// The block height for which aggregate statistics are currently available.
     fn aggregates_height(&mut self) -> impl Future<Output = anyhow::Result<usize>> + Send;
 
-    /// The aggregate table row at a specific block height
-    fn aggregate(&mut self, height: i64) -> impl Future<Output = anyhow::Result<Aggregate>> + Send;
+    /// the last aggregate
+    fn load_prev_aggregate(
+        &mut self,
+    ) -> impl Future<Output = anyhow::Result<Option<Aggregate>>> + Send;
 }
 
 pub trait UpdateAggregatesStorage<Types>
@@ -248,7 +250,7 @@ where
         &mut self,
         aggregate: Aggregate,
         blocks: &[PayloadMetadata<Types>],
-    ) -> impl Future<Output = anyhow::Result<()>> + Send;
+    ) -> impl Future<Output = anyhow::Result<Aggregate>> + Send;
 }
 
 /// An interface for querying Data and Statistics from the HotShot Blockchain.
diff --git a/src/data_source/storage/fail_storage.rs b/src/data_source/storage/fail_storage.rs
index 20775f18..7677927e 100644
--- a/src/data_source/storage/fail_storage.rs
+++ b/src/data_source/storage/fail_storage.rs
@@ -538,9 +538,9 @@ where
         self.inner.aggregates_height().await
     }
 
-    async fn aggregate(&mut self, height: i64) -> anyhow::Result<Aggregate> {
+    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
         self.maybe_fail_read(FailableAction::Any).await?;
-        self.inner.aggregate(height).await
+        self.inner.load_prev_aggregate().await
     }
 }
 
@@ -551,10 +551,10 @@ where
 {
     async fn update_aggregates(
         &mut self,
-        aggregate: Aggregate,
+        prev: Aggregate,
         blocks: &[PayloadMetadata],
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Aggregate> {
         self.maybe_fail_write(FailableAction::Any).await?;
-        self.inner.update_aggregates(aggregate, blocks).await
+        self.inner.update_aggregates(prev, blocks).await
     }
 }
diff --git a/src/data_source/storage/fs.rs b/src/data_source/storage/fs.rs
index 3e328d61..9dc1d23b 100644
--- a/src/data_source/storage/fs.rs
+++ b/src/data_source/storage/fs.rs
@@ -787,8 +787,8 @@ impl AggregatesStorage for Transaction {
         Ok(0)
     }
 
-    async fn aggregate(&mut self, _height: i64) -> anyhow::Result<Aggregate> {
-        Ok(Aggregate::default())
+    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
+        Ok(None)
     }
 }
 
@@ -798,10 +798,10 @@ where
 {
     async fn update_aggregates(
         &mut self,
-        _aggregate: Aggregate,
+        _prev: Aggregate,
         _blocks: &[PayloadMetadata],
-    ) -> anyhow::Result<()> {
-        Ok(())
+    ) -> anyhow::Result<Aggregate> {
+        Ok(Aggregate::default())
     }
 }
diff --git a/src/data_source/storage/no_storage.rs b/src/data_source/storage/no_storage.rs
index 74a8cd10..49c1064a 100644
--- a/src/data_source/storage/no_storage.rs
+++ b/src/data_source/storage/no_storage.rs
@@ -288,7 +288,7 @@ impl<'a> AggregatesStorage for Transaction<'a> {
         bail!("no_storage mock read error")
     }
 
-    async fn aggregate(&mut self, _height: i64) -> anyhow::Result<Aggregate> {
+    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
         bail!("no_storage mock read error")
     }
 }
@@ -299,10 +299,10 @@ where
 {
     async fn update_aggregates(
         &mut self,
-        _aggregate: Aggregate,
+        _prev: Aggregate,
         _blocks: &[PayloadMetadata],
-    ) -> anyhow::Result<()> {
-        Ok(())
+    ) -> anyhow::Result<Aggregate> {
+        Ok(Aggregate::default())
     }
 }
diff --git a/src/data_source/storage/sql.rs b/src/data_source/storage/sql.rs
index 4a78b0a5..c38275db 100644
--- a/src/data_source/storage/sql.rs
+++ b/src/data_source/storage/sql.rs
@@ -922,7 +922,6 @@ pub mod testing {
             let path = format!("sqlite:{db_path}");
             sqlx::sqlite::SqliteConnectOptions::from_str(&path)
                 .expect("invalid db path")
-                .locking_mode(sqlx::sqlite::SqliteLockingMode::Exclusive)
                 .create_if_missing(true)
                 .into()
         };
diff --git a/src/data_source/storage/sql/queries/node.rs b/src/data_source/storage/sql/queries/node.rs
index 64412ca0..731436d2 100644
--- a/src/data_source/storage/sql/queries/node.rs
+++ b/src/data_source/storage/sql/queries/node.rs
@@ -287,32 +287,33 @@ impl AggregatesStorage for Transaction {
         Ok(height as usize)
     }
 
-    async fn aggregate(&mut self, height: i64) -> anyhow::Result<Aggregate> {
-        let (height, num_transactions, payload_size) = query_as(
-            "SELECT height, num_transactions, payload_size FROM aggregate WHERE height = $1",
+    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate>> {
+        let res : Option<(i64, i64,i64)> = query_as(
+            "SELECT height, num_transactions, payload_size FROM aggregate ORDER BY height DESC limit 1",
         )
-        .bind(height)
-        .fetch_one(self.as_mut())
+        .fetch_optional(self.as_mut())
         .await?;
 
-        Ok(Aggregate {
-            height,
-            num_transactions,
-            payload_size,
-        })
+        Ok(
+            res.map(|(height, num_transactions, payload_size)| Aggregate {
+                height,
+                num_transactions,
+                payload_size,
+            }),
+        )
     }
 }
 
 impl UpdateAggregatesStorage for Transaction {
     async fn update_aggregates(
         &mut self,
-        aggregate: Aggregate,
+        prev: Aggregate,
         blocks: &[PayloadMetadata],
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Aggregate> {
         let height = blocks[0].height();
         let (prev_tx_count, prev_size) = (
-            aggregate.num_transactions as u64,
-            aggregate.payload_size as u64,
+            u64::try_from(prev.num_transactions)?,
+            u64::try_from(prev.payload_size)?,
         );
         // Cumulatively sum up new statistics for each block in this chunk.
         let rows = blocks
             .iter()
@@ -340,9 +341,18 @@ impl UpdateAggregatesStorage for Transaction {
             "aggregate",
             ["height", "num_transactions", "payload_size"],
             ["height"],
-            rows,
+            rows.clone(),
         )
-        .await
+        .await?;
+
+        let (height, num_transactions, payload_size) =
+            rows.last().ok_or_else(|| anyhow!("no row"))?;
+
+        Ok(Aggregate {
+            height: *height,
+            num_transactions: *num_transactions,
+            payload_size: *payload_size,
+        })
     }
 }

From c85379a8abb3170b20e6c70a334b925d3c3920c7 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Mon, 25 Nov 2024 18:47:33 +0500
Subject: [PATCH 6/7] try2

---
 src/data_source/storage/sql.rs | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/src/data_source/storage/sql.rs b/src/data_source/storage/sql.rs
index c38275db..d1723709 100644
--- a/src/data_source/storage/sql.rs
+++ b/src/data_source/storage/sql.rs
@@ -917,14 +917,7 @@ pub mod testing {
 
         pub fn config(&self) -> Config {
             #[cfg(feature = "embedded-db")]
-            let mut cfg: Config = {
-                let db_path = self.db_path.to_string_lossy();
-                let path = format!("sqlite:{db_path}");
-                sqlx::sqlite::SqliteConnectOptions::from_str(&path)
-                    .expect("invalid db path")
-                    .create_if_missing(true)
-                    .into()
-            };
+            let mut cfg = Config::default().db_path(self.db_path.clone());
 
             #[cfg(not(feature = "embedded-db"))]
             let mut cfg = Config::default()

From e677c5d2ca0facc22ae90a1d77d3b97202294e97 Mon Sep 17 00:00:00 2001
From: Abdul Basit
Date: Tue, 26 Nov 2024 00:44:05 +0500
Subject: [PATCH 7/7] extract last row first

---
 src/data_source/storage/sql/queries/node.rs | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/data_source/storage/sql/queries/node.rs b/src/data_source/storage/sql/queries/node.rs
index 731436d2..ca8874c1 100644
--- a/src/data_source/storage/sql/queries/node.rs
+++ b/src/data_source/storage/sql/queries/node.rs
@@ -336,22 +336,23 @@ impl UpdateAggregatesStorage for Transaction {
                 },
             )
             .collect::>>()?;
+        let last_aggregate = rows.last().cloned();
         self.upsert(
             "aggregate",
             ["height", "num_transactions", "payload_size"],
             ["height"],
-            rows.clone(),
+            rows,
         )
         .await?;
 
         let (height, num_transactions, payload_size) =
-            rows.last().ok_or_else(|| anyhow!("no row"))?;
+            last_aggregate.ok_or_else(|| anyhow!("no row"))?;
 
         Ok(Aggregate {
-            height: *height,
-            num_transactions: *num_transactions,
-            payload_size: *payload_size,
+            height,
+            num_transactions,
+            payload_size,
         })
     }
 }