Skip to content

Commit

Permalink
Merge pull request #2034 from subspace/limit-node-piece-retrieval-con…
Browse files Browse the repository at this point in the history
…currency

Limit concurrency of piece retrieval from node via RPC
  • Loading branch information
nazar-pc authored Oct 2, 2023
2 parents 8341203 + ccd79b2 commit fdc2662
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions crates/subspace-farmer/src/node_client/node_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,36 @@ use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
};
use tokio::sync::Semaphore;

// Defines max_concurrent_requests constant in the node rpc client.
// It must be set for large plots.
const WS_PRC_MAX_CONCURRENT_REQUESTS: usize = 1_000_000;
/// Defines max_concurrent_requests constant in the node rpc client
const RPC_MAX_CONCURRENT_REQUESTS: usize = 1_000_000;
/// Node is having a hard time responding for many piece requests
// TODO: Remove this once https://github.com/paritytech/jsonrpsee/issues/1189 is resolved
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;

/// `WsClient` wrapper.
#[derive(Clone, Debug)]
#[derive(Debug, Clone)]
pub struct NodeRpcClient {
client: Arc<WsClient>,
piece_request_semaphore: Arc<Semaphore>,
}

impl NodeRpcClient {
/// Create a new instance of [`NodeClient`].
pub async fn new(url: &str) -> Result<Self, JsonError> {
let client = Arc::new(
WsClientBuilder::default()
.max_concurrent_requests(WS_PRC_MAX_CONCURRENT_REQUESTS)
.max_concurrent_requests(RPC_MAX_CONCURRENT_REQUESTS)
.max_request_body_size(20 * 1024 * 1024)
.build(url)
.await?,
);
Ok(Self { client })
let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
Ok(Self {
client,
piece_request_semaphore,
})
}
}

Expand Down Expand Up @@ -152,6 +160,7 @@ impl NodeClient for NodeRpcClient {
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, RpcError> {
let _permit = self.piece_request_semaphore.acquire().await?;
let result: Option<Vec<u8>> = self
.client
.request("subspace_piece", rpc_params![&piece_index])
Expand Down

0 comments on commit fdc2662

Please sign in to comment.