From ccd79b2e4dafd1cbfc20690da40cf17f9e0303ee Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 2 Oct 2023 04:45:32 +0300 Subject: [PATCH] Limit concurrency of piece retrieval from node via RPC --- .../src/node_client/node_rpc_client.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index 0da49c389a..4b7a33ce97 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -12,15 +12,19 @@ use subspace_rpc_primitives::{ FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; +use tokio::sync::Semaphore; -// Defines max_concurrent_requests constant in the node rpc client. -// It must be set for large plots. -const WS_PRC_MAX_CONCURRENT_REQUESTS: usize = 1_000_000; +/// Defines max_concurrent_requests constant in the node rpc client +const RPC_MAX_CONCURRENT_REQUESTS: usize = 1_000_000; +/// Node is having a hard time responding for many piece requests +// TODO: Remove this once https://github.com/paritytech/jsonrpsee/issues/1189 is resolved +const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10; /// `WsClient` wrapper. -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub struct NodeRpcClient { client: Arc, + piece_request_semaphore: Arc, } impl NodeRpcClient { @@ -28,12 +32,16 @@ impl NodeRpcClient { pub async fn new(url: &str) -> Result { let client = Arc::new( WsClientBuilder::default() - .max_concurrent_requests(WS_PRC_MAX_CONCURRENT_REQUESTS) + .max_concurrent_requests(RPC_MAX_CONCURRENT_REQUESTS) .max_request_body_size(20 * 1024 * 1024) .build(url) .await?, ); - Ok(Self { client }) + let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS)); + Ok(Self { + client, + piece_request_semaphore, + }) } } @@ -152,6 +160,7 @@ impl NodeClient for NodeRpcClient { } async fn piece(&self, piece_index: PieceIndex) -> Result, RpcError> { + let _permit = self.piece_request_semaphore.acquire().await?; let result: Option> = self .client .request("subspace_piece", rpc_params![&piece_index])