Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Add bootstrap node and peer list exchange and automatique connection #40

Merged
merged 6 commits into from
Jan 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 268 additions & 4 deletions crates/node/src/networking/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use std::{collections::HashMap, io, net::SocketAddr, str, sync::Arc};
use std::{
collections::{BTreeSet, HashMap},
io,
net::SocketAddr,
str,
sync::Arc,
};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;

use super::noise;
use bytes::{Bytes, BytesMut};
use eyre::Result;
use gevulot_node::types::Transaction;
use parking_lot::RwLock;
use pea2pea::{
protocols::{Handshake, Reading, Writing},
protocols::{Handshake, OnDisconnect, Reading, Writing},
Config, Connection, ConnectionSide, Node, Pea2Pea,
};
use sha3::{Digest, Sha3_256};

use super::noise;

#[async_trait::async_trait]
pub trait TxHandler: Send + Sync {
async fn recv_tx(&self, tx: Transaction) -> Result<()>;
Expand All @@ -38,6 +45,9 @@ pub struct P2P {
noise_states: Arc<RwLock<HashMap<SocketAddr, noise::State>>>,
tx_handler: Arc<tokio::sync::RwLock<Arc<dyn TxHandler>>>,
psk: Vec<u8>,
peer_list: Arc<RwLock<BTreeSet<SocketAddr>>>,
//Map to connection local addr notified on_disconnect and the peer connection addr (peer_list).
peer_addr_mapping: Arc<RwLock<HashMap<SocketAddr, SocketAddr>>>,
}

impl Pea2Pea for P2P {
Expand Down Expand Up @@ -67,12 +77,15 @@ impl P2P {
noise_states: Default::default(),
tx_handler: Arc::new(tokio::sync::RwLock::new(Arc::new(BlackholeTxHandler {}))),
psk: psk.to_vec(),
peer_list: Default::default(),
peer_addr_mapping: Default::default(),
};

// Enable node functionalities.
instance.enable_handshake().await;
instance.enable_reading().await;
instance.enable_writing().await;
instance.enable_disconnect().await;

instance
}
Expand Down Expand Up @@ -111,6 +124,118 @@ impl Handshake for P2P {
// save the noise state to be reused by Reading and Writing
self.noise_states.write().insert(conn.addr(), noise_state);

//exchange peer list

let node_conn_side = !conn.side();
let stream = self.borrow_stream(&mut conn);

let local_bind_addr = self.node.listening_addr().unwrap();
let peer_list_bytes: Vec<u8> = {
let peer_list: &mut BTreeSet<SocketAddr> = &mut self.peer_list.write();
peer_list.insert(local_bind_addr); //add it if not present.
bincode::serialize(peer_list).map_err(|err| {
std::io::Error::new(std::io::ErrorKind::Other, format!("serialize error:{err}"))
})?
};

let (distant_peer_list, distant_listening_addr) = match node_conn_side {
ConnectionSide::Initiator => {
//on_disconnect doesn't notify with the bind port but the connect port.
//can't be use to open a connection.
//So the connecting node notify it's bind address.
let bind_addr_bytes = bincode::serialize(&self.node.listening_addr().unwrap())
.map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("serialize error:{err}"),
)
})?;
stream.write_u32(bind_addr_bytes.len() as u32).await?;
stream.write_all(&bind_addr_bytes).await?;

//send peer list
stream.write_u32(peer_list_bytes.len() as u32).await?;
stream.write_all(&peer_list_bytes).await?;

// receive the peer list
let buffer_len = stream.read_u32().await? as usize;
//TODO validate buffer lengh
let mut buffer = vec![0; buffer_len];
stream.read_exact(&mut buffer).await?;
let distant_peer_list: BTreeSet<SocketAddr> = bincode::deserialize(&buffer)
.map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("deserialize error:{err}"),
)
})?;

self.peer_addr_mapping
.write()
.insert(stream.peer_addr().unwrap(), stream.peer_addr().unwrap());

(distant_peer_list, stream.peer_addr().unwrap())
}
ConnectionSide::Responder => {
//receive the connecting node addr
let buffer_len = stream.read_u32().await? as usize;
let mut buffer = vec![0; buffer_len];
stream.read_exact(&mut buffer).await?;
let distant_listening_addr: SocketAddr =
bincode::deserialize(&buffer).map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("deserialize error:{err}"),
)
})?;
{
self.peer_list.write().insert(distant_listening_addr);
self.peer_addr_mapping
.write()
.insert(stream.peer_addr().unwrap(), distant_listening_addr);
}

// receive the peer list
let buffer_len = stream.read_u32().await? as usize;
let mut buffer = vec![0; buffer_len];
stream.read_exact(&mut buffer).await?;
let distant_peer_list: BTreeSet<SocketAddr> = bincode::deserialize(&buffer)
.map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("deserialize error:{err}"),
)
})?;

//send peer list
stream.write_u32(peer_list_bytes.len() as u32).await?;
stream.write_all(&peer_list_bytes).await?;

(distant_peer_list, distant_listening_addr)
}
};

//do peer comparition
let mut local_diff = {
let local_peer_list: &mut BTreeSet<SocketAddr> = &mut self.peer_list.write();
let local_diff: BTreeSet<SocketAddr> = distant_peer_list
.difference(local_peer_list)
.cloned()
.collect();

local_peer_list.append(&mut local_diff.iter().cloned().collect());
local_diff
};
local_diff.remove(&local_bind_addr);
local_diff.remove(&distant_listening_addr);

let node = self.node();
for addr in local_diff {
//the return error is not use because:
//already logged and mostly because there's double connection between 2 peers.
let _ = node.connect(addr).await;
}

Ok(conn)
}
}
Expand Down Expand Up @@ -147,6 +272,15 @@ impl Writing for P2P {
}
}

#[async_trait::async_trait]
impl OnDisconnect for P2P {
async fn on_disconnect(&self, addr: SocketAddr) {
if let Some(peer_conn_addr) = self.peer_addr_mapping.write().remove(&addr) {
self.peer_list.write().remove(&peer_conn_addr);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -175,8 +309,138 @@ mod tests {
}
}

#[tokio::test]
async fn test_peer_list_inter_connection() {
//start_logger(LevelFilter::ERROR);
tuommaki marked this conversation as resolved.
Show resolved Hide resolved

let (tx1, mut rx1) = mpsc::channel(1);
let (tx2, mut rx2) = mpsc::channel(1);
let (tx3, mut rx3) = mpsc::channel(1);
let (sink1, sink2, sink3) = (
Arc::new(Sink::new(Arc::new(tx1))),
Arc::new(Sink::new(Arc::new(tx2))),
Arc::new(Sink::new(Arc::new(tx3))),
);
let (peer1, peer2, peer3) = (
P2P::new("peer1", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await,
P2P::new("peer2", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await,
P2P::new("peer3", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await,
);

tracing::debug!("start listening");
let bind_add = peer1.node().start_listening().await.expect("peer1 listen");
let bind_add = peer2.node().start_listening().await.expect("peer2 listen");
let bind_add = peer3.node().start_listening().await.expect("peer3 listen");

tracing::debug!("register tx handlers");
peer1.register_tx_handler(sink1.clone()).await;
peer2.register_tx_handler(sink2.clone()).await;
peer3.register_tx_handler(sink3.clone()).await;

tracing::debug!("connect peer2 to peer1");
peer2
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();

tracing::debug!("connect peer3 to peer1");
peer3
.node()
.connect(peer1.node().listening_addr().unwrap())
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

tracing::debug!("send tx from peer2 to peer1 and peer3");
let tx = new_tx();
peer2.send_tx(&tx).await.unwrap();
tracing::debug!("recv tx on peer1 from peer2");
let recv_tx = rx1.recv().await.expect("sink recv");
assert_eq!(tx, recv_tx);
tracing::debug!("recv tx on peer3 from peer2");
let recv_tx = rx3.recv().await.expect("sink recv");
assert_eq!(tx, recv_tx);

let tx = new_tx();
tracing::debug!("send tx from peer3 to peer1 and peer2");
peer3.send_tx(&tx).await.unwrap();
tracing::debug!("recv tx on peer1 from peer3");
let recv_tx = rx1.recv().await.expect("sink recv");
assert_eq!(tx, recv_tx);
tracing::debug!("recv tx on peer2 from peer3");
let recv_tx = rx2.recv().await.expect("sink recv");
assert_eq!(tx, recv_tx);
}

#[tokio::test]
async fn test_two_peers_disconnect() {
//start_logger(LevelFilter::ERROR);
tuommaki marked this conversation as resolved.
Show resolved Hide resolved

let (tx1, mut rx1) = mpsc::channel(1);
let (tx2, mut rx2) = mpsc::channel(1);
let (sink1, sink2) = (
Arc::new(Sink::new(Arc::new(tx1))),
Arc::new(Sink::new(Arc::new(tx2))),
);

let peer1 = P2P::new("peer1", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await;
peer1.node().start_listening().await.expect("peer1 listen");
peer1.register_tx_handler(sink1.clone()).await;

{
let peer2 =
P2P::new("peer2", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await;
peer2.node().start_listening().await.expect("peer2 listen");

peer2.register_tx_handler(sink2.clone()).await;
tracing::debug!("Nodes init Done");

peer1
.node()
.connect(peer2.node().listening_addr().unwrap())
.await
.unwrap();

tracing::debug!("Nodes Connected");
tracing::debug!("send tx from peer1 to peer2");
let tx = new_tx();
peer1.send_tx(&tx).await.unwrap();
tracing::debug!("recv tx on peer2 from peer1");
let recv_tx = rx2.recv().await.expect("sink recv");
assert_eq!(tx, recv_tx);

let tx = new_tx();
tracing::debug!("send tx from peer2 to peer1");
peer2.send_tx(&tx).await.unwrap();
tracing::debug!("recv tx on peer1 from peer2");
let recv_tx = rx1.recv().await.expect("sink recv");
assert_eq!(tx, recv_tx);

let peers = peer2.node().connected_addrs();
for addr in peers {
peer2.node().disconnect(addr).await;
}
}

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

//simulate the silent node de-connection by dropping the node.
tracing::debug!("send tx from peer1 to disconnected peer2");
let tx = new_tx();
peer1.send_tx(&tx).await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

assert_eq!(peer1.peer_list.read().len(), 1);
assert!(peer1.peer_addr_mapping.read().is_empty());
}

#[tokio::test]
async fn test_two_peers() {
//start_logger(LevelFilter::ERROR);
tuommaki marked this conversation as resolved.
Show resolved Hide resolved

let (tx1, mut rx1) = mpsc::channel(1);
let (tx2, mut rx2) = mpsc::channel(1);
let (sink1, sink2) = (
Expand Down