Skip to content

Commit

Permalink
feat(swarm): don't close connection in OneShotHandler
Browse files Browse the repository at this point in the history
Related: #3591.

Pull-Request: #4715.
  • Loading branch information
thomaseizinger authored Oct 24, 2023
1 parent 6456bf5 commit 7bbca11
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
18 changes: 13 additions & 5 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::{
dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
dial_opts::DialOpts, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour,
NotifyHandler, OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use log::warn;
use smallvec::SmallVec;
Expand Down Expand Up @@ -354,13 +354,21 @@ impl NetworkBehaviour for Floodsub {
fn on_connection_handler_event(
&mut self,
propagation_source: PeerId,
_connection_id: ConnectionId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
// We ignore successful sends or timeouts.
let event = match event {
InnerMessage::Rx(event) => event,
InnerMessage::Sent => return,
Ok(InnerMessage::Rx(event)) => event,
Ok(InnerMessage::Sent) => return,
Err(e) => {
log::debug!("Failed to send floodsub message: {e}");
self.events.push_back(ToSwarm::CloseConnection {
peer_id: propagation_source,
connection: CloseConnection::One(connection_id),
});
return;
}
};

// Update connected peers topics
Expand Down
3 changes: 3 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
See [PR 4225](https://github.com/libp2p/rust-libp2p/pull/4225).
- Remove deprecated `keep_alive_timeout` in `OneShotHandlerConfig`.
See [PR 4677](https://github.com/libp2p/rust-libp2p/pull/4677).
- Don't close entire connection upon `DialUpgradeError`s within `OneShotHandler`.
Instead, the error is reported as `Err(e)` via `ConnectionHandler::ToBehaviour`.
See [PR 4715](https://github.com/libp2p/rust-libp2p/pull/4715).

## 0.43.6

Expand Down
26 changes: 8 additions & 18 deletions swarm/src/handler/one_shot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

use crate::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::StreamUpgradeError;
use smallvec::SmallVec;
use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration};

Expand All @@ -35,10 +35,8 @@ where
{
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<TInbound, ()>,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<StreamUpgradeError<<TOutbound as OutboundUpgradeSend>::Error>>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[TEvent; 4]>,
events_out: SmallVec<[Result<TEvent, StreamUpgradeError<TOutbound::Error>>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[TOutbound; 4]>,
/// Current number of concurrent outbound substreams being opened.
Expand All @@ -60,7 +58,6 @@ where
) -> Self {
OneShotHandler {
listen_protocol,
pending_error: None,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
dial_negotiated: 0,
Expand Down Expand Up @@ -121,8 +118,8 @@ where
TEvent: Debug + Send + 'static,
{
type FromBehaviour = TOutbound;
type ToBehaviour = TEvent;
type Error = StreamUpgradeError<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>;
type ToBehaviour = Result<TEvent, StreamUpgradeError<TOutbound::Error>>;
type Error = void::Void;
type InboundProtocol = TInbound;
type OutboundProtocol = TOutbound;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -151,10 +148,6 @@ where
Self::Error,
>,
> {
if let Some(err) = self.pending_error.take() {
return Poll::Ready(ConnectionHandlerEvent::Close(err));
}

if !self.events_out.is_empty() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
self.events_out.remove(0),
Expand Down Expand Up @@ -197,20 +190,17 @@ where
protocol: out,
..
}) => {
self.events_out.push(out.into());
self.events_out.push(Ok(out.into()));
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: out,
..
}) => {
self.dial_negotiated -= 1;
self.events_out.push(out.into());
self.events_out.push(Ok(out.into()));
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
if self.pending_error.is_none() {
log::debug!("DialUpgradeError: {error}");
self.keep_alive = KeepAlive::No;
}
self.events_out.push(Err(error));
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
Expand Down

0 comments on commit 7bbca11

Please sign in to comment.