Skip to content

Commit

Permalink
Fix network bool
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Mar 25, 2024
1 parent c80f973 commit 474df15
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 52 deletions.
96 changes: 46 additions & 50 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box<dyn std::erro
let (mut proxy_reader, mut proxy_writer) = sock.split();
let mut buf_reader = BufReader::new(&mut proxy_reader);
let mut buffer = Vec::new();
// only reads 4 bytes
let n = buf_reader.read_to_end(&mut buffer).await?;
println!("Bytes read from local connection: {n}");
let recv_magic: [u8; 4] = buffer[..4].try_into()?;
println!("Got magic: {}", hex::encode(recv_magic));
// not working
if M.to_bytes().ne(&recv_magic) {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
Expand All @@ -64,9 +62,7 @@ async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box<dyn std::erro
"Connections must open with Version message.",
)));
}
// let decoding = VersionMessage::consensus_decode(&mut cursor)?;
// println!("{:?}", decoding);
// let remote_addr = decoding.receiver.socket_addr()?;
let version = buffer.clone();
let payload = buffer[24..].to_vec();
let mut cursor = std::io::Cursor::new(payload);
let ver = VersionMessage::consensus_decode_from_finite_reader(&mut cursor)?;
Expand All @@ -76,63 +72,63 @@ async fn init_outbound_conn(mut sock: TcpStream) -> Result<(), Box<dyn std::erro
let handshake = initialize_v2_handshake(None)?;
println!("Initiating handshake.");
outbound
.write_all(handshake.message.clone().as_slice())
.write_all(&version)
.await?;
println!("Sent handshake to remote.");
let (mut remote_reader, mut remote_writer) = outbound.split();
let mut buf_reader = BufReader::new(&mut remote_reader);
let mut buffer = Vec::new();
println!("Reading handshake response from remote.");
let n = buf_reader.read_to_end(&mut buffer).await?;
println!("Bytes read from remote host: {n}");
println!("Completing handshake");
println!("{}", hex::encode(&buffer));
if n < 64 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Remote cannot perform V2 handshake. Disconnecting.",
)));
}
let finish_handshake = initiator_complete_v2_handshake(buffer, handshake, false)?;
println!("Remote handshake accepted. Sending garbage terminator.");
remote_writer.write_all(&finish_handshake.message).await?;
let mut packet_handler = finish_handshake.packet_handler;
let decrypt = packet_handler.clone();
let (tx, mut rx) = mpsc::channel::<ChannelMessage>(10);
let mut tx2 = tx.clone();
// then communicate as usual
tokio::spawn(async move {
match communicate_outbound(tx, outbound, decrypt).await {
Ok(()) => {
println!("Remote disconnected.");
}
Err(_) => {
println!("Error decrypting package from remote and writing to local.");
}
}
});
loop {
while let Some(message) = rx.recv().await {
match message {
Ok((message, destination)) => match destination {
SendTo::Remote => {}
SendTo::Local => {
println!("Passing message to local node.");
proxy_writer.write_all(&message).await?;
}
},
Err(e) => {
return Err(Box::new(io::Error::new(io::ErrorKind::Other, "")));
}
}
}
// let mut buffer = Vec::new();
// let n = proxy_reader.read_to_end(&mut buffer).await?;
// println!("Got a message from local.");
// if n == 0 {
// println!("Local node disconnected.");
// return Ok(());
// }
// let message = packet_handler.prepare_v2_packet(buffer, None, false)?;
// remote_writer.write_all(&message).await?;
}
// println!("Completing handshake.");
// let finish_handshake = initiator_complete_v2_handshake(buffer, handshake, false)?;
// remote_writer.write_all(&finish_handshake.message).await?;
// println!("Remote handshake accepted. Sending garbage terminator.");
// let mut packet_handler = finish_handshake.packet_handler;
// println!("Session ID: {:?}", hex::encode(packet_handler.session_id));
// println!("Their garbage terminator: {:?}", hex::encode(packet_handler.other_garbage_terminator));
// let mut buffer = Vec::new();
// remote_reader.read_to_end(&mut buffer).await?;
// println!("Received: {:?}", hex::encode(buffer));
Ok(())
// let decrypt = packet_handler.clone();
// let (tx, mut rx) = mpsc::channel::<ChannelMessage>(10);
// let mut tx2 = tx.clone();
// // then communicate as usual
// tokio::spawn(async move {
// match communicate_outbound(tx, outbound, decrypt).await {
// Ok(()) => {
// println!("Remote disconnected.");
// }
// Err(_) => {
// println!("Error decrypting package from remote and writing to local.");
// }
// }
// });
// loop {
// while let Some(message) = rx.recv().await {
// match message {
// Ok((message, destination)) => match destination {
// SendTo::Remote => {}
// SendTo::Local => {
// println!("Passing message to local node.");
// proxy_writer.write_all(&message).await?;
// }
// },
// Err(e) => {
// return Err(Box::new(io::Error::new(io::ErrorKind::Other, "")));
// }
// }
// }
// }
}

async fn communicate_outbound(
Expand Down
234 changes: 234 additions & 0 deletions examples/v1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
use bip324::{initialize_v2_handshake, initiator_complete_v2_handshake, PacketHandler};
use bitcoin::p2p::Magic;
use bitcoin::{
consensus::Decodable,
p2p::{message::RawNetworkMessage, message_network::VersionMessage},
};
use core::fmt;
use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::select;
use tokio::sync::{broadcast, mpsc};

const PROXY: &str = "127.0.0.1:1324";
const M: Magic = Magic::SIGNET;
// type ChannelMessage = Result<(Vec<u8>, SendTo), PeerError>;

// #[derive(Clone, Copy, Debug)]
// enum SendTo {
// Remote,
// Local,
// }

// #[derive(Clone, Copy, Debug)]
// enum PeerError {
// DecryptionFailure,
// BytesReadError,
// UnknownMessage,
// }

// impl fmt::Display for PeerError {
// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// match *self {
// PeerError::DecryptionFailure => write!(f, "Decryption failed"),
// PeerError::BytesReadError => write!(f, "Error occurred while reading bytes"),
// PeerError::UnknownMessage => write!(f, "Received unknown message"),
// }
// }
// }

async fn init_outbound_conn(mut proxy: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
println!("Initialing outbound connection.");
let mut buffer: Vec<u8> = Vec::new();
let n = proxy.read_to_end(&mut buffer).await?;
println!("Bytes read from local connection: {n}");
let recv_magic: [u8; 4] = buffer[..4].try_into()?;
println!("Got magic: {}", hex::encode(recv_magic));
if M.to_bytes().ne(&recv_magic) {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Invalid magic.",
)));
}
println!("Matches our network.");
let mut cursor = std::io::Cursor::new(buffer.clone());
let msg = RawNetworkMessage::consensus_decode(&mut cursor)?;
let command = msg.payload().command();
println!("Message command: {}.", command.to_string());
if !command.to_string().eq("version") {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Connections must open with Version message.",
)));
}
let version = buffer.clone();
let payload = buffer[24..].to_vec();
let mut cursor = std::io::Cursor::new(payload);
let ver = VersionMessage::consensus_decode_from_finite_reader(&mut cursor)?;
let remote_addr = ver.receiver.socket_addr()?;
println!("Connecting to: {}...", remote_addr.to_string());
let mut outbound = TcpStream::connect(remote_addr).await?;
println!("Sending Version message...");
outbound
.write_all(&version)
.await?;
println!("Sent version to remote.");
let mut buffer = Vec::new();
println!("Reading Version response from remote...");
let n = outbound.read_to_end(&mut buffer).await?;
println!("Bytes read from {} host: {n}", remote_addr.to_string());
println!("{}", hex::encode(&buffer));
if n < 64 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Remote closed connection. Disconnecting.",
)));
}
println!("Writing the received message to local.");
proxy.write_all(&buffer).await?;
let mut cursor = std::io::Cursor::new(buffer.clone());
let msg = RawNetworkMessage::consensus_decode(&mut cursor)?;
let command = msg.payload().command();
println!("Message command from {}: {}.", remote_addr.to_string(), command.to_string());
let (mut client_reader, mut client_writer) = proxy.split();
let (mut remote_reader, mut remote_writer) = outbound.split();
loop {
select! {
res = tokio::io::copy(&mut client_reader, &mut remote_writer) => {
match res {
Ok(bytes) => {
println!("Responded to {} with {bytes} bytes.", remote_addr.to_string());
if bytes == 0 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Client closed connection. Disconnecting.",
)));
}
},
Err(_) => {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Remote closed connection: Disconnecting.",
)));
},
}
},
res = tokio::io::copy(&mut remote_reader, &mut client_writer) => {
match res {
Ok(bytes) => {
println!("Responded to local with {bytes} bytes.");
if bytes == 0 {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Client closed connection. Disconnecting.",
)));
}
},
Err(_) => {
return Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"Client closed connection. Disconnecting.",
)));
},
}
},
}
}
}

// async fn communicate_outbound(
// sender: tokio::sync::mpsc::Sender<ChannelMessage>,
// mut channel: tokio::sync::mpsc::Receiver<ChannelMessage>,
// mut sock: TcpStream,
// ) -> Result<(), PeerError> {
// loop {
// let messages = channel.recv().await;
// match messages {
// Some(message) => {
// match message {
// Ok((message, _)) => {
// let mut cursor = std::io::Cursor::new(message.clone());
// let msg = RawNetworkMessage::consensus_decode(&mut cursor).map_err(|e| PeerError::UnknownMessage)?;
// let command = msg.payload().command();
// println!("Sending a message to remote. Command: {}.", command.to_string());
// sock.write_all(&message).await.map_err(|_e| PeerError::BytesReadError)?;
// },
// Err(_) => {
// return Err(PeerError::UnknownMessage)
// },
// }
// },
// None => {},
// }
// let mut buffer = Vec::new();
// let mut cursor = std::io::Cursor::new(buffer.clone());
// let n = sock.read_to_end(&mut buffer).await.map_err(|e| PeerError::BytesReadError)?;
// if n > 0 {
// let msg = RawNetworkMessage::consensus_decode(&mut cursor).map_err(|_e| PeerError::UnknownMessage)?;
// let command = msg.payload().command();
// println!("Message sent over channel to local thread: {}.", command.to_string());
// sender.send(Ok((buffer, SendTo::Local))).await.map_err(|_e| PeerError::UnknownMessage)?;
// }
// }
// }

// async fn communicate_local(
// sender: tokio::sync::mpsc::Sender<ChannelMessage>,
// mut channel: tokio::sync::mpsc::Receiver<ChannelMessage>,
// mut local: TcpStream,
// ) -> Result<(), PeerError> {
// loop {
// let messages = channel.recv().await;
// match messages {
// Some(message) => {
// match message {
// Ok((message, _)) => {
// let mut cursor = std::io::Cursor::new(message.clone());
// let msg = RawNetworkMessage::consensus_decode(&mut cursor).map_err(|e| PeerError::UnknownMessage)?;
// let command = msg.payload().command();
// println!("Sending a message to local. Command: {}.", command.to_string());
// local.write_all(&message).await.map_err(|_e| PeerError::BytesReadError)?;
// },
// Err(_) => {
// return Err(PeerError::UnknownMessage)
// },
// }
// },
// None => {},
// }
// let mut buffer = Vec::new();
// let mut cursor = std::io::Cursor::new(buffer.clone());
// let n = local.read_to_end(&mut buffer).await.map_err(|e| PeerError::BytesReadError)?;
// if n > 0 {
// let msg = RawNetworkMessage::consensus_decode(&mut cursor).map_err(|_e| PeerError::UnknownMessage)?;
// let command = msg.payload().command();
// println!("Message sent over channel: {}.", command.to_string());
// sender.send(Ok((buffer, SendTo::Local))).await.map_err(|_e| PeerError::UnknownMessage)?;
// }
// }
// }

#[tokio::main]
async fn main() {
let proxy = TcpListener::bind(PROXY)
.await
.expect("Failed to bind to proxy port.");
println!("Listening for connections on {PROXY}");
loop {
let (stream, _) = proxy
.accept()
.await
.expect("Failed to accept inbound connection.");
tokio::spawn(async move {
match init_outbound_conn(stream).await {
Ok(_) => {
println!("Ended connection with no errors.");
}
Err(e) => {
println!("Ended connection with error: {e}.");
}
};
});
}
}
Loading

0 comments on commit 474df15

Please sign in to comment.