Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(l1): getBlockHeaders eth capability test #989

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
12 changes: 9 additions & 3 deletions cmd/ethereum_rust/ethereum_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,18 @@ async fn main() {
"Adding block {} with hash {:#x}.",
block.header.number, hash
);
if add_block(&block, &store).is_err() {
let result = add_block(&block, &store);
if result.is_err() {
warn!(
"Failed to add block {} with hash {:#x}.",
block.header.number, hash
"Failed to add block {} with hash {:#x}: {:?}.",
block.header.number, hash, result
);
// FIXME: Remove and/or discuss this before PR REVIEW
break;
}
// FIXME: Remove and/or discuss this before PR REVIEW
store.update_latest_block_number(block.header.number).unwrap();
store.set_canonical_block(block.header.number, hash).unwrap();
}
info!("Added {} blocks to blockchain", size);
}
Expand Down
39 changes: 39 additions & 0 deletions crates/common/rlp/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ impl RLPDecode for u8 {

match rlp[0] {
// Single byte in the range [0x00, 0x7f]
// FIXME: This slice can fail if out of bounds
0..=0x7f => Ok((rlp[0], &rlp[1..])),

// FIXME: This slice can fail if out of bounds
// RLP_NULL represents zero
RLP_NULL => Ok((0, &rlp[1..])),

Expand Down Expand Up @@ -270,6 +272,8 @@ impl<T: RLPDecode> RLPDecode for Vec<T> {

let (is_list, payload, input_rest) = decode_rlp_item(rlp)?;
if !is_list {
println!("THE PAYLOAD = {payload:?}");
println!("REST OF INPUT = {input_rest:?}");
return Err(RLPDecodeError::MalformedData);
}

Expand Down Expand Up @@ -330,6 +334,41 @@ impl<T1: RLPDecode, T2: RLPDecode, T3: RLPDecode> RLPDecode for (T1, T2, T3) {
}
}

// FIXME: Clean up this code
impl<
T1: RLPDecode + std::fmt::Debug,
T2: RLPDecode + std::fmt::Debug,
T3: RLPDecode + std::fmt::Debug,
T4: RLPDecode + std::fmt::Debug,
> RLPDecode for (T1, T2, T3, T4)
{
fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
if rlp.is_empty() {
return Err(RLPDecodeError::InvalidLength);
}
let (is_list, payload, input_rest) = decode_rlp_item(rlp)?;
if !is_list {
return Err(RLPDecodeError::MalformedData);
}
// FIXME: Remove the prints
let (first, first_rest) = T1::decode_unfinished(payload)?;
println!("FIRST DECODED = {first:x?}");
let (second, second_rest) = T2::decode_unfinished(first_rest)?;
println!("SECOND DECODED = {second:x?}");
let (third, third_rest) = T3::decode_unfinished(second_rest)?;
println!("THIRD DECODED = {third:x?}");
let (fourth, fourth_rest) = T4::decode_unfinished(third_rest)?;
println!("FOURTH DECODED = {fourth:x?}");
// check that there is no more data to decode after the fourth element.
if !fourth_rest.is_empty() {
println!("LIST IS NOT EMPTY = {fourth_rest:?}");
return Err(RLPDecodeError::MalformedData);
}

Ok(((first, second, third, fourth), input_rest))
}
}

/// Decodes an RLP item from a slice of bytes.
/// It returns a 3-element tuple with the following elements:
/// - A boolean indicating if the item is a list or not.
Expand Down
6 changes: 4 additions & 2 deletions crates/common/types/fork_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use super::{BlockHash, BlockNumber, ChainConfig};

#[derive(Debug)]
pub struct ForkId {
fork_hash: H32,
fork_next: BlockNumber,
// FIXME: Return this fields to private
// before PR review.
pub fork_hash: H32,
pub fork_next: BlockNumber,
}

impl ForkId {
Expand Down
36 changes: 21 additions & 15 deletions crates/networking/p2p/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub async fn start_network(
let discovery_handle = tokio::spawn(discover_peers(
udp_addr,
signer.clone(),
storage.clone(),
table.clone(),
bootnodes,
));
Expand All @@ -66,6 +67,7 @@ pub async fn start_network(
async fn discover_peers(
udp_addr: SocketAddr,
signer: SigningKey,
storage: Store,
table: Arc<Mutex<KademliaTable>>,
bootnodes: Vec<BootNode>,
) {
Expand All @@ -74,6 +76,7 @@ async fn discover_peers(
let server_handler = tokio::spawn(discover_peers_server(
udp_addr,
udp_socket.clone(),
storage,
table.clone(),
signer.clone(),
));
Expand Down Expand Up @@ -111,6 +114,7 @@ async fn discover_peers(
async fn discover_peers_server(
udp_addr: SocketAddr,
udp_socket: Arc<UdpSocket>,
storage: Store,
table: Arc<Mutex<KademliaTable>>,
signer: SigningKey,
) {
Expand Down Expand Up @@ -196,9 +200,10 @@ async fn discover_peers_server(

let mut msg_buf = vec![0; read - 32];
buf[32..read].clone_into(&mut msg_buf);
let signer_clone = signer.clone();
let signer = signer.clone();
let storage = storage.clone();
tokio::spawn(async move {
handle_peer_as_initiator(signer_clone, &msg_buf, &peer.node, table)
handle_peer_as_initiator(signer, &msg_buf, &peer.node, storage, table)
.await;
});
} else {
Expand Down Expand Up @@ -724,13 +729,10 @@ async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer:
let _ = socket.send_to(&buf, to_addr).await;
}

// TODO build a proper listen loop that receives requests from both
// peers and business layer and propagate storage to use when required
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/840
async fn serve_requests(
tcp_addr: SocketAddr,
signer: SigningKey,
_storage: Store,
storage: Store,
table: Arc<Mutex<KademliaTable>>,
) {
let tcp_socket = TcpSocket::new_v4().unwrap();
Expand All @@ -742,6 +744,7 @@ async fn serve_requests(
tokio::spawn(handle_peer_as_receiver(
signer.clone(),
stream,
storage.clone(),
table.clone(),
));
}
Expand All @@ -750,16 +753,18 @@ async fn serve_requests(
async fn handle_peer_as_receiver(
signer: SigningKey,
stream: TcpStream,
storage: Store,
table: Arc<Mutex<KademliaTable>>,
) {
let conn = RLPxConnection::receiver(signer, stream);
let conn = RLPxConnection::receiver(signer, stream, storage);
handle_peer(conn, table).await;
}

async fn handle_peer_as_initiator(
signer: SigningKey,
msg: &[u8],
node: &Node,
storage: Store,
table: Arc<Mutex<KademliaTable>>,
) {
info!("Trying RLPx connection with {node:?}");
Expand All @@ -768,19 +773,16 @@ async fn handle_peer_as_initiator(
.connect(SocketAddr::new(node.ip, node.tcp_port))
.await
.unwrap();
let conn = RLPxConnection::initiator(signer, msg, stream).await;
let conn = RLPxConnection::initiator(signer, msg, stream, storage).await;
handle_peer(conn, table).await;
}

async fn handle_peer(mut conn: RLPxConnection<TcpStream>, table: Arc<Mutex<KademliaTable>>) {
match conn.handshake().await {
Ok(_) => {
// TODO Properly build listen loop
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/840
// loop {
// conn.await_messages();
// }
}
Ok(_) => match conn.handle_peer().await {
Ok(_) => unreachable!(),
Err(e) => info!("Error during RLPx connection: ({e})"),
},
Err(e) => {
// Discard peer from kademlia table
info!("Handshake failed, discarding peer: ({e})");
Expand All @@ -798,6 +800,7 @@ pub fn node_id_from_signing_key(signer: &SigningKey) -> H512 {
#[cfg(test)]
mod tests {
use super::*;
use ethereum_rust_storage::EngineType;
use kademlia::bucket_number;
use rand::rngs::OsRng;
use std::{
Expand Down Expand Up @@ -844,12 +847,15 @@ mod tests {
let signer = SigningKey::random(&mut OsRng);
let udp_socket = Arc::new(UdpSocket::bind(addr).await.unwrap());
let node_id = node_id_from_signing_key(&signer);
let storage =
Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
let table = Arc::new(Mutex::new(KademliaTable::new(node_id)));

if should_start_server {
tokio::spawn(discover_peers_server(
addr,
udp_socket.clone(),
storage.clone(),
table.clone(),
signer.clone(),
));
Expand Down
Loading
Loading