Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Change the way new advertised peer are connected. (#158)
Browse files Browse the repository at this point in the history
* change connection  process for advised peers

* add some test

* correct some comments

* Apply suggestions from code review

Co-authored-by: Tuomas Mäkinen <[email protected]>

---------

Co-authored-by: Tuomas Mäkinen <[email protected]>
  • Loading branch information
musitdev and tuommaki authored Mar 20, 2024
1 parent b20da9e commit ed90974
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 72 deletions.
13 changes: 4 additions & 9 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,11 @@ async fn run(config: Arc<Config>) -> Result<()> {
match addr.to_socket_addrs() {
Ok(mut socket_iter) => {
if let Some(peer) = socket_iter.next() {
match p2p.node().connect(peer).await {
Ok(_) => {
connected_nodes += 1;
continue;
}
Err(err) => {
tracing::warn!("failed to connect to {}: {}", peer, err);
}
let (connected, fail) = p2p.connect(peer).await;
connected_nodes += connected.len();
if !fail.is_empty() {
tracing::info!("Peer connection, fail to connect to these peers:{fail:?}");
}
break;
}
}
Err(err) => {
Expand Down
157 changes: 94 additions & 63 deletions crates/node/src/networking/p2p/pea2pea.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct P2P {
// This mapping is needed for proper cleanup on OnDisconnect.
peer_addr_mapping: Arc<tokio::sync::RwLock<HashMap<SocketAddr, SocketAddr>>>,
peer_list: Arc<tokio::sync::RwLock<BTreeSet<SocketAddr>>>,
current_connecting_peer_list: Arc<tokio::sync::RwLock<BTreeSet<SocketAddr>>>,
// Contains corrected peers that are used for asset file download.
pub peer_http_port_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
connect_lock: Arc<tokio::sync::Mutex<()>>,
Expand Down Expand Up @@ -89,6 +90,7 @@ impl P2P {
psk: psk.to_vec(),
public_node_key,
peer_list: Default::default(),
current_connecting_peer_list: Default::default(),
peer_addr_mapping: Default::default(),
peer_http_port_list,
connect_lock: Arc::new(tokio::sync::Mutex::new(())),
Expand Down Expand Up @@ -191,6 +193,36 @@ impl P2P {

Ok(())
}

// Connect to peer at `addr`. Subsequent connections to newly discovered nodes are done in sequence, one at a time.
// Peer can be fail because they was 2 simultaneous connection. One is fail and the orher is ok.
pub async fn connect(&self, addr: SocketAddr) -> (BTreeSet<SocketAddr>, BTreeSet<SocketAddr>) {
let mut connected_peers = BTreeSet::new();
let mut failed_peers = BTreeSet::new();
let mut peer_to_connect_list = vec![addr];
while !peer_to_connect_list.is_empty() {
// Clear new peer list before connect
self.current_connecting_peer_list.write().await.clear();
let addr = peer_to_connect_list.pop().unwrap(); //unwrap tested in the while.
match self.node.connect(addr).await {
Ok(_) => {
connected_peers.insert(addr);
{
let peers = self.current_connecting_peer_list.write().await.clone();
peer_to_connect_list.extend(peers.iter());
};

// Only add peer that are connected.
self.peer_list.write().await.insert(addr);
}
Err(err) => {
tracing::error!("An error occurs during peer:{addr} connection: {err}",);
failed_peers.insert(addr);
}
};
}
(connected_peers, failed_peers)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -359,27 +391,12 @@ impl Handshake for P2P {
local_diff.remove(&local_p2p_addr);
local_diff.remove(remote_peer_p2p_addr);

let node = self.node();
// Connect to other not connected peers.
for addr in local_diff {
tokio::spawn({
let node = node.clone();
let peer_list = self.peer_list.clone();
async move {
tracing::debug!("connect to {}", &addr);

// XXX: If `node.connect(addr)` returns an error, it's omitted because:
// 1.) It's already logged.
// 2.) It often happens because there is already a connection between the 2 peers.
match node.connect(addr).await {
Ok(_) => {
peer_list.write().await.insert(addr);
tracing::debug!("connected to {}", &addr);
}
Err(err) => tracing::error!("failed to connect to {}: {}", &addr, err),
};
}
});
//add new peer to node list
{
self.current_connecting_peer_list
.write()
.await
.append(&mut local_diff);
}

self.peer_http_port_list
Expand Down Expand Up @@ -572,27 +589,20 @@ mod tests {
let (peer2, tx_sender2, mut tx_receiver2) = create_faulty_peer("peer2").await;
let (peer3, tx_sender3, mut tx_receiver3) = create_peer("peer3").await;

tracing::debug!("start listening");
peer1.node().start_listening().await.expect("peer1 listen");
peer2.node().start_listening().await.expect("peer2 listen");
peer3.node().start_listening().await.expect("peer3 listen");

tracing::debug!("connect peer2 to peer1");
peer2
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();
let (new_peers, fail_peers) = peer2.connect(peer1.node().listening_addr().unwrap()).await;

assert_eq!(peer1.peer_http_port_list.read().await.len(), 1);
assert_eq!(peer2.peer_http_port_list.read().await.len(), 1);
assert_eq!(new_peers.len(), 1);
assert_eq!(fail_peers.len(), 0);

tracing::debug!("connect peer3 to peer1");
peer3
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();
let (new_peers, fail_peers) = peer3.connect(peer1.node().listening_addr().unwrap()).await;

assert_eq!(new_peers.len(), 1);
assert_eq!(fail_peers.len(), 1);
assert_eq!(fail_peers.first(), Some(&"128.0.0.1:0".parse().unwrap()));

// Wait for the connection fail timeout.
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
Expand Down Expand Up @@ -626,57 +636,81 @@ mod tests {
}
}

// 3 peers
// 5 peers
// Peer2 connect to Peer1.
// Peer3 connect to Peer1.
// Peer3 automatically connect to Peer2.
// Peer4 connect to Peer3.
// Peer4 automatically connect to Peer1, Peer2.
// Peer5 connect to Peer4.
// Peer5 automatically connect to Peer1, Peer2, Peer3.
#[tokio::test]
async fn test_peer_list_inter_connection() {
//start_logger(LevelFilter::ERROR);

let (peer1, tx_sender1, mut tx_receiver1) = create_peer("peer1").await;
let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await;
let (peer3, tx_sender3, mut tx_receiver3) = create_peer("peer3").await;
let (peer4, tx_sender4, mut tx_receiver4) = create_peer("peer4").await;
let (peer5, tx_sender5, mut tx_receiver5) = create_peer("peer5").await;

let bind_add = peer1.node().start_listening().await.expect("peer1 listen");
let bind_add = peer2.node().start_listening().await.expect("peer2 listen");
let bind_add = peer3.node().start_listening().await.expect("peer3 listen");
let bind_add = peer4.node().start_listening().await.expect("peer4 listen");
let bind_add = peer5.node().start_listening().await.expect("peer5 listen");

peer2
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();
let (new_peers, fail_peers) = peer2.connect(peer1.node().listening_addr().unwrap()).await;
assert_eq!(new_peers.len(), 1);

assert_eq!(peer1.peer_http_port_list.read().await.len(), 1);
assert_eq!(peer2.peer_http_port_list.read().await.len(), 1);

peer3
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();
let (new_peers, fail_peers) = peer3.connect(peer1.node().listening_addr().unwrap()).await;
assert_eq!(new_peers.len(), 2);

let (new_peers, fail_peers) = peer4.connect(peer3.node().listening_addr().unwrap()).await;
assert_eq!(new_peers.len(), 3);

let (new_peers, fail_peers) = peer5.connect(peer4.node().listening_addr().unwrap()).await;
assert_eq!(new_peers.len(), 4);

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

assert_eq!(peer1.peer_http_port_list.read().await.len(), 2);
assert_eq!(peer2.peer_http_port_list.read().await.len(), 2);
assert_eq!(peer3.peer_http_port_list.read().await.len(), 2);
assert_eq!(peer1.peer_http_port_list.read().await.len(), 4);
assert_eq!(peer2.peer_http_port_list.read().await.len(), 4);
assert_eq!(peer3.peer_http_port_list.read().await.len(), 4);
assert_eq!(peer4.peer_http_port_list.read().await.len(), 4);
assert_eq!(peer5.peer_http_port_list.read().await.len(), 4);

// Verify connections by sending Tx to all peers.
let tx = new_tx();
tx_sender2.send(tx.clone()).unwrap();
let recv_tx = tx_receiver1.recv().await.expect("peer1 recv");

assert_eq!(into_receive(tx.clone()), recv_tx.0);

let recv_tx = tx_receiver3.recv().await.expect("peer3 recv");
assert_eq!(into_receive(tx), recv_tx.0);
assert_eq!(into_receive(tx.clone()), recv_tx.0);

let recv_tx = tx_receiver4.recv().await.expect("peer4 recv");
assert_eq!(into_receive(tx.clone()), recv_tx.0);

let recv_tx = tx_receiver5.recv().await.expect("peer5 recv");
assert_eq!(into_receive(tx.clone()), recv_tx.0);

let tx = new_tx();
tx_sender3.send(tx.clone()).unwrap();
tx_sender5.send(tx.clone()).unwrap();

let recv_tx = tx_receiver1.recv().await.expect("peer1 recv");
assert_eq!(into_receive(tx.clone()), recv_tx.0);

let recv_tx = tx_receiver2.recv().await.expect("peer2 recv");
assert_eq!(into_receive(tx.clone()), recv_tx.0);

let recv_tx = tx_receiver3.recv().await.expect("peer3 recv");
assert_eq!(into_receive(tx.clone()), recv_tx.0);

let recv_tx = tx_receiver4.recv().await.expect("peer4 recv");
assert_eq!(into_receive(tx), recv_tx.0);
}

Expand All @@ -693,11 +727,10 @@ mod tests {
let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await;
peer2.node().start_listening().await.expect("peer2 listen");

peer1
.node()
.connect(peer2.node().listening_addr().unwrap())
.await
.unwrap();
let (new_peers, fail_peers) =
peer1.connect(peer2.node().listening_addr().unwrap()).await;
assert_eq!(new_peers.len(), 1);
assert_eq!(fail_peers.len(), 0);
assert_eq!(peer1.peer_http_port_list.read().await.len(), 1);
assert_eq!(peer2.peer_http_port_list.read().await.len(), 1);

Expand Down Expand Up @@ -741,11 +774,9 @@ mod tests {
peer1.node().start_listening().await.expect("peer1 listen");
peer2.node().start_listening().await.expect("peer2 listen");

peer2
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();
let (new_peers, fail_peers) = peer2.connect(peer1.node().listening_addr().unwrap()).await;
assert_eq!(new_peers.len(), 1);
assert_eq!(fail_peers.len(), 0);

let tx = new_tx();
tx_sender1.send(tx.clone()).unwrap();
Expand Down

0 comments on commit ed90974

Please sign in to comment.