Skip to content

Commit

Permalink
feat: throttle parallel prove commits (#662)
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder authored Jan 6, 2025
1 parent 206bf2e commit 866115c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 39 deletions.
5 changes: 0 additions & 5 deletions examples/rpc_publish_multiple_sectors.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ do
echo
echo "-------------------------- Publishing deal $i..."
target/release/polka-storage-provider-client publish-deal "$SIGNED_DEAL_JSON" &
# If we try to prove commit 6 sectors in a row, then we're done for.
# We need to throttle prove commits.
# Sleeping until polka-storage#655 is done.
sleep 6

done

# wait until user Ctrl+Cs so that the commitment can actually be calculated
Expand Down
20 changes: 18 additions & 2 deletions storage-provider/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod pipeline;
mod rpc;
mod storage;

use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{env::temp_dir, net::SocketAddr, num::NonZero, path::PathBuf, sync::Arc, time::Duration};

use clap::Parser;
use pipeline::types::PipelineMessage;
Expand All @@ -32,7 +32,10 @@ use subxt::{
},
tx::Signer,
};
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinError};
use tokio::{
sync::{mpsc::UnboundedReceiver, Semaphore},
task::JoinError,
};
use tokio_util::sync::CancellationToken;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
Expand Down Expand Up @@ -233,6 +236,14 @@ pub struct ServerArguments {
/// **they need to be set** via an extrinsic pallet-proofs::set_post_verifyingkey.
#[arg(long)]
post_parameters: PathBuf,

/// The number of prove commits to be run in parallel.
/// MUST BE > 0 or the pipeline will not progress.
///
/// Creating a replica is a memory-heavy process.
/// E.g. with 2KiB sector sizes and 16GiB of RAM, it goes OOM at 4 parallel prove commits.
#[arg(long, default_value = "2")]
parallel_prove_commits: NonZero<usize>,
}

/// A valid server configuration. To be created using [`ServerConfiguration::try_from`].
Expand Down Expand Up @@ -272,6 +283,9 @@ pub struct ServerConfiguration {
/// Proving Parameters for PoSt proof.
/// For 2KiB sectors they're ~11MiB of data.
post_parameters: PoStParameters,

/// The number of prove commits to be run in parallel.
parallel_prove_commits: usize,
}

impl TryFrom<ServerArguments> for ServerConfiguration {
Expand Down Expand Up @@ -327,6 +341,7 @@ impl TryFrom<ServerArguments> for ServerConfiguration {
post_proof: value.post_proof,
porep_parameters,
post_parameters,
parallel_prove_commits: value.parallel_prove_commits.get(),
})
}
}
Expand Down Expand Up @@ -458,6 +473,7 @@ impl ServerConfiguration {
xt_client,
xt_keypair: self.multi_pair_signer,
pipeline_sender: pipeline_tx,
prove_commit_throttle: Arc::new(Semaphore::new(self.parallel_prove_commits)),
};

Ok(SetupOutput {
Expand Down
78 changes: 46 additions & 32 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use storagext::{
};
use subxt::{ext::codec::Encode, tx::Signer};
use tokio::{
sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender},
sync::{
mpsc::{error::SendError, UnboundedReceiver, UnboundedSender},
Semaphore,
},
task::{JoinError, JoinHandle},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
Expand Down Expand Up @@ -86,6 +89,7 @@ pub struct PipelineState {
pub xt_client: Arc<storagext::Client>,
pub xt_keypair: storagext::multipair::MultiPairSigner,
pub pipeline_sender: UnboundedSender<PipelineMessage>,
pub prove_commit_throttle: Arc<Semaphore>,
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -544,38 +548,48 @@ async fn prove_commit(
tracing::debug!("Performing prove commit for, seal_randomness_height {}, pre_commit_block: {}, prove_commit_block: {}, entropy: {}, ticket: {}, seed: {}, prover id: {}, sector_number: {}",
seal_randomness_height, sector.precommit_block, prove_commit_block, hex::encode(entropy), hex::encode(ticket), hex::encode(seed), hex::encode(prover_id), sector_number);

let sealing_handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let porep_params = state.porep_parameters.clone();
let cache_dir = sector.cache_path.clone();
let sealed_path = sector.sealed_path.clone();
let piece_infos = sector.piece_infos.clone();

tokio::task::spawn_blocking(move || {
sealer.prove_sector(
porep_params.as_ref(),
cache_dir,
sealed_path,
prover_id,
sector_number,
ticket,
Some(seed),
PreCommitOutput {
comm_r: sector.comm_r,
comm_d: sector.comm_d,
},
&piece_infos,
)
})
};
tracing::debug!("Acquiring sempahore...");
let proofs = {
let _permit = state
.prove_commit_throttle
.acquire()
.await
.expect("semaphore to not be closed");
tracing::debug!("Acquired sempahore.");

let sealing_handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let porep_params = state.porep_parameters.clone();
let cache_dir = sector.cache_path.clone();
let sealed_path = sector.sealed_path.clone();
let piece_infos = sector.piece_infos.clone();

tokio::task::spawn_blocking(move || {
sealer.prove_sector(
porep_params.as_ref(),
cache_dir,
sealed_path,
prover_id,
sector_number,
ticket,
Some(seed),
PreCommitOutput {
comm_r: sector.comm_r,
comm_d: sector.comm_d,
},
&piece_infos,
)
})
};

let proofs = tokio::select! {
// Up to this point everything is retryable.
// The pipeline ends up in an inconsistent state if we submit the prove commit to the chain but don't wait for it, because the sector is then not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
tokio::select! {
// Up to this point everything is retryable.
// The pipeline ends up in an inconsistent state if we submit the prove commit to the chain but don't wait for it, because the sector is then not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
}
}
};

Expand Down

0 comments on commit 866115c

Please sign in to comment.