diff --git a/.gitignore b/.gitignore index 09b1f44..941b004 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target/ *.key config.toml +store diff --git a/Cargo.lock b/Cargo.lock index 565a634..a0f1e48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2084,6 +2084,17 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "file-based-storage" +version = "0.1.0" +dependencies = [ + "amplifier-api", + "bytemuck", + "memmap2 0.9.5", + "relayer-amplifier-state", + "tracing", +] + [[package]] name = "fixed-hash" version = "0.8.0" @@ -3193,6 +3204,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.9.1" @@ -4351,6 +4371,7 @@ dependencies = [ "futures", "futures-concurrency", "quanta", + "relayer-amplifier-state", "relayer-engine", "serde", "tokio", @@ -4360,6 +4381,13 @@ dependencies = [ "url", ] +[[package]] +name = "relayer-amplifier-state" +version = "0.1.0" +dependencies = [ + "amplifier-api", +] + [[package]] name = "relayer-engine" version = "0.1.0" @@ -5288,6 +5316,7 @@ dependencies = [ "amplifier-api", "color-eyre", "eyre", + "file-based-storage", "indoc", "opentelemetry", "opentelemetry-appender-tracing", @@ -5454,13 +5483,13 @@ dependencies = [ "mockall", "num-traits", "relayer-amplifier-api-integration", + "relayer-amplifier-state", "relayer-engine", "serde", "serde_json", "solana-client", "solana-sdk", "solana-transaction-status", - "tokio", "tracing", "typed-builder", ] @@ -5827,7 +5856,7 @@ dependencies = [ "lazy_static", "libsecp256k1", "log", - "memmap2", + "memmap2 0.5.10", "num_enum", "pbkdf2 0.11.0", "qstring", @@ -7384,7 +7413,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ab1dbbe..fae70ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,8 @@ unused_must_use = { level = "deny", priority = -1 } # Our crates relayer-engine = { path = "crates/relayer-engine" } relayer-amplifier-api-integration = { path = "crates/relayer-amplifier-api-integration" } +relayer-amplifier-state = { path = "crates/relayer-amplifier-state" } +file-based-storage = { path = "crates/file-based-storage" } amplifier-api = { path = "crates/amplifier-api" } solana-listener = { path = "crates/solana-listener" } common-serde-utils = { path = "crates/common-serde-utils" } @@ -102,6 +104,8 @@ backoff = { version = "0.4", features = ["tokio"] } indoc = "2" itertools = "0.12" num-traits = "0.2" +memmap2 = "0.9" +bytemuck = "1.19" # Serde serde = { version = "1", features = ["derive"] } diff --git a/config.example.toml b/config.example.toml index 44bb431..4a0510f 100644 --- a/config.example.toml +++ b/config.example.toml @@ -1,3 +1,4 @@ +storage_path = "./storage" [amplifier_component] # pem format cert identity = ''' diff --git a/crates/amplifier-api/src/types.rs b/crates/amplifier-api/src/types.rs index c9dd30e..bfdb298 100644 --- a/crates/amplifier-api/src/types.rs +++ b/crates/amplifier-api/src/types.rs @@ -2,11 +2,11 @@ //! Contsructed form the following API spec [link](https://github.com/axelarnetwork/axelar-eds-mirror/blob/main/oapi/gmp/schema.yaml) pub use big_int::BigInt; -pub use bnum; use chrono::{DateTime, Utc}; pub use id::*; use serde::{Deserialize, Deserializer, Serialize}; use typed_builder::TypedBuilder; +pub use {bnum, uuid}; /// Represents an address as a non-empty string. pub type Address = String; @@ -500,7 +500,7 @@ pub enum Task { } /// Represents an individual Task Item. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +#[derive(PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct TaskItem { /// UUID of current task pub id: TaskItemId, @@ -511,6 +511,24 @@ pub struct TaskItem { pub task: Task, } +#[expect(clippy::min_ident_chars, reason = "comes from trait definition")] +impl core::fmt::Debug for TaskItem { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let task_type = match self.task { + Task::Verify(_) => "Verify", + Task::GatewayTx(_) => "GatewayTx", + Task::Execute(_) => "Execute", + Task::Refund(_) => "Refund", + }; + + f.debug_struct("TaskItem") + .field("id", &self.id) + .field("timestamp", &self.timestamp) + .field("task", &task_type) + .finish() + } +} + /// Represents the response from fetching tasks. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct GetTasksResult { diff --git a/crates/file-based-storage/Cargo.toml b/crates/file-based-storage/Cargo.toml new file mode 100644 index 0000000..6235db1 --- /dev/null +++ b/crates/file-based-storage/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "file-based-storage" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +relayer-amplifier-state.workspace = true +amplifier-api.workspace = true +memmap2.workspace = true +tracing.workspace = true +bytemuck.workspace = true + +[lints] +workspace = true diff --git a/crates/file-based-storage/src/lib.rs b/crates/file-based-storage/src/lib.rs new file mode 100644 index 0000000..0116643 --- /dev/null +++ b/crates/file-based-storage/src/lib.rs @@ -0,0 +1,140 @@ +#![expect(clippy::allow_attributes_without_reason)] +#![expect(clippy::allow_attributes)] + +//! simple memory storage implementation using memory maps +use std::fs::OpenOptions; +use std::io::{self, Seek as _, SeekFrom, Write as _}; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use amplifier_api::types::{uuid, TaskItemId}; +use bytemuck::{Pod, Zeroable}; +use memmap2::MmapMut; + +/// Memory map wrapper that implements the state to successfully store and retrieve latest task item +/// id +#[derive(Debug, Clone)] +pub struct MemmapState { + mmap: Arc>, +} + +#[repr(C)] +#[derive(Default, Debug, Copy, Clone, Pod, Zeroable)] +struct InternalState { + latest_queried_task_item_id: u128, + latest_processed_task_item_id: u128, +} + +#[expect( + clippy::expect_used, + clippy::unwrap_in_result, + reason = "irrecoverable error" +)] +impl MemmapState { + /// Creates a new [`MemmapState`] with the memory-mapped file at the given path. + /// + /// # Errors + /// If the file cannot be created / opened + /// + /// # Panics + /// If the expected state of the [`InternalState`] will be larger than `u64` + pub fn new>(path: P) -> io::Result { + // Open or create the file with read and write permissions + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(path)?; + + // Ensure the file is at least the size of InternalState + let default_state = InternalState::default(); + let default_state_bytes = bytemuck::bytes_of(&default_state); + let expected_len = default_state_bytes + .len() + .try_into() + .expect("the size of default state must fit in a u64"); + if file.metadata()?.len() < expected_len { + file.set_len(expected_len)?; + file.seek(SeekFrom::Start(0))?; + file.write_all(default_state_bytes)?; + } + + // Create a mutable memory map of the file + // SAFETY: + // we ensured that the size is large enough + let mmap = unsafe { MmapMut::map_mut(&file)? }; + mmap.flush_async()?; + + Ok(Self { + mmap: Arc::new(Mutex::new(mmap)), + }) + } + + // Generic helper function for getting a TaskItemId + fn get_task_item_id(&self, field_accessor: F) -> Option + where + F: Fn(&InternalState) -> u128, + { + let mmap = self.mmap.lock().expect("lock should not be poisoned"); + let data = bytemuck::from_bytes::(&mmap[..]); + let task_item_id = field_accessor(data); + drop(mmap); + + if task_item_id == 0 { + None + } else { + Some(TaskItemId(uuid::Uuid::from_u128(task_item_id))) + } + } + + // Generic helper function for setting a TaskItemId + fn set_task_item_id( + &self, + task_item_id: &TaskItemId, + field_mutator: F, + ) -> Result<(), io::Error> + where + F: Fn(&mut InternalState, u128), + { + let mut mmap = self.mmap.lock().expect("lock should not be poisoned"); + let raw_u128 = task_item_id.0.as_u128(); + let data = bytemuck::from_bytes_mut::(&mut mmap[..]); + field_mutator(data, raw_u128); + mmap.flush_async()?; + drop(mmap); + Ok(()) + } +} + +impl relayer_amplifier_state::State for MemmapState { + type Err = io::Error; + + #[tracing::instrument(skip(self), level = "trace", ret)] + fn latest_queried_task_id(&self) -> Option { + tracing::trace!("getting latest queried task item id"); + self.get_task_item_id(|data| data.latest_queried_task_item_id) + } + + #[tracing::instrument(skip(self), err)] + fn set_latest_queried_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err> { + tracing::info!("updating latest queried task item id"); + self.set_task_item_id(&task_item_id, |data, value| { + data.latest_queried_task_item_id = value; + }) + } + + #[tracing::instrument(skip(self), level = "trace", ret)] + fn latest_processed_task_id(&self) -> Option { + tracing::trace!("getting latest processed task item id"); + self.get_task_item_id(|data| data.latest_processed_task_item_id) + } + + #[tracing::instrument(skip(self), err)] + fn set_latest_processed_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err> { + tracing::info!("updating latest processed task item id"); + self.set_task_item_id(&task_item_id, |data, value| { + data.latest_processed_task_item_id = value; + }) + } +} diff --git a/crates/relayer-amplifier-api-integration/Cargo.toml b/crates/relayer-amplifier-api-integration/Cargo.toml index 28ef6be..a017721 100644 --- a/crates/relayer-amplifier-api-integration/Cargo.toml +++ b/crates/relayer-amplifier-api-integration/Cargo.toml @@ -21,6 +21,7 @@ serde.workspace = true relayer-engine.workspace = true tokio-stream.workspace = true common-serde-utils.workspace = true +relayer-amplifier-state.workspace = true [lints] workspace = true diff --git a/crates/relayer-amplifier-api-integration/src/component.rs b/crates/relayer-amplifier-api-integration/src/component.rs index 5992300..ca3c831 100644 --- a/crates/relayer-amplifier-api-integration/src/component.rs +++ b/crates/relayer-amplifier-api-integration/src/component.rs @@ -4,6 +4,7 @@ use core::pin::Pin; use amplifier_api::types::{PublishEventsRequest, TaskItem}; use futures_concurrency::future::FutureExt as _; use quanta::Upkeep; +use relayer_amplifier_state::State; use tracing::{info_span, Instrument as _}; use crate::{config, from_amplifier, healthcheck, to_amplifier}; @@ -25,10 +26,11 @@ pub(crate) type AmplifierTaskSender = futures::channel::mpsc::UnboundedSender { config: config::Config, receiver: CommandReceiver, sender: AmplifierTaskSender, + state: S, } /// Utility client used for communicating with the `Amplifier` instance @@ -45,7 +47,10 @@ pub struct AmplifierTaskReceiver { pub receiver: futures::channel::mpsc::UnboundedReceiver, } -impl relayer_engine::RelayerComponent for Amplifier { +impl relayer_engine::RelayerComponent for Amplifier +where + S: State, +{ fn process(self: Box) -> Pin> + Send>> { use futures::FutureExt as _; @@ -53,17 +58,24 @@ impl relayer_engine::RelayerComponent for Amplifier { } } -impl Amplifier { +impl Amplifier +where + S: State, +{ /// Instantiate a new Amplifier using the pre-configured configuration. /// /// The returned variable also returns a helper client that encompasses ways to communicate with /// the underlying Amplifier instance. #[must_use] - pub fn new(config: config::Config) -> (Self, AmplifierCommandClient, AmplifierTaskReceiver) { + pub fn new( + config: config::Config, + state: S, + ) -> (Self, AmplifierCommandClient, AmplifierTaskReceiver) { let (command_tx, command_rx) = futures::channel::mpsc::unbounded(); let (task_tx, task_rx) = futures::channel::mpsc::unbounded(); let this = Self { config, + state, sender: task_tx, receiver: command_rx, }; @@ -87,10 +99,14 @@ impl Amplifier { to_amplifier::process(self.config.clone(), self.receiver, client.clone()) .instrument(info_span!("to amplifier")) .in_current_span(); - let from_amplifier_msgs = - from_amplifier::process(self.config.clone(), client.clone(), self.sender.clone()) - .instrument(info_span!("from amplifier")) - .in_current_span(); + let from_amplifier_msgs = from_amplifier::process( + self.config.clone(), + client.clone(), + self.sender.clone(), + self.state, + ) + .instrument(info_span!("from amplifier")) + .in_current_span(); // await tasks until one of them exits (fatal) healthcheck diff --git a/crates/relayer-amplifier-api-integration/src/config.rs b/crates/relayer-amplifier-api-integration/src/config.rs index 0c2e792..b60ea89 100644 --- a/crates/relayer-amplifier-api-integration/src/config.rs +++ b/crates/relayer-amplifier-api-integration/src/config.rs @@ -59,6 +59,6 @@ pub(crate) mod config_defaults { Some(5) } pub(crate) const fn get_chains_limit() -> u8 { - 25 + 4 } } diff --git a/crates/relayer-amplifier-api-integration/src/from_amplifier.rs b/crates/relayer-amplifier-api-integration/src/from_amplifier.rs index 3daf6cb..06e96b3 100644 --- a/crates/relayer-amplifier-api-integration/src/from_amplifier.rs +++ b/crates/relayer-amplifier-api-integration/src/from_amplifier.rs @@ -1,11 +1,11 @@ use core::task::Poll; -use std::sync::{Arc, Mutex}; use amplifier_api::requests::{self, WithTrailingSlash}; -use amplifier_api::types::{ErrorResponse, GetTasksResult, TaskItemId}; +use amplifier_api::types::{ErrorResponse, GetTasksResult}; use amplifier_api::AmplifierRequest; use futures::stream::StreamExt as _; use futures::SinkExt as _; +use relayer_amplifier_state::State; use tokio::task::JoinSet; use tokio_stream::wrappers::IntervalStream; @@ -15,11 +15,15 @@ use crate::config::Config; // process incoming messages (aka `tasks`) coming in form Amplifier API // 1. periodically check if we have new tasks for processing // 2. if we do, try to act on them; spawning handlers concurrently -pub(crate) async fn process( +pub(crate) async fn process( config: Config, client: amplifier_api::AmplifierApiClient, fan_out_sender: AmplifierTaskSender, -) -> eyre::Result<()> { + state: S, +) -> eyre::Result<()> +where + S: State, +{ tracing::info!(poll_interval =? config.get_chains_poll_interval, "spawned"); // Trailing slash is significant when making the API calls! @@ -31,7 +35,11 @@ pub(crate) async fn process( interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval }); - let latest_task = Arc::new(Mutex::new(Option::::None)); + + // Upon startup we want to continue from the latest processed item + if let Some(task_item_id) = state.latest_processed_task_id() { + state.set_latest_queried_task_id(task_item_id)?; + } let mut task_stream = futures::stream::poll_fn(move |cx| { // periodically query the API for new tasks but only if the downstream processor is ready to @@ -40,11 +48,11 @@ pub(crate) async fn process( Poll::Ready(Some(_res)) => { let res = internal( &config, - Arc::clone(&latest_task), &chain_with_trailing_slash, &client, fan_out_sender.clone(), &mut join_set, + state.clone(), ); // in case we were awoken by join_set being ready, let's re-run this function, // while returning the result of `internal`. @@ -81,46 +89,56 @@ pub(crate) async fn process( eyre::bail!("fatal error when processing messages from amplifier") } -pub(crate) fn internal( +pub(crate) fn internal( config: &Config, - tasks_after: Arc>>, chain_with_trailing_slash: &WithTrailingSlash, client: &lifier_api::AmplifierApiClient, fan_out_sender: AmplifierTaskSender, to_join_set: &mut JoinSet>, -) -> eyre::Result<()> { + state: S, +) -> eyre::Result<()> +where + S: State, +{ if !fan_out_sender.is_empty() { // the downstream client is still processing the events, don't send any new ones return Ok(()) } - let tasks_after_internal = tasks_after.lock().expect("lock poisoned").clone(); + let latest_processed_task = state.latest_processed_task_id(); + let latest_queried_task = state.latest_queried_task_id(); + if latest_processed_task != latest_queried_task { + tracing::debug!("downstream processor still processing the last batch"); + return Ok(()) + } + tracing::debug!(?latest_processed_task, "latest task to query"); let request = requests::GetChains::builder() .chain(chain_with_trailing_slash) .limit(config.get_chains_limit) - .after(tasks_after_internal) + .after(latest_processed_task) .build(); let request = client.build_request(&request)?; - to_join_set.spawn(process_task_request(request, tasks_after, fan_out_sender)); + to_join_set.spawn(process_task_request(request, fan_out_sender, state)); Ok(()) } -async fn process_task_request( +async fn process_task_request( request: AmplifierRequest, - tasks_after: Arc>>, mut fan_out_sender: AmplifierTaskSender, + state: S, ) -> eyre::Result<()> { let res = request.execute().await?; let res = res.json().await??; - let Some(last_task) = res.tasks.last().map(|x| x.id.clone()) else { + let Some(last_task_item_id) = res.tasks.last().map(|x| x.id.clone()) else { return Ok(()); }; - tracing::info!(task_count = ?res.tasks.len(), "received new tasks"); + tracing::info!( + new_tasks = ?res.tasks.len(), + latest_queried_task_id =? last_task_item_id, + "received new tasks" + ); let mut iter = futures::stream::iter(res.tasks.into_iter().map(Ok)); fan_out_sender.send_all(&mut iter).await?; - { - let mut lock = tasks_after.lock().expect("lock poisoned"); - lock.replace(last_task); - }; + state.set_latest_queried_task_id(last_task_item_id)?; Ok(()) } diff --git a/crates/relayer-amplifier-api-integration/src/state.rs b/crates/relayer-amplifier-api-integration/src/state.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/relayer-amplifier-api-integration/src/state.rs @@ -0,0 +1 @@ + diff --git a/crates/relayer-amplifier-state/Cargo.toml b/crates/relayer-amplifier-state/Cargo.toml new file mode 100644 index 0000000..ba24580 --- /dev/null +++ b/crates/relayer-amplifier-state/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "relayer-amplifier-state" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +amplifier-api.workspace = true + +[lints] +workspace = true diff --git a/crates/relayer-amplifier-state/src/lib.rs b/crates/relayer-amplifier-state/src/lib.rs new file mode 100644 index 0000000..3d60df5 --- /dev/null +++ b/crates/relayer-amplifier-state/src/lib.rs @@ -0,0 +1,27 @@ +//! Abstract state interfacte for the Amplifier component + +use amplifier_api::types::TaskItemId; + +/// State interfacte to be used by the Amplifier relayer component +pub trait State: Clone + Send + Sync + 'static { + /// Geneirc error + type Err: core::error::Error + Send + Sync + 'static; + + /// Get the latest stored task item id + fn latest_processed_task_id(&self) -> Option; + + /// Get the latest stored task item id + fn latest_queried_task_id(&self) -> Option; + + /// Store the latest processed task item id + /// + /// # Errors + /// Depends on the implementation details + fn set_latest_processed_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err>; + + /// Store the latest queried task item id + /// + /// # Errors + /// Depends on the implementation details + fn set_latest_queried_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err>; +} diff --git a/crates/solana-axelar-relayer/Cargo.toml b/crates/solana-axelar-relayer/Cargo.toml index 1441f61..476e094 100644 --- a/crates/solana-axelar-relayer/Cargo.toml +++ b/crates/solana-axelar-relayer/Cargo.toml @@ -26,6 +26,7 @@ opentelemetry-otlp.workspace = true opentelemetry-semantic-conventions.workspace = true opentelemetry_sdk.workspace = true opentelemetry-appender-tracing.workspace = true +file-based-storage.workspace = true [dev-dependencies] temp-env.workspace = true diff --git a/crates/solana-axelar-relayer/src/main.rs b/crates/solana-axelar-relayer/src/main.rs index 4e132f2..48a58f5 100644 --- a/crates/solana-axelar-relayer/src/main.rs +++ b/crates/solana-axelar-relayer/src/main.rs @@ -25,6 +25,9 @@ async fn main() { let config_file = std::fs::read_to_string(config_file).expect("cannot read config file"); let config = toml::from_str::(&config_file).expect("invalid config file"); + let file_based_storage = file_based_storage::MemmapState::new(config.storage_path) + .expect("could not init file based storage"); + let rpc_client = retrying_solana_http_sender::new_client(&config.solana_rpc); let event_forwarder_config = solana_event_forwarder::Config::new( &config.solana_listener_component, @@ -32,12 +35,13 @@ async fn main() { ); let name_on_amplifier = config.amplifier_component.chain.clone(); let (amplifier_component, amplifier_client, amplifier_task_receiver) = - Amplifier::new(config.amplifier_component); + Amplifier::new(config.amplifier_component, file_based_storage.clone()); let gateway_task_processor = solana_gateway_task_processor::SolanaTxPusher::new( config.solana_gateway_task_processor, name_on_amplifier.clone(), Arc::clone(&rpc_client), amplifier_task_receiver, + file_based_storage, ); let (solana_listener_component, solana_listener_client) = solana_listener::SolanaListener::new( config.solana_listener_component, @@ -80,6 +84,8 @@ pub struct Config { pub relayer_engine: relayer_engine::Config, /// Shared configuration for the Solana RPC client pub solana_rpc: retrying_solana_http_sender::Config, + /// Path to the storage configuration file + pub storage_path: std::path::PathBuf, } #[expect( @@ -118,6 +124,8 @@ mod tests { let identity = identity_fixture(); let missed_signature_catchup_strategy = "until_beginning"; let input = indoc::formatdoc! {r#" + storage_path = "./store" + [amplifier_component] identity = ''' {identity} @@ -172,6 +180,7 @@ mod tests { max_concurrent_rpc_requests, solana_http_rpc: solana_rpc, }, + storage_path: "./store".parse().unwrap(), }; assert_eq!(parsed, expected); Ok(()) diff --git a/crates/solana-axelar-relayer/src/telemetry.rs b/crates/solana-axelar-relayer/src/telemetry.rs index 6743371..35344f6 100644 --- a/crates/solana-axelar-relayer/src/telemetry.rs +++ b/crates/solana-axelar-relayer/src/telemetry.rs @@ -105,6 +105,7 @@ fn setup_subscriber( .add_directive("solana_event_forwarder=info".parse()?) .add_directive("solana_gateway_task_processor=info".parse()?) .add_directive("effective_tx_sender=info".parse()?) + .add_directive("file_based_storage=info".parse()?) .add_directive("hyper=error".parse()?) .add_directive("tonic=error".parse()?) .add_directive("reqwest=error".parse()?) diff --git a/crates/solana-event-forwarder/src/component.rs b/crates/solana-event-forwarder/src/component.rs index 43a1c29..cbc9cba 100644 --- a/crates/solana-event-forwarder/src/component.rs +++ b/crates/solana-event-forwarder/src/component.rs @@ -348,7 +348,7 @@ fn map_gateway_event_to_amplifier_event( ) .build(), ); - tracing::info!(?approved_message, "Message approved"); + tracing::info!(message_id = ?approved_message.message_id, "Message approved"); Some(amplifier_event) } MessageExecuted(ref _executed_message) => { diff --git a/crates/solana-gateway-task-processor/Cargo.toml b/crates/solana-gateway-task-processor/Cargo.toml index 031bf83..0151c9e 100644 --- a/crates/solana-gateway-task-processor/Cargo.toml +++ b/crates/solana-gateway-task-processor/Cargo.toml @@ -20,12 +20,12 @@ futures.workspace = true common-serde-utils = { workspace = true, features = ["solana-sdk"] } bs58.workspace = true relayer-engine.workspace = true -tokio.workspace = true amplifier-api.workspace = true relayer-amplifier-api-integration.workspace = true effective-tx-sender.workspace = true axelar-executable.workspace = true num-traits.workspace = true +relayer-amplifier-state.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/crates/solana-gateway-task-processor/src/component.rs b/crates/solana-gateway-task-processor/src/component.rs index 31be57a..f267354 100644 --- a/crates/solana-gateway-task-processor/src/component.rs +++ b/crates/solana-gateway-task-processor/src/component.rs @@ -7,12 +7,13 @@ use std::sync::Arc; use amplifier_api::types::TaskItem; use axelar_rkyv_encoding::types::{HasheableMessageVec, VerifierSet}; use effective_tx_sender::ComputeBudgetError; -use futures::stream::{FusedStream as _, FuturesUnordered}; +use futures::stream::{FusedStream as _, FuturesOrdered, FuturesUnordered}; use futures::StreamExt as _; use gmp_gateway::commands::OwnedCommand; use gmp_gateway::state::GatewayApprovedCommand; use gmp_gateway::{hasher_impl, instructions}; use num_traits::FromPrimitive as _; +use relayer_amplifier_state::State; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::rpc_response::RpcSimulateTransactionResult; use solana_sdk::instruction::{Instruction, InstructionError}; @@ -21,21 +22,21 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signature}; use solana_sdk::signer::Signer as _; use solana_sdk::transaction::TransactionError; -use tokio::task::JoinSet; use tracing::{info_span, instrument, Instrument as _}; use crate::config; /// A component that pushes transactions over to the Solana blockchain. /// The transactions to push are dependant on the events that the Amplifier API will provide -pub struct SolanaTxPusher { +pub struct SolanaTxPusher { config: config::Config, name_on_amplifier: String, rpc_client: Arc, task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver, + state: S, } -impl relayer_engine::RelayerComponent for SolanaTxPusher { +impl relayer_engine::RelayerComponent for SolanaTxPusher { fn process(self: Box) -> Pin> + Send>> { use futures::FutureExt as _; @@ -43,7 +44,7 @@ impl relayer_engine::RelayerComponent for SolanaTxPusher { } } -impl SolanaTxPusher { +impl SolanaTxPusher { /// Create a new [`SolanaTxPusher`] component #[must_use] pub const fn new( @@ -51,47 +52,52 @@ impl SolanaTxPusher { name_on_amplifier: String, rpc_client: Arc, task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver, + state: S, ) -> Self { Self { config, name_on_amplifier, rpc_client, task_receiver, + state, } } async fn process_internal(self) -> eyre::Result<()> { let config_metadata = self.get_config_metadata().await.map(Arc::new)?; + let state = self.state.clone(); let keypair = Arc::new(self.config.signing_keypair.insecure_clone()); - let mut join_set = JoinSet::>::new(); + let mut futures_ordered = FuturesOrdered::new(); let mut rx = self.task_receiver.receiver.fuse(); let mut task_stream = futures::stream::poll_fn(move |cx| { // check if we have new requests to add to the join set match rx.poll_next_unpin(cx) { - Poll::Ready(Some(command)) => { - // spawn the command on the joinset, returning the error - tracing::info!(?command, "received command from amplifier API"); - join_set.spawn({ + Poll::Ready(Some(task)) => { + // spawn the task on the joinset, returning the error + tracing::info!(?task, "received task from amplifier API"); + futures_ordered.push_back({ let solana_rpc_client = Arc::clone(&self.rpc_client); let keypair = Arc::clone(&keypair); let config_metadata = Arc::clone(&config_metadata); async move { - process_task(&keypair, &solana_rpc_client, command, &config_metadata) - .await + let command_id = task.id.clone(); + let res = + process_task(&keypair, &solana_rpc_client, task, &config_metadata) + .await; + (command_id, res) } }); } Poll::Pending => (), Poll::Ready(None) => { tracing::error!("receiver channel closed"); - join_set.abort_all(); } } // check if any background tasks are done - match join_set.poll_join_next(cx) { + match futures_ordered.poll_next_unpin(cx) { Poll::Ready(Some(res)) => Poll::Ready(Some(res)), - // join set returns `Poll::Ready(None)` when it's empty + // futures unordered returns `Poll::Ready(None)` when it's empty Poll::Ready(None) => { if rx.is_terminated() { return Poll::Ready(None) @@ -102,12 +108,9 @@ impl SolanaTxPusher { } }); - while let Some(task_result) = task_stream.next().await { - let Ok(res) = task_result else { - tracing::error!(?task_result, "background task panicked"); - continue; - }; - let Err(err) = res else { + while let Some((task_item_id, task_result)) = task_stream.next().await { + state.set_latest_processed_task_id(task_item_id)?; + let Err(err) = task_result else { continue; }; @@ -649,7 +652,7 @@ fn get_new_signing_verifier_set_pda(new_verifier_set: &VerifierSet) -> eyre::Res Ok(new_signing_verifier_set_pda) } -#[instrument(skip_all, ret)] +#[instrument(skip_all)] async fn send_transaction( solana_rpc_client: &RpcClient, keypair: &Keypair,