diff --git a/common/src/watcher.rs b/common/src/watcher.rs index 95e57fd8..b011b279 100644 --- a/common/src/watcher.rs +++ b/common/src/watcher.rs @@ -10,10 +10,11 @@ use std::{future::Future, time::Duration}; use tokio::{ select, - sync::watch, + sync::watch::{self, Ref}, + task::JoinHandle, time::{self, sleep}, }; -use tracing::warn; +use tracing::{error, warn}; /// Creates a new watcher that auto initializes it with initial_value /// and updates it given an interval @@ -84,3 +85,31 @@ where }); rx } + +// Replacement for pipe_async function in eventuals +// Listen to the changes in a receiver and runs parametric function +pub fn watch_pipe(rx: watch::Receiver, function: F) -> JoinHandle<()> +where + T: Clone + Send + Sync + 'static, + F: Fn(Ref<'_, T>) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + tokio::spawn(async move { + let mut rx = rx; + let value = rx.borrow(); + function(value).await; + loop { + let res = rx.changed().await; + match res { + Ok(_) => { + let value = rx.borrow(); + function(value).await; + } + Err(err) => { + error!("There was an error piping the watcher results: {err}"); + break; + } + }; + } + }) +} diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 675aae38..3140eddc 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -9,6 +9,7 @@ use bigdecimal::ToPrimitive; use futures::{stream, StreamExt}; use graphql_client::GraphQLQuery; +use indexer_common::watcher::watch_pipe; use jsonrpsee::http_client::HttpClientBuilder; use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use reqwest::Url; @@ -496,35 +497,29 @@ impl Actor for SenderAccount { }: Self::Arguments, ) -> std::result::Result { let myself_clone = myself.clone(); - let _indexer_allocations_handle = tokio::spawn(async move { - let mut indexer_allocations = indexer_allocations.clone(); - loop { - let allocation_ids = indexer_allocations.borrow().clone(); - // Update the allocation_ids - myself_clone - .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) - .unwrap_or_else(|e| { - error!("Error while updating allocation_ids: {:?}", e); - }); - if indexer_allocations.changed().await.is_err() { - break; - } - } + let _indexer_allocations_handle = watch_pipe(indexer_allocations, move |allocation_ids| { + let allocation_ids = allocation_ids.clone(); + // Update the allocation_ids + myself_clone + .cast(SenderAccountMessage::UpdateAllocationIds(allocation_ids)) + .unwrap_or_else(|e| { + error!("Error while updating allocation_ids: {:?}", e); + }); + async {} }); let myself_clone = myself.clone(); let pgpool_clone = pgpool.clone(); - let mut accounts_clone = escrow_accounts.clone(); - let _escrow_account_monitor = tokio::spawn(async move { - while accounts_clone.changed().await.is_ok() { - let escrow_account = accounts_clone.borrow().clone(); - let myself = myself_clone.clone(); - let pgpool = pgpool_clone.clone(); - // Get balance or default value for sender - // this balance already takes into account thawing - let balance = escrow_account - .get_balance_for_sender(&sender_id) - .unwrap_or_default(); + let accounts_clone = escrow_accounts.clone(); + let _escrow_account_monitor = watch_pipe(accounts_clone, move |escrow_account| { + let myself = myself_clone.clone(); + let pgpool = pgpool_clone.clone(); + // Get balance or default value for sender + // this balance already takes into account thawing + let balance = escrow_account + .get_balance_for_sender(&sender_id) + .unwrap_or_default(); + async move { let last_non_final_ravs = sqlx::query!( r#" SELECT allocation_id, value_aggregate diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index af6c38e2..e194609c 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -5,6 +5,9 @@ use std::collections::HashSet; use std::time::Duration; use std::{collections::HashMap, str::FromStr}; +use super::sender_account::{ + SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, +}; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::lazy_static; use alloy::dyn_abi::Eip712Domain; @@ -14,6 +17,8 @@ use anyhow::{anyhow, bail}; use futures::{stream, StreamExt}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; +use indexer_common::watcher::watch_pipe; +use prometheus::{register_counter_vec, CounterVec}; use ractor::concurrency::JoinHandle; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; use reqwest::Url; @@ -23,12 +28,6 @@ use tokio::select; use tokio::sync::watch::{self, Receiver}; use tracing::{error, warn}; -use prometheus::{register_counter_vec, CounterVec}; - -use super::sender_account::{ - SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, -}; - lazy_static! { static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!( "tap_receipts_received_total", @@ -106,19 +105,12 @@ impl Actor for SenderAccountsManager { }: Self::Arguments, ) -> std::result::Result { let (allocations_tx, allocations_rx) = watch::channel(HashSet::
::new()); - tokio::spawn(async move { - let mut indexer_allocations = indexer_allocations.clone(); - while indexer_allocations.changed().await.is_ok() { - allocations_tx - .send( - indexer_allocations - .borrow() - .keys() - .cloned() - .collect::>(), - ) - .expect("Failed to update indexer_allocations_set channel"); - } + watch_pipe(indexer_allocations.clone(), move |allocation_id| { + let allocation_set = allocation_id.keys().cloned().collect::>(); + allocations_tx + .send(allocation_set) + .expect("Failed to update indexer_allocations_set channel"); + async {} }); let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); pglistener @@ -129,18 +121,17 @@ impl Actor for SenderAccountsManager { 'scalar_tap_receipt_notification'", ); let myself_clone = myself.clone(); - let mut accounts_clone = escrow_accounts.clone(); - let _eligible_allocations_senders_handle = tokio::spawn(async move { - while accounts_clone.changed().await.is_ok() { + let accounts_clone = escrow_accounts.clone(); + let _eligible_allocations_senders_handle = + watch_pipe(accounts_clone, move |escrow_accounts| { + let senders = escrow_accounts.get_senders(); myself_clone - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( - accounts_clone.borrow().get_senders(), - )) + .cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders)) .unwrap_or_else(|e| { error!("Error while updating sender_accounts: {:?}", e); }); - } - }); + async {} + }); let mut state = State { config,