From 96288b8984e5c19b821297b2278c428c8d0d3786 Mon Sep 17 00:00:00 2001 From: Victor Ermolaev <16148931+vnermolaev@users.noreply.github.com> Date: Fri, 14 Apr 2023 17:19:47 +0200 Subject: [PATCH] fix(gossipsub): gracefully disable handler on stream errors Previously, we closed the entire connection upon receiving too many upgrade errors. This is unnecessarily aggressive. For example, an upgrade error may be caused by the remote dropping a stream during the initial handshake which is completely isolated from other protocols running on the same connection. Instead of closing the connection, set `KeepAlive::No`. Related: #3591. Resolves: #3690. Pull-Request: #3625. --- Cargo.lock | 4 +- protocols/gossipsub/CHANGELOG.md | 4 + protocols/gossipsub/Cargo.toml | 4 +- protocols/gossipsub/src/error.rs | 10 +- protocols/gossipsub/src/error_priv.rs | 6 - protocols/gossipsub/src/handler.rs | 489 +++++++++++++------------- protocols/gossipsub/src/lib.rs | 5 +- protocols/gossipsub/src/protocol.rs | 17 +- swarm/CHANGELOG.md | 6 + swarm/Cargo.toml | 2 +- swarm/src/handler.rs | 32 ++ 11 files changed, 301 insertions(+), 278 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 822c2d4094d..27e08c402a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2438,6 +2438,7 @@ dependencies = [ "base64 0.21.0", "byteorder", "bytes", + "either", "env_logger 0.10.0", "fnv", "futures", @@ -2462,6 +2463,7 @@ dependencies = [ "smallvec", "thiserror", "unsigned-varint", + "void", "wasm-timer", ] @@ -2841,7 +2843,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.42.1" +version = "0.42.2" dependencies = [ "async-std", "either", diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index bfc3e40c335..3006727df17 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -2,7 +2,11 @@ - Fix erroneously duplicate message IDs. See [PR 3716]. +- Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`. + See [PR 3625]. + [PR 3716]: https://github.com/libp2p/rust-libp2p/pull/3716 +[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325 ## 0.44.2 diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 53d9d4be702..d59e1375315 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -11,7 +11,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.42.1", path = "../../swarm" } +either = "1.5" +libp2p-swarm = { version = "0.42.2", path = "../../swarm" } libp2p-core = { version = "0.39.0", path = "../../core" } libp2p-identity = { version = "0.1.2", path = "../../identity" } bytes = "1.4" @@ -33,6 +34,7 @@ serde = { version = "1", optional = true, features = ["derive"] } thiserror = "1.0" wasm-timer = "0.2.5" instant = "0.1.11" +void = "1.0.2" # Metrics dependencies prometheus-client = "0.19.0" diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs index aa04144ff79..61ef13bd248 100644 --- a/protocols/gossipsub/src/error.rs +++ b/protocols/gossipsub/src/error.rs @@ -30,16 +30,10 @@ pub type PublishError = crate::error_priv::PublishError; )] pub type SubscriptionError = crate::error_priv::SubscriptionError; -#[deprecated( - since = "0.44.0", - note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::HandlerError" -)] +#[deprecated(note = "This error will no longer be emitted")] pub type GossipsubHandlerError = crate::error_priv::HandlerError; -#[deprecated( - since = "0.44.0", - note = "Use `libp2p::gossipsub::HandlerError` instead, as the `error` module will become crate-private in the future." -)] +#[deprecated(note = "This error will no longer be emitted")] pub type HandlerError = crate::error_priv::HandlerError; #[deprecated( diff --git a/protocols/gossipsub/src/error_priv.rs b/protocols/gossipsub/src/error_priv.rs index 09bbfbb3543..04cc72028cd 100644 --- a/protocols/gossipsub/src/error_priv.rs +++ b/protocols/gossipsub/src/error_priv.rs @@ -134,12 +134,6 @@ impl std::fmt::Display for ValidationError { impl std::error::Error for ValidationError {} -impl From for HandlerError { - fn from(error: std::io::Error) -> HandlerError { - HandlerError::Codec(quick_protobuf_codec::Error::from(error)) - } -} - impl From for PublishError { fn from(error: std::io::Error) -> PublishError { PublishError::TransformFailed(error) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 604803b874e..609bb81a306 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -21,30 +21,26 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; use crate::types::{PeerKind, RawMessage, Rpc}; -use crate::{HandlerError, ValidationError}; +use crate::ValidationError; use asynchronous_codec::Framed; +use futures::future::Either; use futures::prelude::*; use futures::StreamExt; use instant::Instant; -use libp2p_core::upgrade::{NegotiationError, UpgradeError}; +use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; -use log::{error, trace, warn}; use smallvec::SmallVec; use std::{ - collections::VecDeque, - io, pin::Pin, task::{Context, Poll}, time::Duration, }; - -/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur. -const INITIAL_KEEP_ALIVE: u64 = 30; +use void::Void; /// The event emitted by the Handler. This informs the behaviour of various events created /// by the handler. @@ -75,17 +71,23 @@ pub enum HandlerIn { LeftMesh, } -/// The maximum number of substreams we accept or create before disconnecting from the peer. +/// The maximum number of inbound or outbound substreams attempts we allow. /// /// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we /// attempt to recreate these. This imposes an upper bound of new substreams before we consider the -/// connection faulty and disconnect. This also prevents against potential substream creation loops. -const MAX_SUBSTREAM_CREATION: usize = 5; +/// connection faulty and disable the handler. This also prevents against potential substream +/// creation loops. +const MAX_SUBSTREAM_ATTEMPTS: usize = 5; + +pub enum Handler { + Enabled(EnabledHandler), + Disabled(DisabledHandler), +} /// Protocol Handler that manages a single long-lived substream with a peer. -pub struct Handler { +pub struct EnabledHandler { /// Upgrade configuration for the gossipsub protocol. - listen_protocol: SubstreamProtocol, + listen_protocol: ProtocolConfig, /// The single long-lived outbound substream. outbound_substream: Option, @@ -100,11 +102,11 @@ pub struct Handler { /// requests. outbound_substream_establishing: bool, - /// The number of outbound substreams we have created. - outbound_substreams_created: usize, + /// The number of outbound substreams we have requested. + outbound_substream_attempts: usize, /// The number of inbound substreams that have been created by the peer. - inbound_substreams_created: usize, + inbound_substream_attempts: usize, /// The type of peer this handler is associated to. peer_kind: Option, @@ -114,27 +116,29 @@ pub struct Handler { // NOTE: Use this flag rather than checking the substream count each poll. peer_kind_sent: bool, - /// If the peer doesn't support the gossipsub protocol we do not immediately disconnect. - /// Rather, we disable the handler and prevent any incoming or outgoing substreams from being - /// established. - /// - /// This value is set to true to indicate the peer doesn't support gossipsub. - protocol_unsupported: bool, + last_io_activity: Instant, - /// The amount of time we allow idle connections before disconnecting. + /// The amount of time we keep an idle connection alive. idle_timeout: Duration, - /// Collection of errors from attempting an upgrade. - upgrade_errors: VecDeque>, - - /// Flag determining whether to maintain the connection to the peer. - keep_alive: KeepAlive, - /// Keeps track of whether this connection is for a peer in the mesh. This is used to make /// decisions about the keep alive state for this connection. in_mesh: bool, } +pub enum DisabledHandler { + /// If the peer doesn't support the gossipsub protocol we do not immediately disconnect. + /// Rather, we disable the handler and prevent any incoming or outgoing substreams from being + /// established. + ProtocolUnsupported { + /// Keeps track on whether we have sent the peer kind to the behaviour. + peer_kind_sent: bool, + }, + /// The maximum number of inbound or outbound substream attempts have happened and thereby the + /// handler has been disabled. + MaxSubstreamAttempts, +} + /// State of the inbound substream, opened either by us or by the remote. enum InboundSubstreamState { /// Waiting for a message from the remote. The idle state for an inbound substream. @@ -153,8 +157,6 @@ enum OutboundSubstreamState { PendingSend(Framed, proto::RPC), /// Waiting to flush the substream so that the data arrives to the remote. PendingFlush(Framed), - /// The substream is being closed. Used by either substream. - _Closing(Framed), /// An error occurred during processing. Poisoned, } @@ -162,120 +164,57 @@ enum OutboundSubstreamState { impl Handler { /// Builds a new [`Handler`]. pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self { - Handler { - listen_protocol: SubstreamProtocol::new(protocol_config, ()), + Handler::Enabled(EnabledHandler { + listen_protocol: protocol_config, inbound_substream: None, outbound_substream: None, outbound_substream_establishing: false, - outbound_substreams_created: 0, - inbound_substreams_created: 0, + outbound_substream_attempts: 0, + inbound_substream_attempts: 0, send_queue: SmallVec::new(), peer_kind: None, peer_kind_sent: false, - protocol_unsupported: false, + last_io_activity: Instant::now(), idle_timeout, - upgrade_errors: VecDeque::new(), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), in_mesh: false, - } + }) } +} +impl EnabledHandler { fn on_fully_negotiated_inbound( &mut self, - FullyNegotiatedInbound { protocol, .. }: FullyNegotiatedInbound< - ::InboundProtocol, - ::InboundOpenInfo, - >, + (substream, peer_kind): (Framed, PeerKind), ) { - let (substream, peer_kind) = protocol; - - // If the peer doesn't support the protocol, reject all substreams - if self.protocol_unsupported { - return; - } - - self.inbound_substreams_created += 1; - // update the known kind of peer if self.peer_kind.is_none() { self.peer_kind = Some(peer_kind); } // new inbound substream. Replace the current one, if it exists. - trace!("New inbound substream request"); + log::trace!("New inbound substream request"); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); } fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { - protocol, - info: message, - }: FullyNegotiatedOutbound< - ::OutboundProtocol, - ::OutboundOpenInfo, + FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, >, ) { let (substream, peer_kind) = protocol; - // If the peer doesn't support the protocol, reject all substreams - if self.protocol_unsupported { - return; - } - - self.outbound_substream_establishing = false; - self.outbound_substreams_created += 1; - // update the known kind of peer if self.peer_kind.is_none() { self.peer_kind = Some(peer_kind); } - // Should never establish a new outbound substream if one already exists. - // If this happens, an outbound message is not sent. - if self.outbound_substream.is_some() { - warn!("Established an outbound substream with one already available"); - // Add the message back to the send queue - self.send_queue.push(message); - } else { - self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); - } - } -} - -impl ConnectionHandler for Handler { - type InEvent = HandlerIn; - type OutEvent = HandlerEvent; - type Error = HandlerError; - type InboundOpenInfo = (); - type InboundProtocol = ProtocolConfig; - type OutboundOpenInfo = proto::RPC; - type OutboundProtocol = ProtocolConfig; - - fn listen_protocol(&self) -> SubstreamProtocol { - self.listen_protocol.clone() - } - - fn on_behaviour_event(&mut self, message: HandlerIn) { - if !self.protocol_unsupported { - match message { - HandlerIn::Message(m) => self.send_queue.push(m), - // If we have joined the mesh, keep the connection alive. - HandlerIn::JoinedMesh => { - self.in_mesh = true; - self.keep_alive = KeepAlive::Yes; - } - // If we have left the mesh, start the idle timer. - HandlerIn::LeftMesh => { - self.in_mesh = false; - self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout); - } - } - } - } - - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + assert!( + self.outbound_substream.is_none(), + "Established an outbound substream with one already available" + ); + self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); } fn poll( @@ -283,52 +222,12 @@ impl ConnectionHandler for Handler { cx: &mut Context<'_>, ) -> Poll< ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, >, > { - // Handle any upgrade errors - if let Some(error) = self.upgrade_errors.pop_front() { - let reported_error = match error { - // Timeout errors get mapped to NegotiationTimeout and we close the connection. - ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - Some(HandlerError::NegotiationTimeout) - } - // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { - match negotiation_error { - NegotiationError::Failed => { - // The protocol is not supported - self.protocol_unsupported = true; - if !self.peer_kind_sent { - self.peer_kind_sent = true; - // clear all substreams so the keep alive returns false - self.inbound_substream = None; - self.outbound_substream = None; - self.keep_alive = KeepAlive::No; - return Poll::Ready(ConnectionHandlerEvent::Custom( - HandlerEvent::PeerKind(PeerKind::NotSupported), - )); - } else { - None - } - } - NegotiationError::ProtocolError(e) => { - Some(HandlerError::NegotiationProtocolError(e)) - } - } - } - }; - - // If there was a fatal error, close the connection. - if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); - } - } - if !self.peer_kind_sent { if let Some(peer_kind) = self.peer_kind.as_ref() { self.peer_kind_sent = true; @@ -338,28 +237,14 @@ impl ConnectionHandler for Handler { } } - if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { - // Too many inbound substreams have been created, end the connection. - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::MaxInboundSubstreams, - )); - } - - // determine if we need to create the stream + // determine if we need to create the outbound stream if !self.send_queue.is_empty() && self.outbound_substream.is_none() && !self.outbound_substream_establishing { - if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::MaxOutboundSubstreams, - )); - } - let message = self.send_queue.remove(0); - self.send_queue.shrink_to_fit(); self.outbound_substream_establishing = true; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: self.listen_protocol.clone().map_info(|()| message), + protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()), }); } @@ -372,34 +257,22 @@ impl ConnectionHandler for Handler { Some(InboundSubstreamState::WaitingInput(mut substream)) => { match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(message))) => { - if !self.in_mesh { - self.keep_alive = - KeepAlive::Until(Instant::now() + self.idle_timeout); - } + self.last_io_activity = Instant::now(); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); return Poll::Ready(ConnectionHandlerEvent::Custom(message)); } Poll::Ready(Some(Err(error))) => { - match error { - HandlerError::MaxTransmissionSize => { - warn!("Message exceeded the maximum transmission size"); - self.inbound_substream = - Some(InboundSubstreamState::WaitingInput(substream)); - } - _ => { - warn!("Inbound stream error: {}", error); - // More serious errors, close this side of the stream. If the - // peer is still around, they will re-establish their - // connection - self.inbound_substream = - Some(InboundSubstreamState::Closing(substream)); - } - } + log::debug!("Failed to read from inbound stream: {error}"); + // Close this side of the stream. If the + // peer is still around, they will re-establish their + // outbound stream i.e. our inbound stream. + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); } // peer closed the stream Poll::Ready(None) => { - warn!("Peer closed their outbound stream"); + log::debug!("Inbound stream closed by remote"); self.inbound_substream = Some(InboundSubstreamState::Closing(substream)); } @@ -417,12 +290,9 @@ impl ConnectionHandler for Handler { // Don't close the connection but just drop the inbound substream. // In case the remote has more to send, they will open up a new // substream. - warn!("Inbound substream error while closing: {:?}", e); + log::debug!("Inbound substream error while closing: {e}"); } self.inbound_substream = None; - if self.outbound_substream.is_none() { - self.keep_alive = KeepAlive::No; - } break; } Poll::Pending => { @@ -469,23 +339,19 @@ impl ConnectionHandler for Handler { self.outbound_substream = Some(OutboundSubstreamState::PendingFlush(substream)) } - Err(HandlerError::MaxTransmissionSize) => { - error!("Message exceeded the maximum transmission size and was not sent."); - self.outbound_substream = - Some(OutboundSubstreamState::WaitingOutput(substream)); - } Err(e) => { - error!("Error sending message: {}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + log::debug!("Failed to send message on outbound stream: {e}"); + self.outbound_substream = None; + break; } } } Poll::Ready(Err(e)) => { - error!("Outbound substream error while sending output: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + log::debug!("Failed to send message on outbound stream: {e}"); + self.outbound_substream = None; + break; } Poll::Pending => { - self.keep_alive = KeepAlive::Yes; self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); break; @@ -495,49 +361,18 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::PendingFlush(mut substream)) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { - if !self.in_mesh { - // if not in the mesh, reset the idle timeout - self.keep_alive = - KeepAlive::Until(Instant::now() + self.idle_timeout); - } + self.last_io_activity = Instant::now(); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(e)) - } - Poll::Pending => { - self.keep_alive = KeepAlive::Yes; - self.outbound_substream = - Some(OutboundSubstreamState::PendingFlush(substream)); - break; - } - } - } - // Currently never used - manual shutdown may implement this in the future - Some(OutboundSubstreamState::_Closing(mut substream)) => { - match Sink::poll_close(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { + log::debug!("Failed to flush outbound stream: {e}"); self.outbound_substream = None; - if self.inbound_substream.is_none() { - self.keep_alive = KeepAlive::No; - } break; } - Poll::Ready(Err(e)) => { - warn!("Outbound substream error while closing: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close( - io::Error::new( - io::ErrorKind::BrokenPipe, - "Failed to close outbound substream", - ) - .into(), - )); - } Poll::Pending => { - self.keep_alive = KeepAlive::No; self.outbound_substream = - Some(OutboundSubstreamState::_Closing(substream)); + Some(OutboundSubstreamState::PendingFlush(substream)); break; } } @@ -554,6 +389,92 @@ impl ConnectionHandler for Handler { Poll::Pending } +} + +impl ConnectionHandler for Handler { + type InEvent = HandlerIn; + type OutEvent = HandlerEvent; + type Error = Void; + type InboundOpenInfo = (); + type InboundProtocol = either::Either; + type OutboundOpenInfo = (); + type OutboundProtocol = ProtocolConfig; + + fn listen_protocol(&self) -> SubstreamProtocol { + match self { + Handler::Enabled(handler) => { + SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ()) + } + Handler::Disabled(_) => { + SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ()) + } + } + } + + fn on_behaviour_event(&mut self, message: HandlerIn) { + match self { + Handler::Enabled(handler) => match message { + HandlerIn::Message(m) => handler.send_queue.push(m), + HandlerIn::JoinedMesh => { + handler.in_mesh = true; + } + HandlerIn::LeftMesh => { + handler.in_mesh = false; + } + }, + Handler::Disabled(_) => { + log::debug!("Handler is disabled. Dropping message {:?}", message); + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + match self { + Handler::Enabled(handler) => { + if handler.in_mesh { + return KeepAlive::Yes; + } + + if let Some( + OutboundSubstreamState::PendingSend(_, _) + | OutboundSubstreamState::PendingFlush(_), + ) = handler.outbound_substream + { + return KeepAlive::Yes; + } + + KeepAlive::Until(handler.last_io_activity + handler.idle_timeout) + } + Handler::Disabled(_) => KeepAlive::No, + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + match self { + Handler::Enabled(handler) => handler.poll(cx), + Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => { + if !*peer_kind_sent { + *peer_kind_sent = true; + return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( + PeerKind::NotSupported, + ))); + } + + Poll::Pending + } + Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending, + } + } fn on_connection_event( &mut self, @@ -564,19 +485,83 @@ impl ConnectionHandler for Handler { Self::OutboundOpenInfo, >, ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { - self.on_fully_negotiated_inbound(fully_negotiated_inbound) - } - ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { - self.on_fully_negotiated_outbound(fully_negotiated_outbound) - } - ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { - self.outbound_substream_establishing = false; - warn!("Dial upgrade error {:?}", e); - self.upgrade_errors.push_back(e); + match self { + Handler::Enabled(handler) => { + if event.is_inbound() { + handler.inbound_substream_attempts += 1; + + if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS { + log::warn!( + "The maximum number of inbound substreams attempts has been exceeded" + ); + *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts); + return; + } + } + + if event.is_outbound() { + handler.outbound_substream_establishing = false; + + handler.outbound_substream_attempts += 1; + + if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS { + log::warn!( + "The maximum number of outbound substream attempts has been exceeded" + ); + *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts); + return; + } + } + + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, + .. + }) => match protocol { + Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol), + Either::Right(v) => void::unreachable(v), + }, + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + handler.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, + .. + }) => { + log::debug!("Dial upgrade error: Protocol negotiation timeout"); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + .. + }) => void::unreachable(e), + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )), + .. + }) => { + // The protocol is not supported + log::debug!( + "The remote peer does not support gossipsub on this connection" + ); + *self = Handler::Disabled(DisabledHandler::ProtocolUnsupported { + peer_kind_sent: false, + }); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(e), + )), + .. + }) => { + log::debug!("Protocol negotiation failed: {e}") + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } } - ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + Handler::Disabled(_) => {} } } } diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 222a2f34f93..4a1d63d93da 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -158,9 +158,12 @@ mod types; mod rpc_proto; +#[deprecated(note = "This error will no longer be emitted")] +pub type HandlerError = error_priv::HandlerError; + pub use self::behaviour::{Behaviour, Event, MessageAuthenticity}; pub use self::config::{Config, ConfigBuilder, ValidationMode, Version}; -pub use self::error_priv::{HandlerError, PublishError, SubscriptionError, ValidationError}; +pub use self::error_priv::{PublishError, SubscriptionError, ValidationError}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 98e05567929..34d931bbaf9 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -24,8 +24,8 @@ use crate::topic::TopicHash; use crate::types::{ ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction, }; +use crate::ValidationError; use crate::{rpc_proto::proto, Config}; -use crate::{HandlerError, ValidationError}; use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; @@ -37,6 +37,7 @@ use log::{debug, warn}; use quick_protobuf::Writer; use std::pin::Pin; use unsigned_varint::codec; +use void::Void; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; @@ -147,7 +148,7 @@ where TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = (Framed, PeerKind); - type Error = HandlerError; + type Error = Void; type Future = Pin> + Send>>; fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { @@ -168,7 +169,7 @@ where TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, { type Output = (Framed, PeerKind); - type Error = HandlerError; + type Error = Void; type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { @@ -268,18 +269,18 @@ impl GossipsubCodec { impl Encoder for GossipsubCodec { type Item = proto::RPC; - type Error = HandlerError; + type Error = quick_protobuf_codec::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), HandlerError> { - Ok(self.codec.encode(item, dst)?) + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.codec.encode(item, dst) } } impl Decoder for GossipsubCodec { type Item = HandlerEvent; - type Error = HandlerError; + type Error = quick_protobuf_codec::Error; - fn decode(&mut self, src: &mut BytesMut) -> Result, HandlerError> { + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let rpc = match self.codec.decode(src)? { Some(p) => p, None => return Ok(None), diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 67a31cf8e94..03d128d76b7 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.42.2 - unreleased + +- Add `ConnectionEvent::{is_outbound,is_inbound}`. See [PR 3625]. + +[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3625 + ## 0.42.1 - Deprecate `ConnectionLimits` in favor of `libp2p::connection_limits`. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index eea518f2586..820fabc3aac 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.62.0" description = "The libp2p swarm" -version = "0.42.1" +version = "0.42.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 4093ecaee32..1917117c44e 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -212,6 +212,38 @@ pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IO ListenUpgradeError(ListenUpgradeError), } +impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> + ConnectionEvent<'a, IP, OP, IOI, OOI> +{ + /// Whether the event concerns an outbound stream. + pub fn is_outbound(&self) -> bool { + match self { + ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => { + true + } + ConnectionEvent::FullyNegotiatedInbound(_) + | ConnectionEvent::AddressChange(_) + | ConnectionEvent::ListenUpgradeError(_) => false, + } + } + + /// Whether the event concerns an inbound stream. + pub fn is_inbound(&self) -> bool { + // Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605. + match self { + ConnectionEvent::FullyNegotiatedInbound(_) + | ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. + .. + }) => true, + ConnectionEvent::FullyNegotiatedOutbound(_) + | ConnectionEvent::ListenUpgradeError(_) + | ConnectionEvent::AddressChange(_) + | ConnectionEvent::DialUpgradeError(_) => false, + } + } +} + /// [`ConnectionEvent`] variant that informs the handler about /// the output of a successful upgrade on a new inbound substream. ///