Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock-free latest_l2_height in gas price service #2546

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,13 @@ mod tests {
use fuel_core_types::fuel_types::BlockHeight;
use std::{
sync::{
atomic::AtomicU32,
Arc,
Mutex,
},
time::Duration,
};

fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() {
// given
Expand All @@ -59,7 +56,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32)));
let latest_l2_height = Arc::new(AtomicU32::new(10u32));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand All @@ -83,7 +80,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let latest_l2_height = latest_l2_height(0);
let latest_l2_height = Arc::new(AtomicU32::new(0));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand Down Expand Up @@ -116,7 +113,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand Down Expand Up @@ -147,7 +144,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
Expand Down Expand Up @@ -178,7 +175,7 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use fuel_core_services::{
};
use std::{
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
Mutex,
},
Expand Down Expand Up @@ -43,7 +47,7 @@ pub struct DaSourceService<Source> {
poll_interval: Interval,
source: Source,
shared_state: SharedState,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
}

Expand All @@ -57,7 +61,7 @@ where
pub fn new(
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
Expand All @@ -77,7 +81,7 @@ where
pub fn new_with_sender(
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
sender: Sender<DaBlockCosts>,
) -> Self {
Expand All @@ -103,7 +107,7 @@ where
.filter_costs_that_have_values_greater_than_l2_block_height(da_block_costs)?;
tracing::debug!(
"the latest l2 height is: {:?}",
*self.latest_l2_height.lock().unwrap()
self.latest_l2_height.load(Ordering::Acquire)
);
for da_block_costs in filtered_block_costs {
tracing::debug!("Sending block costs: {:?}", da_block_costs);
Expand All @@ -124,12 +128,9 @@ where
&self,
da_block_costs: Vec<DaBlockCosts>,
) -> Result<impl Iterator<Item = DaBlockCosts>> {
let latest_l2_height = *self
.latest_l2_height
.lock()
.map_err(|err| anyhow::anyhow!("lock error: {:?}", err))?;
let latest_l2_height = self.latest_l2_height.load(Ordering::Acquire);
let iter = da_block_costs.into_iter().filter(move |da_block_costs| {
let end = BlockHeight::from(*da_block_costs.l2_blocks.end());
let end = *da_block_costs.l2_blocks.end();
end < latest_l2_height
});
Ok(iter)
Expand Down Expand Up @@ -206,11 +207,11 @@ where
}
}

#[cfg(feature = "test-helpers")]
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_da_service<S: DaBlockCostsSource>(
da_source: S,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
latest_l2_height: Arc<AtomicU32>,
) -> ServiceRunner<DaSourceService<S>> {
ServiceRunner::new(DaSourceService::new(
da_source,
Expand Down
40 changes: 21 additions & 19 deletions crates/services/gas_price_service/src/v1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ use futures::FutureExt;
use std::{
num::NonZeroU64,
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
Mutex,
},
Expand Down Expand Up @@ -112,7 +116,7 @@ where
/// Storage transaction provider for metadata and unrecorded blocks
storage_tx_provider: AtomicStorage,
/// communicates to the Da source service what the latest L2 block was
latest_l2_block: Arc<Mutex<BlockHeight>>,
latest_l2_block: Arc<AtomicU32>,
}

impl<L2, DA, StorageTxProvider> GasPriceServiceV1<L2, DA, StorageTxProvider>
Expand All @@ -131,6 +135,11 @@ where
}
}
}

#[cfg(test)]
pub fn latest_l2_block(&self) -> &AtomicU32 {
&self.latest_l2_block
}
}

impl<L2, DA, AtomicStorage> GasPriceServiceV1<L2, DA, AtomicStorage>
Expand Down Expand Up @@ -159,11 +168,7 @@ where
match block {
BlockInfo::GenesisBlock => {}
BlockInfo::Block { height, .. } => {
let mut latest_l2_block = self
.latest_l2_block
.lock()
.map_err(|err| anyhow!("Error locking latest L2 block: {:?}", err))?;
*latest_l2_block = BlockHeight::from(height);
self.latest_l2_block.store(height, Ordering::Release);
}
}
Ok(())
Expand All @@ -182,7 +187,7 @@ where
algorithm_updater: AlgorithmUpdaterV1,
da_source_adapter_handle: ServiceRunner<DaSourceService<DA>>,
storage_tx_provider: AtomicStorage,
latest_l2_block: Arc<Mutex<BlockHeight>>,
latest_l2_block: Arc<AtomicU32>,
) -> Self {
let da_source_channel = da_source_adapter_handle.shared.clone().subscribe();
Self {
Expand Down Expand Up @@ -415,6 +420,7 @@ mod tests {
use std::{
num::NonZeroU64,
sync::{
atomic::AtomicU32,
Arc,
Mutex,
},
Expand Down Expand Up @@ -526,9 +532,6 @@ mod tests {
fn database() -> StorageTransaction<InMemoryStorage<GasPriceColumn>> {
InMemoryStorage::default().into_transaction()
}
fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn run__updates_gas_price_with_l2_block_source() {
Expand Down Expand Up @@ -577,20 +580,19 @@ mod tests {
.unwrap();

let notifier = Arc::new(tokio::sync::Notify::new());
let latest_l2_block = Arc::new(Mutex::new(BlockHeight::new(0)));
let latest_l2_height = Arc::new(AtomicU32::new(0));
let dummy_da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Err(anyhow::anyhow!("unused at the moment")),
notifier.clone(),
),
None,
latest_l2_block,
Arc::clone(&latest_l2_height),
None,
);
let da_service_runner = ServiceRunner::new(dummy_da_source);
da_service_runner.start_and_await().await.unwrap();
let latest_gas_price = LatestGasPrice::new(0, 0);
let latest_l2_height = latest_l2_height(0);

let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand Down Expand Up @@ -665,7 +667,7 @@ mod tests {
algo_updater.last_profit = 10_000;
algo_updater.new_scaled_da_gas_price = 10_000_000;

let latest_l2_block = latest_l2_height(block_height - 1);
let latest_l2_block = Arc::new(AtomicU32::new(block_height - 1));
let notifier = Arc::new(tokio::sync::Notify::new());
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Expand All @@ -678,7 +680,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_block.clone(),
Arc::clone(&latest_l2_block),
None,
);
let mut watcher = StateWatcher::started();
Expand Down Expand Up @@ -767,7 +769,7 @@ mod tests {
algo_updater.last_profit = 10_000;
algo_updater.new_scaled_da_gas_price = 10_000_000;

let latest_l2_height = latest_l2_height(block_height - 1);
let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1));
let notifier = Arc::new(tokio::sync::Notify::new());
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Expand All @@ -780,7 +782,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_height.clone(),
Arc::clone(&latest_l2_height),
None,
);
let mut watcher = StateWatcher::started();
Expand Down Expand Up @@ -859,7 +861,7 @@ mod tests {

let notifier = Arc::new(tokio::sync::Notify::new());
let blob_cost_wei = 9000;
let latest_l2_height = latest_l2_height(block_height - 1);
let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1));
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Ok(DaBlockCosts {
Expand All @@ -871,7 +873,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_height.clone(),
Arc::clone(&latest_l2_height),
None,
);
let mut watcher = StateWatcher::started();
Expand Down
33 changes: 19 additions & 14 deletions crates/services/gas_price_service/src/v1/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ use std::{
num::NonZeroU64,
ops::Deref,
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
Mutex,
},
Expand Down Expand Up @@ -358,9 +362,6 @@ fn gas_price_database_with_metadata(
tx.commit().unwrap();
db
}
fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn next_gas_price__affected_by_new_l2_block() {
Expand All @@ -385,8 +386,9 @@ async fn next_gas_price__affected_by_new_l2_block() {
let (algo_updater, shared_algo) =
initialize_algorithm(&config, height, height, &metadata_storage).unwrap();
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
da_service_runner.start_and_await().await.unwrap();

let latest_gas_price = LatestGasPrice::new(0, 0);
Expand Down Expand Up @@ -436,8 +438,9 @@ async fn run__new_l2_block_saves_old_metadata() {
let algo_updater = updater_from_config(&config, 0);
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
da_service_runner.start_and_await().await.unwrap();
let latest_gas_price = LatestGasPrice::new(0, 0);
let mut service = GasPriceServiceV1::new(
Expand Down Expand Up @@ -490,8 +493,9 @@ async fn run__new_l2_block_updates_latest_gas_price_arc() {
let algo_updater = updater_from_config(&config, 0);
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
let latest_gas_price = LatestGasPrice::new(0, 0);
let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand Down Expand Up @@ -540,9 +544,10 @@ async fn run__updates_da_service_latest_l2_height() {
algo_updater.l2_block_height = l2_height - 1;
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let latest_l2_height = latest_l2_height(0);
let latest_l2_height = Arc::new(AtomicU32::new(0));
let latest_gas_price = LatestGasPrice::new(0, 0);
let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone());
let da_service_runner =
new_da_service(da_source, None, Arc::clone(&latest_l2_height));
da_service_runner.start_and_await().await.unwrap();
let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand All @@ -551,7 +556,7 @@ async fn run__updates_da_service_latest_l2_height() {
algo_updater,
da_service_runner,
inner,
latest_l2_height.clone(),
latest_l2_height,
);
let mut watcher = StateWatcher::started();

Expand All @@ -560,8 +565,8 @@ async fn run__updates_da_service_latest_l2_height() {
let _ = service.run(&mut watcher).await;

// then
let latest_value = *latest_l2_height.lock().unwrap();
assert_eq!(*latest_value, l2_height);
let latest_value = service.latest_l2_block().load(Ordering::SeqCst);
assert_eq!(latest_value, l2_height);
}

#[derive(Clone)]
Expand Down
Loading
Loading