Skip to content

Commit

Permalink
kad/executor: Add timeout for writting frames (#277)
Browse files Browse the repository at this point in the history
This PR adds a `WRITE_TIMEOUT` for sending frames to remote peers.

The `WRITE_TIMEOUT` is set to 15 seconds to mirror the `READ_TIMEOUT`.

Closes: #231

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Oct 30, 2024
1 parent 0b4bf1d commit 9ad20ff
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use std::{

/// Read timeout for inbound messages.
const READ_TIMEOUT: Duration = Duration::from_secs(15);
/// Write timeout for outbound messages.
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);

/// Query result.
#[derive(Debug)]
Expand Down Expand Up @@ -91,16 +93,24 @@ impl QueryExecutor {
/// Send message to remote peer.
pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) {
self.futures.push(Box::pin(async move {
match substream.send_framed(message).await {
Ok(_) => QueryContext {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) =>
return QueryContext {
peer,
query_id: None,
result: QueryResult::Timeout,
},
// Writing message to substream failed.
Ok(Err(_)) => QueryContext {
peer,
query_id: None,
result: QueryResult::SendSuccess { substream },
result: QueryResult::SubstreamClosed,
},
Err(_) => QueryContext {
Ok(Ok(())) => QueryContext {
peer,
query_id: None,
result: QueryResult::SubstreamClosed,
result: QueryResult::SendSuccess { substream },
},
}
}));
Expand Down Expand Up @@ -143,14 +153,25 @@ impl QueryExecutor {
mut substream: Substream,
) {
self.futures.push(Box::pin(async move {
if let Err(_) = substream.send_framed(message).await {
let _ = substream.close().await;
return QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
};
}
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) =>
return QueryContext {
peer,
query_id,
result: QueryResult::Timeout,
},
// Writing message to substream failed.
Ok(Err(_)) => {
let _ = substream.close().await;
return QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
};
}
Ok(Ok(())) => (),
};

match tokio::time::timeout(READ_TIMEOUT, substream.next()).await {
Err(_) => QueryContext {
Expand Down

0 comments on commit 9ad20ff

Please sign in to comment.