From 866115c967e509fb28160c893023a7a2ba1cb0ac Mon Sep 17 00:00:00 2001 From: Konrad Date: Mon, 6 Jan 2025 12:49:41 +0100 Subject: [PATCH] feat: throttle parallel prove commits (#662) --- examples/rpc_publish_multiple_sectors.sh | 5 -- storage-provider/server/src/main.rs | 20 +++++- storage-provider/server/src/pipeline/mod.rs | 78 ++++++++++++--------- 3 files changed, 64 insertions(+), 39 deletions(-) diff --git a/examples/rpc_publish_multiple_sectors.sh b/examples/rpc_publish_multiple_sectors.sh index 12a0a1db0..812118477 100755 --- a/examples/rpc_publish_multiple_sectors.sh +++ b/examples/rpc_publish_multiple_sectors.sh @@ -59,11 +59,6 @@ do echo echo "-------------------------- Publishing deal $i..." target/release/polka-storage-provider-client publish-deal "$SIGNED_DEAL_JSON" & - # If we try to prove commit 6 in a single row then we're done. - # we need to throttle prove commits. - # Sleeping until polka-storage#655 is done. - sleep 6 - done # wait until user Ctrl+Cs so that the commitment can actually be calculated diff --git a/storage-provider/server/src/main.rs b/storage-provider/server/src/main.rs index 86336ffc6..72de35d8f 100644 --- a/storage-provider/server/src/main.rs +++ b/storage-provider/server/src/main.rs @@ -7,7 +7,7 @@ mod pipeline; mod rpc; mod storage; -use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::{env::temp_dir, net::SocketAddr, num::NonZero, path::PathBuf, sync::Arc, time::Duration}; use clap::Parser; use pipeline::types::PipelineMessage; @@ -32,7 +32,10 @@ use subxt::{ }, tx::Signer, }; -use tokio::{sync::mpsc::UnboundedReceiver, task::JoinError}; +use tokio::{ + sync::{mpsc::UnboundedReceiver, Semaphore}, + task::JoinError, +}; use tokio_util::sync::CancellationToken; use tracing::level_filters::LevelFilter; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; @@ -233,6 +236,14 @@ pub struct ServerArguments { /// **they need to be set** via an extrinsic pallet-proofs::set_post_verifyingkey. #[arg(long)] post_parameters: PathBuf, + + /// The number of prove commits to be run in parallel. + /// MUST BE > 0 or the pipeline will not progress. + /// + /// Creating a replica is memory-heavy process. + /// E.g. With 2KiB sector sizes and 16GiB of RAM, it goes OOM at 4 parallel. + #[arg(long, default_value = "2")] + parallel_prove_commits: NonZero, } /// A valid server configuration. To be created using [`ServerConfiguration::try_from`]. @@ -272,6 +283,9 @@ pub struct ServerConfiguration { /// Proving Parameters for PoSt proof. /// For 2KiB sectors they're ~11MiB of data. post_parameters: PoStParameters, + + /// The number of prove commits to be run in parallel. + parallel_prove_commits: usize, } impl TryFrom for ServerConfiguration { @@ -327,6 +341,7 @@ impl TryFrom for ServerConfiguration { post_proof: value.post_proof, porep_parameters, post_parameters, + parallel_prove_commits: value.parallel_prove_commits.get(), }) } } @@ -458,6 +473,7 @@ impl ServerConfiguration { xt_client, xt_keypair: self.multi_pair_signer, pipeline_sender: pipeline_tx, + prove_commit_throttle: Arc::new(Semaphore::new(self.parallel_prove_commits)), }; Ok(SetupOutput { diff --git a/storage-provider/server/src/pipeline/mod.rs b/storage-provider/server/src/pipeline/mod.rs index 6f9c768dd..83aded8ab 100644 --- a/storage-provider/server/src/pipeline/mod.rs +++ b/storage-provider/server/src/pipeline/mod.rs @@ -28,7 +28,10 @@ use storagext::{ }; use subxt::{ext::codec::Encode, tx::Signer}; use tokio::{ - sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}, + sync::{ + mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}, + Semaphore, + }, task::{JoinError, JoinHandle}, }; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -86,6 +89,7 @@ pub struct PipelineState { pub xt_client: Arc, pub xt_keypair: storagext::multipair::MultiPairSigner, pub pipeline_sender: UnboundedSender, + pub prove_commit_throttle: Arc, } #[tracing::instrument(skip_all)] @@ -544,38 +548,48 @@ async fn prove_commit( tracing::debug!("Performing prove commit for, seal_randomness_height {}, pre_commit_block: {}, prove_commit_block: {}, entropy: {}, ticket: {}, seed: {}, prover id: {}, sector_number: {}", seal_randomness_height, sector.precommit_block, prove_commit_block, hex::encode(entropy), hex::encode(ticket), hex::encode(seed), hex::encode(prover_id), sector_number); - let sealing_handle: JoinHandle, _>> = { - let porep_params = state.porep_parameters.clone(); - let cache_dir = sector.cache_path.clone(); - let sealed_path = sector.sealed_path.clone(); - let piece_infos = sector.piece_infos.clone(); - - tokio::task::spawn_blocking(move || { - sealer.prove_sector( - porep_params.as_ref(), - cache_dir, - sealed_path, - prover_id, - sector_number, - ticket, - Some(seed), - PreCommitOutput { - comm_r: sector.comm_r, - comm_d: sector.comm_d, - }, - &piece_infos, - ) - }) - }; + tracing::debug!("Acquiring sempahore..."); + let proofs = { + let _permit = state + .prove_commit_throttle + .acquire() + .await + .expect("semaphore to not be closed"); + tracing::debug!("Acquired sempahore."); + + let sealing_handle: JoinHandle, _>> = { + let porep_params = state.porep_parameters.clone(); + let cache_dir = sector.cache_path.clone(); + let sealed_path = sector.sealed_path.clone(); + let piece_infos = sector.piece_infos.clone(); + + tokio::task::spawn_blocking(move || { + sealer.prove_sector( + porep_params.as_ref(), + cache_dir, + sealed_path, + prover_id, + sector_number, + ticket, + Some(seed), + PreCommitOutput { + comm_r: sector.comm_r, + comm_d: sector.comm_d, + }, + &piece_infos, + ) + }) + }; - let proofs = tokio::select! { - // Up to this point everything is retryable. - // Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB. - res = sealing_handle => { - res?? - }, - () = token.cancelled() => { - return Err(PipelineError::ProvingCancelled); + tokio::select! { + // Up to this point everything is retryable. + // Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB. + res = sealing_handle => { + res?? + }, + () = token.cancelled() => { + return Err(PipelineError::ProvingCancelled); + } } };