Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: throttle parallel prove commits #662

Merged
merged 7 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions examples/rpc_publish_multiple_sectors.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ do
echo
echo "-------------------------- Publishing deal $i..."
target/release/polka-storage-provider-client publish-deal "$SIGNED_DEAL_JSON" &
# If we try to prove commit 6 in a single row then we're done.
# we need to throttle prove commits.
# Sleeping until polka-storage#655 is done.
sleep 6

done

# wait until user Ctrl+Cs so that the commitment can actually be calculated
Expand Down
18 changes: 17 additions & 1 deletion storage-provider/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use subxt::{
},
tx::Signer,
};
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinError};
use tokio::{
sync::{mpsc::UnboundedReceiver, Semaphore},
task::JoinError,
};
use tokio_util::sync::CancellationToken;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
Expand Down Expand Up @@ -233,6 +236,14 @@ pub struct ServerArguments {
/// **they need to be set** via an extrinsic pallet-proofs::set_post_verifyingkey.
#[arg(long)]
post_parameters: PathBuf,

/// The number of prove commits to be run in parallel.
/// MUST BE > 0 or the pipeline will not progress.
th7nder marked this conversation as resolved.
Show resolved Hide resolved
th7nder marked this conversation as resolved.
Show resolved Hide resolved
///
/// Creating a replica is memory-heavy process.
/// E.g. With 2KiB sector sizes and 16GiB of RAM, it goes OOM at 4 parallel.
#[arg(long, default_value = "2")]
parallel_prove_commits: usize,
}

/// A valid server configuration. To be created using [`ServerConfiguration::try_from`].
Expand Down Expand Up @@ -272,6 +283,9 @@ pub struct ServerConfiguration {
/// Proving Parameters for PoSt proof.
/// For 2KiB sectors they're ~11MiB of data.
post_parameters: PoStParameters,

/// The number of prove commits to be run in parallel.
parallel_prove_commits: usize,
}

impl TryFrom<ServerArguments> for ServerConfiguration {
Expand Down Expand Up @@ -327,6 +341,7 @@ impl TryFrom<ServerArguments> for ServerConfiguration {
post_proof: value.post_proof,
porep_parameters,
post_parameters,
parallel_prove_commits: value.parallel_prove_commits,
})
}
}
Expand Down Expand Up @@ -458,6 +473,7 @@ impl ServerConfiguration {
xt_client,
xt_keypair: self.multi_pair_signer,
pipeline_sender: pipeline_tx,
prove_commit_throttle: Arc::new(Semaphore::new(self.parallel_prove_commits)),
};

Ok(SetupOutput {
Expand Down
78 changes: 46 additions & 32 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use storagext::{
};
use subxt::{ext::codec::Encode, tx::Signer};
use tokio::{
sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender},
sync::{
mpsc::{error::SendError, UnboundedReceiver, UnboundedSender},
Semaphore,
},
task::{JoinError, JoinHandle},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
Expand Down Expand Up @@ -86,6 +89,7 @@ pub struct PipelineState {
pub xt_client: Arc<storagext::Client>,
pub xt_keypair: storagext::multipair::MultiPairSigner,
pub pipeline_sender: UnboundedSender<PipelineMessage>,
pub prove_commit_throttle: Arc<Semaphore>,
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -544,38 +548,48 @@ async fn prove_commit(
tracing::debug!("Performing prove commit for, seal_randomness_height {}, pre_commit_block: {}, prove_commit_block: {}, entropy: {}, ticket: {}, seed: {}, prover id: {}, sector_number: {}",
seal_randomness_height, sector.precommit_block, prove_commit_block, hex::encode(entropy), hex::encode(ticket), hex::encode(seed), hex::encode(prover_id), sector_number);

let sealing_handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let porep_params = state.porep_parameters.clone();
let cache_dir = sector.cache_path.clone();
let sealed_path = sector.sealed_path.clone();
let piece_infos = sector.piece_infos.clone();

tokio::task::spawn_blocking(move || {
sealer.prove_sector(
porep_params.as_ref(),
cache_dir,
sealed_path,
prover_id,
sector_number,
ticket,
Some(seed),
PreCommitOutput {
comm_r: sector.comm_r,
comm_d: sector.comm_d,
},
&piece_infos,
)
})
};
tracing::debug!("Acquiring sempahore...");
let proofs = {
let _permit = state
.prove_commit_throttle
.acquire()
.await
.expect("semaphore to not be closed");
tracing::debug!("Acquired sempahore.");

let sealing_handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let porep_params = state.porep_parameters.clone();
let cache_dir = sector.cache_path.clone();
let sealed_path = sector.sealed_path.clone();
let piece_infos = sector.piece_infos.clone();

tokio::task::spawn_blocking(move || {
sealer.prove_sector(
porep_params.as_ref(),
cache_dir,
sealed_path,
prover_id,
sector_number,
ticket,
Some(seed),
PreCommitOutput {
comm_r: sector.comm_r,
comm_d: sector.comm_d,
},
&piece_infos,
)
})
};

let proofs = tokio::select! {
// Up to this point everything is retryable.
// Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
tokio::select! {
// Up to this point everything is retryable.
// Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
}
}
};

Expand Down
Loading