From 6da9d2d734da8d7c3b2a03695e2dd286a4ec2321 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Mon, 30 Sep 2024 22:15:48 +0100 Subject: [PATCH] P2P: remove peer sync service (#299) * remove peer sync service * change `p2p` to not use the peer sync service * fmt & clippy * doc updates * review fixes * add a little more detail to comment --- Cargo.lock | 1 - p2p/p2p-core/src/client.rs | 14 +- p2p/p2p-core/src/client/connection.rs | 11 +- p2p/p2p-core/src/client/connector.rs | 19 +- p2p/p2p-core/src/client/handshaker.rs | 47 +- p2p/p2p-core/src/client/handshaker/builder.rs | 69 +-- .../src/client/handshaker/builder/dummy.rs | 22 - p2p/p2p-core/src/client/request_handler.rs | 24 +- p2p/p2p-core/src/client/timeout_monitor.rs | 27 +- p2p/p2p-core/src/lib.rs | 24 - p2p/p2p-core/src/services.rs | 22 - p2p/p2p/Cargo.toml | 1 - p2p/p2p/src/block_downloader.rs | 58 +-- p2p/p2p/src/block_downloader/request_chain.rs | 38 +- p2p/p2p/src/block_downloader/tests.rs | 44 +- p2p/p2p/src/client_pool.rs | 26 ++ p2p/p2p/src/constants.rs | 1 + p2p/p2p/src/lib.rs | 35 +- p2p/p2p/src/sync_states.rs | 420 ------------------ 19 files changed, 118 insertions(+), 785 deletions(-) delete mode 100644 p2p/p2p/src/sync_states.rs diff --git a/Cargo.lock b/Cargo.lock index fe9d1ed42..85222559c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -773,7 +773,6 @@ dependencies = [ "cuprate-wire", "dashmap", "futures", - "hex", "indexmap", "monero-serai", "pin-project", diff --git a/p2p/p2p-core/src/client.rs b/p2p/p2p-core/src/client.rs index 86851894f..73b33ba62 100644 --- a/p2p/p2p-core/src/client.rs +++ b/p2p/p2p-core/src/client.rs @@ -1,6 +1,6 @@ use std::{ fmt::{Debug, Display, Formatter}, - sync::Arc, + sync::{Arc, Mutex}, task::{ready, Context, Poll}, }; @@ -15,6 +15,7 @@ use tracing::Instrument; use cuprate_helper::asynch::InfallibleOneshotReceiver; use cuprate_pruning::PruningSeed; +use cuprate_wire::CoreSyncData; use crate::{ handles::{ConnectionGuard, ConnectionHandle}, @@ -59,8 +60,17 @@ pub struct PeerInformation { pub handle: ConnectionHandle, /// The direction of this connection (inbound|outbound). pub direction: ConnectionDirection, - /// The peers pruning seed. + /// The peer's [`PruningSeed`]. pub pruning_seed: PruningSeed, + /// The [`CoreSyncData`] of this peer. + /// + /// Data across fields are not necessarily related, so [`CoreSyncData::top_id`] is not always the + /// block hash for the block at height one below [`CoreSyncData::current_height`]. + /// + /// This value is behind a [`Mutex`] and is updated whenever the peer sends new information related + /// to their sync state. It is publicly accessible to anyone who has a peers [`Client`] handle. You + /// probably should not mutate this value unless you are creating a custom [`ProtocolRequestHandler`](crate::ProtocolRequestHandler). + pub core_sync_data: Arc>, } /// This represents a connection to a peer. diff --git a/p2p/p2p-core/src/client/connection.rs b/p2p/p2p-core/src/client/connection.rs index f7b9be5c0..892fa6492 100644 --- a/p2p/p2p-core/src/client/connection.rs +++ b/p2p/p2p-core/src/client/connection.rs @@ -22,7 +22,7 @@ use crate::{ constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT}, handles::ConnectionGuard, AddressBook, BroadcastMessage, CoreSyncSvc, MessageID, NetworkZone, PeerError, PeerRequest, - PeerResponse, PeerSyncSvc, ProtocolRequestHandler, ProtocolResponse, SharedError, + PeerResponse, ProtocolRequestHandler, ProtocolResponse, SharedError, }; /// A request to the connection task from a [`Client`](crate::client::Client). 
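// Example (editor's sketch, not part of the patch): reading the new `PeerInformation::core_sync_data`
// field introduced above. With the peer sync service removed, a peer's claimed sync state is read
// straight from its `Client` handle; the data sits behind a `Mutex`, so callers lock it and copy out
// what they need. `Client::info`, the `core_sync_data` field and `CoreSyncData::cumulative_difficulty()`
// are taken from this patch and the wire types; the helper name and signature below are hypothetical.
use cuprate_p2p_core::{client::Client, NetworkZone};

/// Returns the chain height and cumulative difficulty this peer currently claims to have.
fn peer_claimed_sync_state<Z: NetworkZone>(client: &Client<Z>) -> (u64, u128) {
    // Hold the lock only long enough to copy the two values out.
    let sync_data = client.info.core_sync_data.lock().unwrap();

    // `cumulative_difficulty()` combines the low and high 64-bit halves into a single `u128`.
    (sync_data.current_height, sync_data.cumulative_difficulty())
}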
@@ -71,7 +71,7 @@ const fn levin_command_response(message_id: MessageID, command: LevinCommand) -> } /// This represents a connection to a peer. -pub(crate) struct Connection { +pub(crate) struct Connection { /// The peer sink - where we send messages to the peer. peer_sink: Z::Sink, @@ -86,7 +86,7 @@ pub(crate) struct Connection { broadcast_stream: Pin>, /// The inner handler for any requests that come from the requested peer. - peer_request_handler: PeerRequestHandler, + peer_request_handler: PeerRequestHandler, /// The connection guard which will send signals to other parts of Cuprate when this connection is dropped. connection_guard: ConnectionGuard, @@ -94,12 +94,11 @@ pub(crate) struct Connection { error: SharedError, } -impl Connection +impl Connection where Z: NetworkZone, A: AddressBook, CS: CoreSyncSvc, - PS: PeerSyncSvc, PR: ProtocolRequestHandler, BrdcstStrm: Stream + Send + 'static, { @@ -108,7 +107,7 @@ where peer_sink: Z::Sink, client_rx: mpsc::Receiver, broadcast_stream: BrdcstStrm, - peer_request_handler: PeerRequestHandler, + peer_request_handler: PeerRequestHandler, connection_guard: ConnectionGuard, error: SharedError, ) -> Self { diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index 553f5a44f..b3780754b 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -16,7 +16,7 @@ use tower::{Service, ServiceExt}; use crate::{ client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, - AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, + AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, ProtocolRequestHandler, }; @@ -32,27 +32,24 @@ pub struct ConnectRequest { } /// The connector service, this service connects to peer and returns the [`Client`]. -pub struct Connector { - handshaker: HandShaker, +pub struct Connector { + handshaker: HandShaker, } -impl - Connector +impl + Connector { /// Create a new connector from a handshaker. - pub const fn new( - handshaker: HandShaker, - ) -> Self { + pub const fn new(handshaker: HandShaker) -> Self { Self { handshaker } } } -impl - Service> for Connector +impl + Service> for Connector where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, - PSync: PeerSyncSvc + Clone, ProtoHdlr: ProtocolRequestHandler + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index d6873a85f..bf5165e0d 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -8,7 +8,7 @@ use std::{ future::Future, marker::PhantomData, pin::Pin, - sync::Arc, + sync::{Arc, Mutex}, task::{Context, Poll}, }; @@ -40,10 +40,9 @@ use crate::{ PING_TIMEOUT, }, handles::HandleBuilder, - services::PeerSyncRequest, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, - PeerSyncSvc, ProtocolRequestHandler, SharedError, + ProtocolRequestHandler, SharedError, }; pub mod builder; @@ -87,13 +86,11 @@ pub struct DoHandshakeRequest { /// The peer handshaking service. #[derive(Debug, Clone)] -pub struct HandShaker { +pub struct HandShaker { /// The address book service. address_book: AdrBook, /// The core sync data service. core_sync_svc: CSync, - /// The peer sync service. 
- peer_sync_svc: PSync, /// The protocol request handler service. protocol_request_svc: ProtoHdlr, @@ -109,13 +106,12 @@ pub struct HandShaker, } -impl - HandShaker +impl + HandShaker { /// Creates a new handshaker. const fn new( address_book: AdrBook, - peer_sync_svc: PSync, core_sync_svc: CSync, protocol_request_svc: ProtoHdlr, broadcast_stream_maker: BrdcstStrmMkr, @@ -124,7 +120,6 @@ impl ) -> Self { Self { address_book, - peer_sync_svc, core_sync_svc, protocol_request_svc, broadcast_stream_maker, @@ -135,13 +130,11 @@ impl } } -impl - Service> - for HandShaker +impl + Service> for HandShaker where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, - PSync: PeerSyncSvc + Clone, ProtoHdlr: ProtocolRequestHandler + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, @@ -161,7 +154,6 @@ where let address_book = self.address_book.clone(); let protocol_request_svc = self.protocol_request_svc.clone(); let core_sync_svc = self.core_sync_svc.clone(); - let peer_sync_svc = self.peer_sync_svc.clone(); let our_basic_node_data = self.our_basic_node_data.clone(); let connection_parent_span = self.connection_parent_span.clone(); @@ -176,7 +168,6 @@ where broadcast_stream_maker, address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc, our_basic_node_data, connection_parent_span, @@ -231,15 +222,13 @@ pub async fn ping(addr: N::Addr) -> Result } /// This function completes a handshake with the requested peer. -#[expect(clippy::too_many_arguments)] -async fn handshake( +async fn handshake( req: DoHandshakeRequest, broadcast_stream_maker: BrdcstStrmMkr, mut address_book: AdrBook, mut core_sync_svc: CSync, - mut peer_sync_svc: PSync, protocol_request_handler: ProtoHdlr, our_basic_node_data: BasicNodeData, connection_parent_span: Span, @@ -247,7 +236,6 @@ async fn handshake + Clone, CSync: CoreSyncSvc + Clone, - PSync: PeerSyncSvc + Clone, ProtoHdlr: ProtocolRequestHandler, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, @@ -458,17 +446,6 @@ where }) .await?; - // Tell the core sync service about the new peer. - peer_sync_svc - .ready() - .await? - .call(PeerSyncRequest::IncomingCoreSyncData( - addr, - handle.clone(), - peer_core_sync, - )) - .await?; - // Set up the connection data. 
let error_slot = SharedError::new(); let (connection_tx, client_rx) = mpsc::channel(1); @@ -478,18 +455,18 @@ where handle, direction, pruning_seed, + core_sync_data: Arc::new(Mutex::new(peer_core_sync)), }; let request_handler = PeerRequestHandler { address_book_svc: address_book.clone(), our_sync_svc: core_sync_svc.clone(), - peer_sync_svc: peer_sync_svc.clone(), protocol_request_handler, our_basic_node_data, peer_info: info.clone(), }; - let connection = Connection::::new( + let connection = Connection::::new( peer_sink, client_rx, broadcast_stream_maker(addr), @@ -509,13 +486,11 @@ where let semaphore = Arc::new(Semaphore::new(1)); let timeout_handle = tokio::spawn(connection_timeout_monitor_task( - info.id, - info.handle.clone(), + info.clone(), connection_tx.clone(), Arc::clone(&semaphore), address_book, core_sync_svc, - peer_sync_svc, )); let client = Client::::new( diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 069811dfa..c7109edee 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -7,13 +7,11 @@ use cuprate_wire::BasicNodeData; use crate::{ client::{handshaker::HandShaker, InternalPeerID}, - AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler, + AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, ProtocolRequestHandler, }; mod dummy; -pub use dummy::{ - DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, -}; +pub use dummy::{DummyAddressBook, DummyCoreSyncSvc, DummyProtocolRequestHandler}; /// A [`HandShaker`] [`Service`](tower::Service) builder. /// @@ -28,7 +26,6 @@ pub struct HandshakerBuilder< N: NetworkZone, AdrBook = DummyAddressBook, CSync = DummyCoreSyncSvc, - PSync = DummyPeerSyncSvc, ProtoHdlr = DummyProtocolRequestHandler, BrdcstStrmMkr = fn( InternalPeerID<::Addr>, @@ -38,8 +35,6 @@ pub struct HandshakerBuilder< address_book: AdrBook, /// The core sync data service. core_sync_svc: CSync, - /// The peer sync service. - peer_sync_svc: PSync, /// The protocol request service. protocol_request_svc: ProtoHdlr, /// Our [`BasicNodeData`] @@ -59,7 +54,6 @@ impl HandshakerBuilder { Self { address_book: DummyAddressBook, core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), - peer_sync_svc: DummyPeerSyncSvc, protocol_request_svc: DummyProtocolRequestHandler, our_basic_node_data, broadcast_stream_maker: |_| stream::pending(), @@ -69,8 +63,8 @@ impl HandshakerBuilder { } } -impl - HandshakerBuilder +impl + HandshakerBuilder { /// Changes the address book to the provided one. 
/// @@ -83,13 +77,12 @@ impl pub fn with_address_book( self, new_address_book: NAdrBook, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where NAdrBook: AddressBook + Clone, { let Self { core_sync_svc, - peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, @@ -100,7 +93,6 @@ impl HandshakerBuilder { address_book: new_address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, @@ -125,13 +117,12 @@ impl pub fn with_core_sync_svc( self, new_core_sync_svc: NCSync, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where NCSync: CoreSyncSvc + Clone, { let Self { address_book, - peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, @@ -142,43 +133,6 @@ impl HandshakerBuilder { address_book, core_sync_svc: new_core_sync_svc, - peer_sync_svc, - protocol_request_svc, - our_basic_node_data, - broadcast_stream_maker, - connection_parent_span, - _zone: PhantomData, - } - } - - /// Changes the peer sync service, which keeps track of peers sync states. - /// - /// ## Default Peer Sync Service - /// - /// The default peer sync service will be used if this method is not called. - /// - /// The default peer sync service will not keep track of peers sync states. - pub fn with_peer_sync_svc( - self, - new_peer_sync_svc: NPSync, - ) -> HandshakerBuilder - where - NPSync: PeerSyncSvc + Clone, - { - let Self { - address_book, - core_sync_svc, - protocol_request_svc, - our_basic_node_data, - broadcast_stream_maker, - connection_parent_span, - .. - } = self; - - HandshakerBuilder { - address_book, - core_sync_svc, - peer_sync_svc: new_peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, @@ -197,14 +151,13 @@ impl pub fn with_protocol_request_handler( self, new_protocol_handler: NProtoHdlr, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where NProtoHdlr: ProtocolRequestHandler + Clone, { let Self { address_book, core_sync_svc, - peer_sync_svc, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -214,7 +167,6 @@ impl HandshakerBuilder { address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc: new_protocol_handler, our_basic_node_data, broadcast_stream_maker, @@ -233,7 +185,7 @@ impl pub fn with_broadcast_stream_maker( self, new_broadcast_stream_maker: NBrdcstStrmMkr, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where BrdcstStrm: Stream + Send + 'static, NBrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, @@ -241,7 +193,6 @@ impl let Self { address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc, our_basic_node_data, connection_parent_span, @@ -251,7 +202,6 @@ impl HandshakerBuilder { address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker: new_broadcast_stream_maker, @@ -274,10 +224,9 @@ impl } /// Builds the [`HandShaker`]. 
- pub fn build(self) -> HandShaker { + pub fn build(self) -> HandShaker { HandShaker::new( self.address_book, - self.peer_sync_svc, self.core_sync_svc, self.protocol_request_svc, self.broadcast_stream_maker, diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs index e3c4335a0..1dcc2bee8 100644 --- a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -10,32 +10,10 @@ use cuprate_wire::CoreSyncData; use crate::{ services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, - PeerSyncRequest, PeerSyncResponse, }, NetworkZone, ProtocolRequest, ProtocolResponse, }; -/// A dummy peer sync service, that doesn't actually keep track of peers sync states. -#[derive(Debug, Clone)] -pub struct DummyPeerSyncSvc; - -impl Service> for DummyPeerSyncSvc { - type Response = PeerSyncResponse; - type Error = tower::BoxError; - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: PeerSyncRequest) -> Self::Future { - ready(Ok(match req { - PeerSyncRequest::PeersToSyncFrom { .. } => PeerSyncResponse::PeersToSyncFrom(vec![]), - PeerSyncRequest::IncomingCoreSyncData(_, _, _) => PeerSyncResponse::Ok, - })) - } -} - /// A dummy core sync service that just returns static [`CoreSyncData`]. #[derive(Debug, Clone)] pub struct DummyCoreSyncSvc(CoreSyncData); diff --git a/p2p/p2p-core/src/client/request_handler.rs b/p2p/p2p-core/src/client/request_handler.rs index 7059eed3b..c2f3b8e12 100644 --- a/p2p/p2p-core/src/client/request_handler.rs +++ b/p2p/p2p-core/src/client/request_handler.rs @@ -14,10 +14,8 @@ use crate::{ constants::MAX_PEERS_IN_PEER_LIST_MESSAGE, services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, - PeerSyncRequest, }, - AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, - ProtocolRequestHandler, + AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, ProtocolRequestHandler, }; #[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)] @@ -28,13 +26,11 @@ enum PeerRequestHandlerError { /// The peer request handler, handles incoming [`PeerRequest`]s to our node. #[derive(Debug, Clone)] -pub(crate) struct PeerRequestHandler { +pub(crate) struct PeerRequestHandler { /// The address book service. pub address_book_svc: A, /// Our core sync service. pub our_sync_svc: CS, - /// The peer sync service. - pub peer_sync_svc: PS, /// The handler for [`ProtocolRequest`](crate::ProtocolRequest)s to our node. pub protocol_request_handler: PR, @@ -46,12 +42,11 @@ pub(crate) struct PeerRequestHandler { pub peer_info: PeerInformation, } -impl PeerRequestHandler +impl PeerRequestHandler where Z: NetworkZone, A: AddressBook, CS: CoreSyncSvc, - PS: PeerSyncSvc, PR: ProtocolRequestHandler, { /// Handles an incoming [`PeerRequest`] to our node. @@ -104,18 +99,7 @@ where ) -> Result { // TODO: add a limit on the amount of these requests in a certain time period. - let peer_id = self.peer_info.id; - let handle = self.peer_info.handle.clone(); - - self.peer_sync_svc - .ready() - .await? 
- .call(PeerSyncRequest::IncomingCoreSyncData( - peer_id, - handle, - req.payload_data, - )) - .await?; + *self.peer_info.core_sync_data.lock().unwrap() = req.payload_data; let AddressBookResponse::Peers(peers) = self .address_book_svc diff --git a/p2p/p2p-core/src/client/timeout_monitor.rs b/p2p/p2p-core/src/client/timeout_monitor.rs index b736966af..d9703d6c6 100644 --- a/p2p/p2p-core/src/client/timeout_monitor.rs +++ b/p2p/p2p-core/src/client/timeout_monitor.rs @@ -15,35 +15,31 @@ use tracing::instrument; use cuprate_wire::{admin::TimedSyncRequest, AdminRequestMessage, AdminResponseMessage}; use crate::{ - client::{connection::ConnectionTaskRequest, InternalPeerID}, + client::{connection::ConnectionTaskRequest, PeerInformation}, constants::{MAX_PEERS_IN_PEER_LIST_MESSAGE, TIMEOUT_INTERVAL}, - handles::ConnectionHandle, - services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest}, - AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, + services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse}, + AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, }; /// The timeout monitor task, this task will send periodic timed sync requests to the peer to make sure it is still active. #[instrument( name = "timeout_monitor", level = "debug", - fields(addr = %id), + fields(addr = %peer_information.id), skip_all, )] -pub async fn connection_timeout_monitor_task( - id: InternalPeerID, - handle: ConnectionHandle, +pub async fn connection_timeout_monitor_task( + peer_information: PeerInformation, connection_tx: mpsc::Sender, semaphore: Arc, mut address_book_svc: AdrBook, mut core_sync_svc: CSync, - mut peer_core_sync_svc: PSync, ) -> Result<(), tower::BoxError> where AdrBook: AddressBook, CSync: CoreSyncSvc, - PSync: PeerSyncSvc, { let connection_tx_weak = connection_tx.downgrade(); drop(connection_tx); @@ -125,15 +121,6 @@ where )) .await?; - // Tell the peer sync service about the peers core sync data - peer_core_sync_svc - .ready() - .await? - .call(PeerSyncRequest::IncomingCoreSyncData( - id, - handle.clone(), - timed_sync.payload_data, - )) - .await?; + *peer_information.core_sync_data.lock().unwrap() = timed_sync.payload_data; } } diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index 04e86761d..ca83f8f25 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -192,30 +192,6 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { // Below here is just helper traits, so we don't have to type out tower::Service bounds // everywhere but still get to use tower. -pub trait PeerSyncSvc: - tower::Service< - PeerSyncRequest, - Response = PeerSyncResponse, - Error = tower::BoxError, - Future = Self::Future2, - > + Send - + 'static -{ - // This allows us to put more restrictive bounds on the future without defining the future here - // explicitly. - type Future2: Future> + Send + 'static; -} - -impl PeerSyncSvc for T -where - T: tower::Service, Response = PeerSyncResponse, Error = tower::BoxError> - + Send - + 'static, - T::Future: Future> + Send + 'static, -{ - type Future2 = T::Future; -} - pub trait AddressBook: tower::Service< AddressBookRequest, diff --git a/p2p/p2p-core/src/services.rs b/p2p/p2p-core/src/services.rs index ba8768484..b858f33e2 100644 --- a/p2p/p2p-core/src/services.rs +++ b/p2p/p2p-core/src/services.rs @@ -6,28 +6,6 @@ use crate::{ NetworkZone, }; -/// A request to the service that keeps track of peers sync states. 
-pub enum PeerSyncRequest { - /// Request some peers to sync from. - /// - /// This takes in the current cumulative difficulty of our chain and will return peers that - /// claim to have a higher cumulative difficulty. - PeersToSyncFrom { - current_cumulative_difficulty: u128, - block_needed: Option, - }, - /// Add/update a peer's core sync data. - IncomingCoreSyncData(InternalPeerID, ConnectionHandle, CoreSyncData), -} - -/// A response from the service that keeps track of peers sync states. -pub enum PeerSyncResponse { - /// The return value of [`PeerSyncRequest::PeersToSyncFrom`]. - PeersToSyncFrom(Vec>), - /// A generic ok response. - Ok, -} - /// A request to the core sync service for our node's [`CoreSyncData`]. pub struct CoreSyncDataRequest; diff --git a/p2p/p2p/Cargo.toml b/p2p/p2p/Cargo.toml index ef852779d..b53baaa39 100644 --- a/p2p/p2p/Cargo.toml +++ b/p2p/p2p/Cargo.toml @@ -30,7 +30,6 @@ thiserror = { workspace = true } bytes = { workspace = true, features = ["std"] } rand = { workspace = true, features = ["std", "std_rng"] } rand_distr = { workspace = true, features = ["std"] } -hex = { workspace = true, features = ["std"] } tracing = { workspace = true, features = ["std", "attributes"] } borsh = { workspace = true, features = ["derive", "std"] } diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs index 39980a038..eccb38502 100644 --- a/p2p/p2p/src/block_downloader.rs +++ b/p2p/p2p/src/block_downloader.rs @@ -22,11 +22,7 @@ use tower::{Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::{BufferAppender, BufferStream}; -use cuprate_p2p_core::{ - handles::ConnectionHandle, - services::{PeerSyncRequest, PeerSyncResponse}, - NetworkZone, PeerSyncSvc, -}; +use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone}; use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT}; use crate::{ @@ -137,14 +133,12 @@ pub enum ChainSvcResponse { /// The block downloader may fail before the whole chain is downloaded. If this is the case you can /// call this function again, so it can start the search again. #[instrument(level = "error", skip_all, name = "block_downloader")] -pub fn download_blocks( +pub fn download_blocks( client_pool: Arc>, - peer_sync_svc: S, our_chain_svc: C, config: BlockDownloaderConfig, ) -> BufferStream where - S: PeerSyncSvc + Clone, C: Service + Send + 'static, @@ -152,13 +146,8 @@ where { let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size); - let block_downloader = BlockDownloader::new( - client_pool, - peer_sync_svc, - our_chain_svc, - buffer_appender, - config, - ); + let block_downloader = + BlockDownloader::new(client_pool, our_chain_svc, buffer_appender, config); tokio::spawn( block_downloader @@ -195,12 +184,10 @@ where /// - request the next chain entry /// - download an already requested batch of blocks (this might happen due to an error in the previous request /// or because the queue of ready blocks is too large, so we need the oldest block to clear it). -struct BlockDownloader { +struct BlockDownloader { /// The client pool. client_pool: Arc>, - /// The service that holds the peer's sync states. - peer_sync_svc: S, /// The service that holds our current chain state. 
our_chain_svc: C, @@ -238,9 +225,8 @@ struct BlockDownloader { config: BlockDownloaderConfig, } -impl BlockDownloader +impl BlockDownloader where - S: PeerSyncSvc + Clone, C: Service + Send + 'static, @@ -249,16 +235,12 @@ where /// Creates a new [`BlockDownloader`] fn new( client_pool: Arc>, - - peer_sync_svc: S, our_chain_svc: C, buffer_appender: BufferAppender, - config: BlockDownloaderConfig, ) -> Self { Self { client_pool, - peer_sync_svc, our_chain_svc, amount_of_blocks_to_request: config.initial_batch_size, amount_of_blocks_to_request_updated_at: 0, @@ -495,22 +477,10 @@ where panic!("Chain service returned wrong response."); }; - let PeerSyncResponse::PeersToSyncFrom(peers) = self - .peer_sync_svc - .ready() - .await? - .call(PeerSyncRequest::PeersToSyncFrom { - current_cumulative_difficulty, - block_needed: None, - }) - .await? - else { - panic!("Peer sync service returned wrong response."); - }; - - tracing::debug!("Response received from peer sync service"); - - for client in self.client_pool.borrow_clients(&peers) { + for client in self + .client_pool + .clients_with_more_cumulative_difficulty(current_cumulative_difficulty) + { pending_peers .entry(client.info.pruning_seed) .or_default() @@ -621,12 +591,8 @@ where /// Starts the main loop of the block downloader. async fn run(mut self) -> Result<(), BlockDownloadError> { - let mut chain_tracker = initial_chain_search( - &self.client_pool, - self.peer_sync_svc.clone(), - &mut self.our_chain_svc, - ) - .await?; + let mut chain_tracker = + initial_chain_search(&self.client_pool, &mut self.our_chain_svc).await?; let mut pending_peers = BTreeMap::new(); diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs index bde40ced0..d6a2a0af5 100644 --- a/p2p/p2p/src/block_downloader/request_chain.rs +++ b/p2p/p2p/src/block_downloader/request_chain.rs @@ -1,16 +1,12 @@ use std::{mem, sync::Arc}; -use rand::prelude::SliceRandom; -use rand::thread_rng; use tokio::{task::JoinSet, time::timeout}; use tower::{Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; use cuprate_p2p_core::{ - client::InternalPeerID, - handles::ConnectionHandle, - services::{PeerSyncRequest, PeerSyncResponse}, - NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, ProtocolRequest, ProtocolResponse, + client::InternalPeerID, handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse, + ProtocolRequest, ProtocolResponse, }; use cuprate_wire::protocol::{ChainRequest, ChainResponse}; @@ -83,13 +79,11 @@ pub(crate) async fn request_chain_entry_from_peer( /// /// We then wait for their response and choose the peer who claims the highest cumulative difficulty. #[instrument(level = "error", skip_all)] -pub async fn initial_chain_search( +pub async fn initial_chain_search( client_pool: &Arc>, - mut peer_sync_svc: S, mut our_chain_svc: C, ) -> Result, BlockDownloadError> where - S: PeerSyncSvc, C: Service, { tracing::debug!("Getting our chain history"); @@ -108,29 +102,9 @@ where let our_genesis = *block_ids.last().expect("Blockchain had no genesis block."); - tracing::debug!("Getting a list of peers with higher cumulative difficulty"); - - let PeerSyncResponse::PeersToSyncFrom(mut peers) = peer_sync_svc - .ready() - .await? - .call(PeerSyncRequest::PeersToSyncFrom { - block_needed: None, - current_cumulative_difficulty: cumulative_difficulty, - }) - .await? 
- else { - panic!("peer sync service sent wrong response."); - }; - - tracing::debug!( - "{} peers claim they have a higher cumulative difficulty", - peers.len() - ); - - // Shuffle the list to remove any possibility of peers being able to prioritize getting picked. - peers.shuffle(&mut thread_rng()); - - let mut peers = client_pool.borrow_clients(&peers); + let mut peers = client_pool + .clients_with_more_cumulative_difficulty(cumulative_difficulty) + .into_iter(); let mut futs = JoinSet::new(); diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs index a5c5e9257..83dd417c2 100644 --- a/p2p/p2p/src/block_downloader/tests.rs +++ b/p2p/p2p/src/block_downloader/tests.rs @@ -2,7 +2,7 @@ use std::{ fmt::{Debug, Formatter}, future::Future, pin::Pin, - sync::Arc, + sync::{Arc, Mutex}, task::{Context, Poll}, time::Duration, }; @@ -20,13 +20,14 @@ use tower::{service_fn, Service}; use cuprate_fixed_bytes::ByteArrayVec; use cuprate_p2p_core::{ client::{mock_client, Client, InternalPeerID, PeerInformation}, - services::{PeerSyncRequest, PeerSyncResponse}, - ClearNet, ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest, - ProtocolResponse, + ClearNet, ConnectionDirection, PeerRequest, PeerResponse, ProtocolRequest, ProtocolResponse, }; use cuprate_pruning::PruningSeed; use cuprate_types::{BlockCompleteEntry, TransactionBlobs}; -use cuprate_wire::protocol::{ChainResponse, GetObjectsResponse}; +use cuprate_wire::{ + protocol::{ChainResponse, GetObjectsResponse}, + CoreSyncData, +}; use crate::{ block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, @@ -52,19 +53,14 @@ proptest! { timeout(Duration::from_secs(600), async move { let client_pool = ClientPool::new(); - let mut peer_ids = Vec::with_capacity(peers); - for _ in 0..peers { let client = mock_block_downloader_client(Arc::clone(&blockchain)); - peer_ids.push(client.info.id); - client_pool.add_new_client(client); } let stream = download_blocks( client_pool, - SyncStateSvc(peer_ids) , OurChainSvc { genesis: *blockchain.blocks.first().unwrap().0 }, @@ -255,31 +251,19 @@ fn mock_block_downloader_client(blockchain: Arc) -> Client(Vec>); - -impl Service> for SyncStateSvc { - type Response = PeerSyncResponse; - type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: PeerSyncRequest) -> Self::Future { - let peers = self.0.clone(); - - async move { Ok(PeerSyncResponse::PeersToSyncFrom(peers)) }.boxed() - } -} - struct OurChainSvc { genesis: [u8; 32], } diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index 3405224ab..77d3b6e5c 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -127,6 +127,32 @@ impl ClientPool { ) -> impl Iterator> + sealed::Captures<(&'a (), &'b ())> { peers.iter().filter_map(|peer| self.borrow_client(peer)) } + + /// Borrows all [`Client`]s from the pool that have claimed a higher cumulative difficulty than + /// the amount passed in. + /// + /// The [`Client`]s are wrapped in [`ClientPoolDropGuard`] which + /// will return the clients to the pool when they are dropped. 
+ pub fn clients_with_more_cumulative_difficulty( + self: &Arc, + cumulative_difficulty: u128, + ) -> Vec> { + let peers = self + .clients + .iter() + .filter_map(|element| { + let peer_sync_info = element.value().info.core_sync_data.lock().unwrap(); + + if peer_sync_info.cumulative_difficulty() > cumulative_difficulty { + Some(*element.key()) + } else { + None + } + }) + .collect::>(); + + self.borrow_clients(&peers).collect() + } } mod sealed { diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index 4c08eb8ad..f2349600a 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -16,6 +16,7 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); /// The durations of a short ban. +#[cfg_attr(not(test), expect(dead_code))] pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); /// The durations of a medium ban. diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 2f51c6c55..4a35aced1 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -5,11 +5,7 @@ use std::sync::Arc; use futures::FutureExt; -use tokio::{ - sync::{mpsc, watch}, - task::JoinSet, -}; -use tokio_stream::wrappers::WatchStream; +use tokio::{sync::mpsc, task::JoinSet}; use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; @@ -17,7 +13,7 @@ use cuprate_async_buffer::BufferStream; use cuprate_p2p_core::{ client::Connector, client::InternalPeerID, - services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, + services::{AddressBookRequest, AddressBookResponse}, CoreSyncSvc, NetworkZone, ProtocolRequestHandler, }; @@ -28,7 +24,6 @@ pub mod config; pub mod connection_maintainer; mod constants; mod inbound_server; -mod sync_states; use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}; pub use broadcast::{BroadcastRequest, BroadcastSvc}; @@ -63,12 +58,6 @@ where config.max_inbound_connections + config.outbound_connections, ); - let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new(); - let sync_states_svc = Buffer::new( - sync_states_svc, - config.max_inbound_connections + config.outbound_connections, - ); - // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing // this. let (broadcast_svc, outbound_mkr, inbound_mkr) = @@ -83,7 +72,6 @@ where let outbound_handshaker_builder = cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data) .with_address_book(address_book.clone()) - .with_peer_sync_svc(sync_states_svc.clone()) .with_core_sync_svc(core_sync_svc) .with_protocol_request_handler(protocol_request_handler) .with_broadcast_stream_maker(outbound_mkr) @@ -136,9 +124,7 @@ where Ok(NetworkInterface { pool: client_pool, broadcast_svc, - top_block_watch, make_connection_tx, - sync_states_svc, address_book: address_book.boxed_clone(), _background_tasks: Arc::new(background_tasks), }) @@ -151,16 +137,11 @@ pub struct NetworkInterface { pool: Arc>, /// A [`Service`] that allows broadcasting to all connected peers. broadcast_svc: BroadcastSvc, - /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info - /// on that claimed chain. - top_block_watch: watch::Receiver, /// A channel to request extra connections. #[expect(dead_code, reason = "will be used eventually")] make_connection_tx: mpsc::Sender, /// The address book service. 
address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, - /// The peer's sync states service. - sync_states_svc: Buffer, PeerSyncRequest>, /// Background tasks that will be aborted when this interface is dropped. _background_tasks: Arc>, } @@ -183,17 +164,7 @@ impl NetworkInterface { + 'static, C::Future: Send + 'static, { - block_downloader::download_blocks( - Arc::clone(&self.pool), - self.sync_states_svc.clone(), - our_chain_service, - config, - ) - } - - /// Returns a stream which yields the highest seen sync state from a connected peer. - pub fn top_sync_stream(&self) -> WatchStream { - WatchStream::from_changes(self.top_block_watch.clone()) + block_downloader::download_blocks(Arc::clone(&self.pool), our_chain_service, config) } /// Returns the address book service. diff --git a/p2p/p2p/src/sync_states.rs b/p2p/p2p/src/sync_states.rs deleted file mode 100644 index 0c03795bb..000000000 --- a/p2p/p2p/src/sync_states.rs +++ /dev/null @@ -1,420 +0,0 @@ -//! # Sync States -//! -//! This module contains a [`PeerSyncSvc`], which keeps track of the claimed chain states of connected peers. -//! This allows checking if we are behind and getting a list of peers who claim they are ahead. -use std::{ - cmp::Ordering, - collections::{BTreeMap, HashMap, HashSet}, - future::{ready, Ready}, - task::{Context, Poll}, -}; - -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::sync::watch; -use tower::Service; - -use cuprate_p2p_core::{ - client::InternalPeerID, - handles::ConnectionHandle, - services::{PeerSyncRequest, PeerSyncResponse}, - NetworkZone, -}; -use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT}; -use cuprate_wire::CoreSyncData; - -use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN}; - -/// The highest claimed sync info from our connected peers. -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct NewSyncInfo { - /// The peers chain height. - pub chain_height: u64, - /// The peers top block's hash. - pub top_hash: [u8; 32], - /// The peers cumulative difficulty. - pub cumulative_difficulty: u128, -} - -/// A service that keeps track of our peers blockchains. -/// -/// This is the service that handles: -/// 1. Finding out if we need to sync -/// 1. Giving the peers that should be synced _from_, to the requester -pub(crate) struct PeerSyncSvc { - /// A map of cumulative difficulties to peers. - cumulative_difficulties: BTreeMap>>, - /// A map of peers to cumulative difficulties. - peers: HashMap, (u128, PruningSeed)>, - /// A watch channel for *a* top synced peer info. - new_height_watcher: watch::Sender, - /// The handle to the peer that has data in `new_height_watcher`. - last_peer_in_watcher_handle: Option, - /// A [`FuturesUnordered`] that resolves when a peer disconnects. - closed_connections: FuturesUnordered>, -} - -impl PeerSyncSvc { - /// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with - /// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie. 
- pub(crate) fn new() -> (Self, watch::Receiver) { - let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo { - chain_height: 0, - top_hash: [0; 32], - cumulative_difficulty: 0, - }); - - watch_rx.mark_unchanged(); - - ( - Self { - cumulative_difficulties: BTreeMap::new(), - peers: HashMap::new(), - new_height_watcher: watch_tx, - last_peer_in_watcher_handle: None, - closed_connections: FuturesUnordered::new(), - }, - watch_rx, - ) - } - - /// This function checks if any peers have disconnected, removing them if they have. - fn poll_disconnected(&mut self, cx: &mut Context<'_>) { - while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) { - tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service."); - let (peer_cum_diff, _) = self.peers.remove(&peer_id).unwrap(); - - let cum_diff_peers = self - .cumulative_difficulties - .get_mut(&peer_cum_diff) - .unwrap(); - cum_diff_peers.remove(&peer_id); - if cum_diff_peers.is_empty() { - // If this was the last peer remove the whole entry for this cumulative difficulty. - self.cumulative_difficulties.remove(&peer_cum_diff); - } - } - } - - /// Returns a list of peers that claim to have a higher cumulative difficulty than `current_cum_diff`. - fn peers_to_sync_from( - &self, - current_cum_diff: u128, - block_needed: Option, - ) -> Vec> { - self.cumulative_difficulties - .range((current_cum_diff + 1)..) - .flat_map(|(_, peers)| peers) - .filter(|peer| { - if let Some(block_needed) = block_needed { - // we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means - // we don't take into account the tip blocks which are not pruned. - self.peers[peer] - .1 - .has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT) - } else { - true - } - }) - .copied() - .collect() - } - - /// Updates a peers sync state. - fn update_peer_sync_info( - &mut self, - peer_id: InternalPeerID, - handle: ConnectionHandle, - core_sync_data: &CoreSyncData, - ) -> Result<(), tower::BoxError> { - tracing::trace!( - "Received new core sync data from peer, top hash: {}", - hex::encode(core_sync_data.top_id) - ); - - let new_cumulative_difficulty = core_sync_data.cumulative_difficulty(); - - if let Some((old_cum_diff, _)) = self.peers.get_mut(&peer_id) { - match (*old_cum_diff).cmp(&new_cumulative_difficulty) { - Ordering::Equal => { - // If the cumulative difficulty of the peers chain hasn't changed then no need to update anything. - return Ok(()); - } - Ordering::Greater => { - // This will only happen if a peer lowers its cumulative difficulty during the connection. - // This won't happen if a peer re-syncs their blockchain as then the connection would have closed. - tracing::debug!( - "Peer's claimed cumulative difficulty has dropped, closing connection and banning peer for: {} seconds.", SHORT_BAN.as_secs() - ); - handle.ban_peer(SHORT_BAN); - return Err("Peers cumulative difficulty dropped".into()); - } - Ordering::Less => (), - } - - // Remove the old cumulative difficulty entry for this peer - let old_cum_diff_peers = self.cumulative_difficulties.get_mut(old_cum_diff).unwrap(); - old_cum_diff_peers.remove(&peer_id); - if old_cum_diff_peers.is_empty() { - // If this was the last peer remove the whole entry for this cumulative difficulty. - self.cumulative_difficulties.remove(old_cum_diff); - } - // update the cumulative difficulty - *old_cum_diff = new_cumulative_difficulty; - } else { - // The peer is new so add it the list of peers. 
- self.peers.insert( - peer_id, - ( - new_cumulative_difficulty, - PruningSeed::decompress_p2p_rules(core_sync_data.pruning_seed)?, - ), - ); - - // add it to the list of peers to watch for disconnection. - self.closed_connections.push(PeerDisconnectFut { - closed_fut: handle.closed(), - peer_id: Some(peer_id), - }); - } - - self.cumulative_difficulties - .entry(new_cumulative_difficulty) - .or_default() - .insert(peer_id); - - // If the claimed cumulative difficulty is higher than the current one in the watcher - // or if the peer in the watch has disconnected, update it. - if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty - || self - .last_peer_in_watcher_handle - .as_ref() - .is_some_and(ConnectionHandle::is_closed) - { - tracing::debug!( - "Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}" - ); - #[expect( - clippy::let_underscore_must_use, - reason = "dropped receivers can be ignored" - )] - let _ = self.new_height_watcher.send(NewSyncInfo { - top_hash: core_sync_data.top_id, - chain_height: core_sync_data.current_height, - cumulative_difficulty: new_cumulative_difficulty, - }); - self.last_peer_in_watcher_handle.replace(handle); - } - - Ok(()) - } -} - -impl Service> for PeerSyncSvc { - type Response = PeerSyncResponse; - type Error = tower::BoxError; - type Future = Ready>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.poll_disconnected(cx); - - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: PeerSyncRequest) -> Self::Future { - let res = match req { - PeerSyncRequest::PeersToSyncFrom { - current_cumulative_difficulty, - block_needed, - } => Ok(PeerSyncResponse::PeersToSyncFrom(self.peers_to_sync_from( - current_cumulative_difficulty, - block_needed, - ))), - PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self - .update_peer_sync_info(peer_id, handle, &sync_data) - .map(|()| PeerSyncResponse::Ok), - }; - - ready(res) - } -} - -#[cfg(test)] -mod tests { - use tower::{Service, ServiceExt}; - - use cuprate_p2p_core::{ - client::InternalPeerID, handles::HandleBuilder, services::PeerSyncRequest, - }; - use cuprate_wire::CoreSyncData; - - use cuprate_p2p_core::services::PeerSyncResponse; - use cuprate_test_utils::test_netzone::TestNetZone; - - use super::PeerSyncSvc; - - #[tokio::test] - async fn top_sync_channel_updates() { - let (_g, handle) = HandleBuilder::new().build(); - - let (mut svc, mut watch) = PeerSyncSvc::>::new(); - - assert!(!watch.has_changed().unwrap()); - - svc.ready() - .await - .unwrap() - .call(PeerSyncRequest::IncomingCoreSyncData( - InternalPeerID::Unknown(0), - handle.clone(), - CoreSyncData { - cumulative_difficulty: 1_000, - cumulative_difficulty_top64: 0, - current_height: 0, - pruning_seed: 0, - top_id: [0; 32], - top_version: 0, - }, - )) - .await - .unwrap(); - - assert!(watch.has_changed().unwrap()); - - assert_eq!(watch.borrow().top_hash, [0; 32]); - assert_eq!(watch.borrow().cumulative_difficulty, 1000); - assert_eq!(watch.borrow_and_update().chain_height, 0); - - svc.ready() - .await - .unwrap() - .call(PeerSyncRequest::IncomingCoreSyncData( - InternalPeerID::Unknown(1), - handle.clone(), - CoreSyncData { - cumulative_difficulty: 1_000, - cumulative_difficulty_top64: 0, - current_height: 0, - pruning_seed: 0, - top_id: [0; 32], - top_version: 0, - }, - )) - .await - .unwrap(); - - assert!(!watch.has_changed().unwrap()); - - svc.ready() - .await - .unwrap() - .call(PeerSyncRequest::IncomingCoreSyncData( - 
InternalPeerID::Unknown(2), - handle.clone(), - CoreSyncData { - cumulative_difficulty: 1_001, - cumulative_difficulty_top64: 0, - current_height: 0, - pruning_seed: 0, - top_id: [1; 32], - top_version: 0, - }, - )) - .await - .unwrap(); - - assert!(watch.has_changed().unwrap()); - - assert_eq!(watch.borrow().top_hash, [1; 32]); - assert_eq!(watch.borrow().cumulative_difficulty, 1001); - assert_eq!(watch.borrow_and_update().chain_height, 0); - } - - #[tokio::test] - async fn peer_sync_info_updates() { - let (_g, handle) = HandleBuilder::new().build(); - - let (mut svc, _watch) = PeerSyncSvc::>::new(); - - svc.ready() - .await - .unwrap() - .call(PeerSyncRequest::IncomingCoreSyncData( - InternalPeerID::Unknown(0), - handle.clone(), - CoreSyncData { - cumulative_difficulty: 1_000, - cumulative_difficulty_top64: 0, - current_height: 0, - pruning_seed: 0, - top_id: [0; 32], - top_version: 0, - }, - )) - .await - .unwrap(); - - assert_eq!(svc.peers.len(), 1); - assert_eq!(svc.cumulative_difficulties.len(), 1); - - svc.ready() - .await - .unwrap() - .call(PeerSyncRequest::IncomingCoreSyncData( - InternalPeerID::Unknown(0), - handle.clone(), - CoreSyncData { - cumulative_difficulty: 1_001, - cumulative_difficulty_top64: 0, - current_height: 0, - pruning_seed: 0, - top_id: [0; 32], - top_version: 0, - }, - )) - .await - .unwrap(); - - assert_eq!(svc.peers.len(), 1); - assert_eq!(svc.cumulative_difficulties.len(), 1); - - svc.ready() - .await - .unwrap() - .call(PeerSyncRequest::IncomingCoreSyncData( - InternalPeerID::Unknown(1), - handle.clone(), - CoreSyncData { - cumulative_difficulty: 10, - cumulative_difficulty_top64: 0, - current_height: 0, - pruning_seed: 0, - top_id: [0; 32], - top_version: 0, - }, - )) - .await - .unwrap(); - - assert_eq!(svc.peers.len(), 2); - assert_eq!(svc.cumulative_difficulties.len(), 2); - - let PeerSyncResponse::PeersToSyncFrom(peers) = svc - .ready() - .await - .unwrap() - .call(PeerSyncRequest::PeersToSyncFrom { - block_needed: None, - current_cumulative_difficulty: 0, - }) - .await - .unwrap() - else { - panic!("Wrong response for request.") - }; - - assert!( - peers.contains(&InternalPeerID::Unknown(0)) - && peers.contains(&InternalPeerID::Unknown(1)) - ); - } -}
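The peer selection that `PeerSyncRequest::PeersToSyncFrom` used to provide is now answered directly by the `ClientPool`, as the block downloader changes above show. The sketch below is not part of the patch: the helper name is hypothetical, and the imports assume `ClientPool` and `ClientPoolDropGuard` are exposed from `cuprate_p2p::client_pool`; only the `clients_with_more_cumulative_difficulty` call itself is taken from the patch.

use std::sync::Arc;

use cuprate_p2p::client_pool::{ClientPool, ClientPoolDropGuard};
use cuprate_p2p_core::NetworkZone;

/// Borrows every pooled peer that claims a higher cumulative difficulty than ours.
///
/// Each returned guard hands its `Client` back to the pool when dropped.
fn peers_ahead_of_us<Z: NetworkZone>(
    client_pool: &Arc<ClientPool<Z>>,
    our_cumulative_difficulty: u128,
) -> Vec<ClientPoolDropGuard<Z>> {
    client_pool.clients_with_more_cumulative_difficulty(our_cumulative_difficulty)
}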