Skip to content

Commit

Permalink
fix leak in client pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Boog900 committed May 19, 2024
1 parent 76c0ad3 commit 79a1412
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion p2p/cuprate-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cuprate-helper = { path = "../../helper", features = ["asynch"] }
monero-serai = { workspace = true, features = ["std"] }

tower = { workspace = true, features = ["buffer"] }
tokio = { workspace = true, features = ["rt"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
rayon = { workspace = true }
tokio-util = { workspace = true }
tokio-stream = { workspace = true, features = ["sync", "time"] }
Expand All @@ -32,5 +32,7 @@ rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }

tracing-subscriber = "0.3.18"

[dev-dependencies]
cuprate-test-utils = { path = "../../test-utils" }
6 changes: 4 additions & 2 deletions p2p/cuprate-p2p/src/client_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;

use dashmap::{DashMap, DashSet};
use tokio::sync::mpsc;
use tracing::{Instrument, Span};

use monero_p2p::{
client::{Client, InternalPeerID},
Expand All @@ -37,7 +38,6 @@ pub struct ClientPool<N: NetworkZone> {
/// by another thread. However, if the peer is in both here and `clients` it is definitely
/// an outbound peer.
outbound_clients: DashSet<InternalPeerID<N::Addr>>,

/// A channel to send new peer ids down to monitor for disconnect.
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
}
Expand All @@ -53,7 +53,9 @@ impl<N: NetworkZone> ClientPool<N> {
new_connection_tx: tx,
});

tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone()));
tokio::spawn(
disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()),
);

pool
}
Expand Down
13 changes: 12 additions & 1 deletion p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>,
) {
// We need to hold a weak reference otherwise the client pool and this would cause a circular reference
// which means the pool would be leaked.
let weak_client_pool = Arc::downgrade(&client_pool);
drop(client_pool);

tracing::info!("Starting peer disconnect monitor.");

let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
Expand All @@ -39,7 +44,13 @@ pub async fn disconnect_monitor<N: NetworkZone>(
}
Some(peer_id) = futs.next() => {
tracing::debug!("{peer_id} has disconnected, removing from client pool.");
client_pool.remove_client(&peer_id);
let Some(pool) = weak_client_pool.upgrade() else {
tracing::info!("Peer disconnect monitor shutting down.");
return;
};

pool.remove_client(&peer_id);
drop(pool);
}
else => {
tracing::info!("Peer disconnect monitor shutting down.");
Expand Down
15 changes: 9 additions & 6 deletions p2p/cuprate-p2p/src/connection_maintainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
time::{sleep, timeout},
};
use tower::{Service, ServiceExt};
use tracing::instrument;
use tracing::{instrument, Instrument, Span};

use monero_p2p::{
client::{Client, ConnectRequest, HandshakeError},
Expand Down Expand Up @@ -149,7 +149,7 @@ where
}

/// Connects to a given outbound peer.
#[instrument(level = "info", skip(self, permit), fields(%addr))]
#[instrument(level = "info", skip_all)]
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
let client_pool = self.client_pool.clone();
let connection_fut = self
Expand All @@ -159,11 +159,14 @@ where
.expect("Connector had an error in `poll_ready`")
.call(ConnectRequest { addr, permit });

tokio::spawn(async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer);
tokio::spawn(
async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer);
}
}
});
.instrument(Span::current()),
);
}

/// Handles a request from the peer set for more peers.
Expand Down
2 changes: 1 addition & 1 deletion p2p/cuprate-p2p/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

/// The timeout we set on handshakes.
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(15);

/// The maximum amount of connections to make to seed nodes for when we need peers.
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
Expand Down
18 changes: 11 additions & 7 deletions p2p/cuprate-p2p/src/inbound_server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! # Inbound Server
//!
//! This module contains the inbound connection server, which listens for inbound connections, gives
//! them to the handshake service and then adds them to the client pool.
//! them to the handshaker service and then adds them to the client pool.
use std::{pin::pin, sync::Arc};

use futures::StreamExt;
Expand All @@ -12,18 +12,19 @@ use monero_p2p::{
use tokio::time::sleep;
use tokio::{sync::Semaphore, time::timeout};
use tower::{Service, ServiceExt};
use tracing::instrument;
use tracing::{instrument, Instrument, Span};

use crate::constants::INBOUND_CONNECTION_COOL_DOWN;
use crate::{client_pool::ClientPool, constants::HANDSHAKE_TIMEOUT, P2PConfig};

#[instrument(level = "info", skip_all)]
pub async fn inbound_server<N: NetworkZone, HS>(
pub async fn inbound_server<N, HS>(
client_pool: Arc<ClientPool<N>>,
mut handshaker: HS,
config: P2PConfig<N>,
) -> Result<(), tower::BoxError>
where
N: NetworkZone,
HS: Service<DoHandshakeRequest<N>, Response = Client<N>, Error = HandshakeError>
+ Send
+ 'static,
Expand Down Expand Up @@ -67,11 +68,14 @@ where

let cloned_pool = client_pool.clone();

tokio::spawn(async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
cloned_pool.add_new_client(peer);
tokio::spawn(
async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
cloned_pool.add_new_client(peer);
}
}
});
.instrument(Span::current()),
);
} else {
tracing::debug!("No permit free for incoming connection.");
// TODO: listen for if the peer is just trying to ping us to see if we are reachable.
Expand Down
25 changes: 17 additions & 8 deletions p2p/cuprate-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tower::buffer::Buffer;
use tracing::instrument;
use tracing::{instrument, Instrument, Span};

use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler};

Expand All @@ -21,7 +21,13 @@ use crate::connection_maintainer::MakeConnectionRequest;
pub use config::P2PConfig;
use monero_p2p::client::Connector;

#[instrument(level="warn", name="net", skip_all, fields(zone=N::NAME))]
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
///
/// This function starts all the tasks to maintain connections/ accept connections/ make connections.
///
/// To use you must provide, a peer request handler, which is given to each connection and a core sync service
/// which keeps track of the sync state of our node.
#[instrument(level="debug", name="net", skip_all, fields(zone=N::NAME))]
pub async fn initialize_network<N, R, CS>(
peer_req_handler: R,
core_sync_svc: CS,
Expand Down Expand Up @@ -88,12 +94,15 @@ where
outbound_connector,
);

tokio::spawn(outbound_connection_maintainer.run());
tokio::spawn(inbound_server::inbound_server(
client_pool.clone(),
inbound_handshaker,
config,
));
tokio::spawn(
outbound_connection_maintainer
.run()
.instrument(Span::current()),
);
tokio::spawn(
inbound_server::inbound_server(client_pool.clone(), inbound_handshaker, config)
.instrument(Span::current()),
);

Ok(NetworkInterface {
pool: client_pool,
Expand Down

0 comments on commit 79a1412

Please sign in to comment.