Skip to content

Commit

Permalink
chore(gas_price_service): lock-free latest_l2_height
Browse files Browse the repository at this point in the history
Co-authored-by: Rafał Chabowski <[email protected]>
  • Loading branch information
rymnc and rafal-ch committed Jan 14, 2025
1 parent be1bc51 commit 91ca09e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 59 deletions.
15 changes: 6 additions & 9 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,13 @@ mod tests {
use fuel_core_types::fuel_types::BlockHeight;
use std::{
sync::{
atomic::AtomicU32,
Arc,
Mutex,
},
time::Duration,
};

fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() {
// given
Expand All @@ -59,7 +56,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32)));
let latest_l2_height = Arc::new(AtomicU32::new(10u32));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand All @@ -83,7 +80,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let latest_l2_height = latest_l2_height(0);
let latest_l2_height = Arc::new(AtomicU32::new(0));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand Down Expand Up @@ -116,7 +113,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand Down Expand Up @@ -147,7 +144,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand Down Expand Up @@ -178,7 +175,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use fuel_core_services::{
};
use std::{
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
Mutex,
},
Expand Down Expand Up @@ -43,7 +47,7 @@ pub struct DaSourceService<Source> {
poll_interval: Interval,
source: Source,
shared_state: SharedState,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
}

Expand All @@ -57,7 +61,7 @@ where
pub fn new(
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
Expand All @@ -77,7 +81,7 @@ where
pub fn new_with_sender(
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
sender: Sender<DaBlockCosts>,
) -> Self {
Expand All @@ -103,7 +107,7 @@ where
.filter_costs_that_have_values_greater_than_l2_block_height(da_block_costs)?;
tracing::debug!(
"the latest l2 height is: {:?}",
*self.latest_l2_height.lock().unwrap()
self.latest_l2_height.load(Ordering::Acquire)
);
for da_block_costs in filtered_block_costs {
tracing::debug!("Sending block costs: {:?}", da_block_costs);
Expand All @@ -124,12 +128,9 @@ where
&self,
da_block_costs: Vec<DaBlockCosts>,
) -> Result<impl Iterator<Item = DaBlockCosts>> {
let latest_l2_height = *self
.latest_l2_height
.lock()
.map_err(|err| anyhow::anyhow!("lock error: {:?}", err))?;
let latest_l2_height = self.latest_l2_height.load(Ordering::Acquire);
let iter = da_block_costs.into_iter().filter(move |da_block_costs| {
let end = BlockHeight::from(*da_block_costs.l2_blocks.end());
let end = *da_block_costs.l2_blocks.end();
end < latest_l2_height
});
Ok(iter)
Expand Down Expand Up @@ -206,11 +207,11 @@ where
}
}

#[cfg(feature = "test-helpers")]
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_da_service<S: DaBlockCostsSource>(
da_source: S,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
) -> ServiceRunner<DaSourceService<S>> {
ServiceRunner::new(DaSourceService::new(
da_source,
Expand Down
40 changes: 21 additions & 19 deletions crates/services/gas_price_service/src/v1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ use futures::FutureExt;
use std::{
num::NonZeroU64,
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
Mutex,
},
Expand Down Expand Up @@ -112,7 +116,7 @@ where
/// Storage transaction provider for metadata and unrecorded blocks
storage_tx_provider: AtomicStorage,
/// communicates to the Da source service what the latest L2 block was
latest_l2_block: Arc<Mutex<BlockHeight>>,
latest_l2_block: Arc<AtomicU32>,
}

impl<L2, DA, StorageTxProvider> GasPriceServiceV1<L2, DA, StorageTxProvider>
Expand All @@ -131,6 +135,11 @@ where
}
}
}

#[cfg(test)]
pub fn latest_l2_block(&self) -> &AtomicU32 {
&self.latest_l2_block
}
}

impl<L2, DA, AtomicStorage> GasPriceServiceV1<L2, DA, AtomicStorage>
Expand Down Expand Up @@ -159,11 +168,7 @@ where
match block {
BlockInfo::GenesisBlock => {}
BlockInfo::Block { height, .. } => {
let mut latest_l2_block = self
.latest_l2_block
.lock()
.map_err(|err| anyhow!("Error locking latest L2 block: {:?}", err))?;
*latest_l2_block = BlockHeight::from(height);
self.latest_l2_block.store(height, Ordering::Release);
}
}
Ok(())
Expand All @@ -182,7 +187,7 @@ where
algorithm_updater: AlgorithmUpdaterV1,
da_source_adapter_handle: ServiceRunner<DaSourceService<DA>>,
storage_tx_provider: AtomicStorage,
latest_l2_block: Arc<Mutex<BlockHeight>>,
latest_l2_block: Arc<AtomicU32>,
) -> Self {
let da_source_channel = da_source_adapter_handle.shared.clone().subscribe();
Self {
Expand Down Expand Up @@ -415,6 +420,7 @@ mod tests {
use std::{
num::NonZeroU64,
sync::{
atomic::AtomicU32,
Arc,
Mutex,
},
Expand Down Expand Up @@ -526,9 +532,6 @@ mod tests {
fn database() -> StorageTransaction<InMemoryStorage<GasPriceColumn>> {
InMemoryStorage::default().into_transaction()
}
fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn run__updates_gas_price_with_l2_block_source() {
Expand Down Expand Up @@ -577,20 +580,19 @@ mod tests {
.unwrap();

let notifier = Arc::new(tokio::sync::Notify::new());
let latest_l2_block = Arc::new(Mutex::new(BlockHeight::new(0)));
let latest_l2_height = Arc::new(AtomicU32::new(0));
let dummy_da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Err(anyhow::anyhow!("unused at the moment")),
notifier.clone(),
),
None,
latest_l2_block,
Arc::clone(&latest_l2_height),
None,
);
let da_service_runner = ServiceRunner::new(dummy_da_source);
da_service_runner.start_and_await().await.unwrap();
let latest_gas_price = LatestGasPrice::new(0, 0);
let latest_l2_height = latest_l2_height(0);

let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand Down Expand Up @@ -665,7 +667,7 @@ mod tests {
algo_updater.last_profit = 10_000;
algo_updater.new_scaled_da_gas_price = 10_000_000;

let latest_l2_block = latest_l2_height(block_height - 1);
let latest_l2_block = Arc::new(AtomicU32::new(block_height - 1));
let notifier = Arc::new(tokio::sync::Notify::new());
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Expand All @@ -678,7 +680,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_block.clone(),
Arc::clone(&latest_l2_block),
None,
);
let mut watcher = StateWatcher::started();
Expand Down Expand Up @@ -767,7 +769,7 @@ mod tests {
algo_updater.last_profit = 10_000;
algo_updater.new_scaled_da_gas_price = 10_000_000;

let latest_l2_height = latest_l2_height(block_height - 1);
let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1));
let notifier = Arc::new(tokio::sync::Notify::new());
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Expand All @@ -780,7 +782,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_height.clone(),
Arc::clone(&latest_l2_height),
None,
);
let mut watcher = StateWatcher::started();
Expand Down Expand Up @@ -859,7 +861,7 @@ mod tests {

let notifier = Arc::new(tokio::sync::Notify::new());
let blob_cost_wei = 9000;
let latest_l2_height = latest_l2_height(block_height - 1);
let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1));
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Ok(DaBlockCosts {
Expand All @@ -871,7 +873,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_height.clone(),
Arc::clone(&latest_l2_height),
None,
);
let mut watcher = StateWatcher::started();
Expand Down
33 changes: 19 additions & 14 deletions crates/services/gas_price_service/src/v1/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ use std::{
num::NonZeroU64,
ops::Deref,
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
Mutex,
},
Expand Down Expand Up @@ -358,9 +362,6 @@ fn gas_price_database_with_metadata(
tx.commit().unwrap();
db
}
fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn next_gas_price__affected_by_new_l2_block() {
Expand All @@ -385,8 +386,9 @@ async fn next_gas_price__affected_by_new_l2_block() {
let (algo_updater, shared_algo) =
initialize_algorithm(&config, height, height, &metadata_storage).unwrap();
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
da_service_runner.start_and_await().await.unwrap();

let latest_gas_price = LatestGasPrice::new(0, 0);
Expand Down Expand Up @@ -436,8 +438,9 @@ async fn run__new_l2_block_saves_old_metadata() {
let algo_updater = updater_from_config(&config, 0);
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
da_service_runner.start_and_await().await.unwrap();
let latest_gas_price = LatestGasPrice::new(0, 0);
let mut service = GasPriceServiceV1::new(
Expand Down Expand Up @@ -490,8 +493,9 @@ async fn run__new_l2_block_updates_latest_gas_price_arc() {
let algo_updater = updater_from_config(&config, 0);
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
let latest_gas_price = LatestGasPrice::new(0, 0);
let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand Down Expand Up @@ -540,9 +544,10 @@ async fn run__updates_da_service_latest_l2_height() {
algo_updater.l2_block_height = l2_height - 1;
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let latest_l2_height = Arc::new(AtomicU32::new(0));
let latest_gas_price = LatestGasPrice::new(0, 0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
da_service_runner.start_and_await().await.unwrap();
let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand All @@ -551,7 +556,7 @@ async fn run__updates_da_service_latest_l2_height() {
algo_updater,
da_service_runner,
inner,
latest_l2_height.clone(),
latest_l2_height,
);
let mut watcher = StateWatcher::started();

Expand All @@ -560,8 +565,8 @@ async fn run__updates_da_service_latest_l2_height() {
let _ = service.run(&mut watcher).await;

// then
let latest_value = *latest_l2_height.lock().unwrap();
assert_eq!(*latest_value, l2_height);
let latest_value = service.latest_l2_block().load(Ordering::SeqCst);
assert_eq!(latest_value, l2_height);
}

#[derive(Clone)]
Expand Down
Loading

0 comments on commit 91ca09e

Please sign in to comment.