Skip to content

Commit

Permalink
add an inbound connection server
Browse files Browse the repository at this point in the history
  • Loading branch information
Boog900 committed May 19, 2024
1 parent f818b6a commit 2fd74de
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 30 deletions.
4 changes: 1 addition & 3 deletions p2p/address-book/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
//!
use std::{io::ErrorKind, path::PathBuf, time::Duration};

use tower::buffer::Buffer;

use monero_p2p::{services::AddressBookRequest, NetworkZone};
use monero_p2p::NetworkZone;

mod book;
mod peer_list;
Expand Down
6 changes: 6 additions & 0 deletions p2p/cuprate-p2p/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;

/// The durations of a short ban.
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);

/// The time to sleep after an inbound connection comes in.
///
/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections.
/// TODO: it might be a good idea to make this configurable.
pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(750);
84 changes: 84 additions & 0 deletions p2p/cuprate-p2p/src/inbound_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//! # Inbound Server
//!
//! This module contains the inbound connection server, which listens for inbound connections, gives
//! them to the handshake service and then adds them to the client pool.
use std::{pin::pin, sync::Arc};

use futures::StreamExt;
use monero_p2p::{
client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
ConnectionDirection, NetworkZone,
};
use tokio::time::sleep;
use tokio::{sync::Semaphore, time::timeout};
use tower::{Service, ServiceExt};
use tracing::instrument;

use crate::constants::INBOUND_CONNECTION_COOL_DOWN;
use crate::{client_pool::ClientPool, constants::HANDSHAKE_TIMEOUT, P2PConfig};

#[instrument(level = "info", skip_all)]
pub async fn inbound_server<N: NetworkZone, HS>(
client_pool: Arc<ClientPool<N>>,
mut handshaker: HS,
config: P2PConfig<N>,
) -> Result<(), tower::BoxError>
where
HS: Service<DoHandshakeRequest<N>, Response = Client<N>, Error = HandshakeError>
+ Send
+ 'static,
HS::Future: Send + 'static,
{
let Some(server_config) = config.server_config else {
tracing::warn!("No inbound server config provided, not listening for inbound connections.");
return Ok(());
};

tracing::info!("Starting inbound connection server");

let listener = N::incoming_connection_listener(server_config, config.p2p_port)
.await
.inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;

let mut listener = pin!(listener);

let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));

while let Some(connection) = listener.next().await {
let Ok((addr, peer_stream, peer_sink)) = connection else {
continue;
};

let addr = match addr {
Some(addr) => InternalPeerID::KnownAddr(addr),
None => InternalPeerID::Unknown(rand::random()),
};

if let Ok(permit) = semaphore.clone().try_acquire_owned() {
tracing::debug!("Permit free for incoming connection, attempting handshake.");

let fut = handshaker.ready().await?.call(DoHandshakeRequest {
addr,
peer_stream,
peer_sink,
direction: ConnectionDirection::InBound,
permit,
});

let cloned_pool = client_pool.clone();

tokio::spawn(async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
cloned_pool.add_new_client(peer);
}
});
} else {
tracing::debug!("No permit free for incoming connection.");
// TODO: listen for if the peer is just trying to ping us to see if we are reachable.
}

sleep(INBOUND_CONNECTION_COOL_DOWN).await;
}

Ok(())
}
69 changes: 56 additions & 13 deletions p2p/cuprate-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,47 @@
#![allow(dead_code)]

use std::sync::Arc;
use tokio::sync::watch;
use tokio::sync::{mpsc, watch};
use tower::buffer::Buffer;
use tracing::instrument;

use monero_p2p::NetworkZone;
use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler};

mod broadcast;
mod client_pool;
pub mod config;
pub mod connection_maintainer;
mod constants;
mod inbound_server;
mod sync_states;

use crate::connection_maintainer::MakeConnectionRequest;
pub use config::P2PConfig;
use monero_p2p::client::Connector;

#[instrument(level="warn", name="net", skip_all, fields(zone=N::NAME))]
pub async fn initialize_network<N, R, CS>(
peer_req_handler: R,
core_sync_svc: CS,
config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError>
where
N: NetworkZone,
R: PeerRequestHandler + Clone,
CS: CoreSyncSvc + Clone,
{
let address_book = monero_address_book::init_address_book(config.address_book_config).await?;
let address_book =
monero_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new(
address_book,
config.max_inbound_connections + config.outbound_connections,
);

let (sync_states_svc, top_block_watcher) = sync_states::PeerSyncSvc::new();
let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
let sync_states_svc = Buffer::new(
sync_states_svc,
config.max_inbound_connections + config.outbound_connections,
);

// Use the default config. Changing the defaults effects tx fluff times, which could effect D++ so for now don't allow changing
// this.
Expand All @@ -46,34 +58,65 @@ where
let mut basic_node_data = config.basic_node_data();

if !N::CHECK_NODE_ID {
// TODO: make sure this is the value monerod sets for anonn networks.
// TODO: make sure this is the value monerod sets for anon networks.
basic_node_data.peer_id = 1;
}

let outbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc,
core_sync_svc,
peer_req_handler,
sync_states_svc.clone(),
core_sync_svc.clone(),
peer_req_handler.clone(),
outbound_mkr,
basic_node_data,
basic_node_data.clone(),
);

let inbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc,
core_sync_svc,
core_sync_svc.clone(),
peer_req_handler,
inbound_mkr,
basic_node_data,
);

let outb


let client_pool = client_pool::ClientPool::new();

let (make_connection_tx, make_connection_rx) = mpsc::channel(3);

let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(),
client_pool.clone(),
make_connection_rx,
address_book.clone(),
outbound_connector,
);

tokio::spawn(outbound_connection_maintainer.run());
tokio::spawn(inbound_server::inbound_server(
client_pool.clone(),
inbound_handshaker,
config,
));

Ok(NetworkInterface {
pool: client_pool,
broadcast_svc,
top_block_watch,
make_connection_tx,
})
}

/// The interface to Monero's P2P network on a certain [`NetworkZone`].
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`](tower::Service) that allows broadcasting to all connected peers.
broadcast_svc: broadcast::BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
/// on that claimed chain.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections.
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
}
9 changes: 5 additions & 4 deletions p2p/monero-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,19 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
/// The sink (outgoing data) type for this network.
type Sink: Sink<LevinMessage<Message>, Error = BucketError> + Unpin + Send + 'static;
/// The inbound connection listener for this network.
type Listener: Stream<
Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
>;
type Listener: Stream<Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>>
+ Send
+ 'static;
/// Config used to start a server which listens for incoming connections.
type ServerCfg;
type ServerCfg: Clone + Debug + Send + 'static;

async fn connect_to_peer(
addr: Self::Addr,
) -> Result<(Self::Stream, Self::Sink), std::io::Error>;

async fn incoming_connection_listener(
config: Self::ServerCfg,
port: u16,
) -> Result<Self::Listener, std::io::Error>;
}

Expand Down
6 changes: 4 additions & 2 deletions p2p/monero-p2p/src/network_zones/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ impl NetZoneAddress for SocketAddr {
}
}

#[derive(Debug, Clone)]
pub struct ClearNetServerCfg {
pub addr: SocketAddr,
pub ip: IpAddr,
}

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -80,8 +81,9 @@ impl NetworkZone for ClearNet {

async fn incoming_connection_listener(
config: Self::ServerCfg,
port: u16,
) -> Result<Self::Listener, std::io::Error> {
let listener = TcpListener::bind(config.addr).await?;
let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?;
Ok(InBoundStream { listener })
}
}
Expand Down
7 changes: 4 additions & 3 deletions p2p/monero-p2p/tests/fragmented_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ impl NetworkZone for FragNet {

async fn incoming_connection_listener(
config: Self::ServerCfg,
port: u16,
) -> Result<Self::Listener, std::io::Error> {
let listener = TcpListener::bind(config.addr).await?;
let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?;
Ok(InBoundStream { listener })
}
}
Expand Down Expand Up @@ -194,9 +195,9 @@ async fn fragmented_handshake_monerod_to_cuprate() {
our_basic_node_data,
);

let addr = "127.0.0.1:18081".parse().unwrap();
let ip = "127.0.0.1".parse().unwrap();

let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { addr })
let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081)
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions p2p/monero-p2p/tests/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ async fn handshake_monerod_to_cuprate() {
our_basic_node_data,
);

let addr = "127.0.0.1:18081".parse().unwrap();
let ip = "127.0.0.1".parse().unwrap();

let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { addr })
let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081)
.await
.unwrap();

Expand Down
10 changes: 7 additions & 3 deletions test-utils/src/test_netzone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool
type Listener = Pin<
Box<
dyn Stream<
Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
>,
Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
> + Send
+ 'static,
>,
>;
type ServerCfg = ();
Expand All @@ -97,7 +98,10 @@ impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool
unimplemented!()
}

async fn incoming_connection_listener(_: Self::ServerCfg) -> Result<Self::Listener, Error> {
async fn incoming_connection_listener(
_: Self::ServerCfg,
_: u16,
) -> Result<Self::Listener, Error> {
unimplemented!()
}
}

0 comments on commit 2fd74de

Please sign in to comment.