diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 331aecf8987..907df61cc4a 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -247,10 +247,6 @@ impl EnabledHandler { }); } - // We may need to inform the behviour if we have a dropped a message. This gets set if that - // is the case. - let mut dropped_message = None; - // process outbound stream loop { match std::mem::replace( @@ -271,10 +267,11 @@ impl EnabledHandler { } => { if Pin::new(timeout).poll(cx).is_ready() { // Inform the behaviour and end the poll. - dropped_message = Some(HandlerEvent::MessageDropped(message)); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); - break; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::MessageDropped(message), + )); } } _ => {} // All other messages are not time-bound. @@ -348,13 +345,7 @@ impl EnabledHandler { } } - // If there was a timeout in sending a message, inform the behaviour before restarting the - // poll - if let Some(handler_event) = dropped_message { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event)); - } - - // Handle inbound messages + // Handle inbound messages. loop { match std::mem::replace( &mut self.inbound_substream, @@ -419,6 +410,32 @@ impl EnabledHandler { } } + // Drop the next message in queue if it's stale. + let mut peakable = self.send_queue.clone().peekable(); + if let Poll::Ready(Some(mut message)) = peakable.poll_next_unpin(cx) { + match message { + RpcOut::Publish { + message: _, + ref mut timeout, + } + | RpcOut::Forward { + message: _, + ref mut timeout, + } => { + if Pin::new(timeout).poll(cx).is_ready() { + // Drop the message. + let dropped = futures::ready!(self.send_queue.poll_next_unpin(cx)) + .expect("There should be a message"); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::MessageDropped(dropped), + )); + } + } + // the next message in queue is not time bound. + _ => {} + } + } + Poll::Pending } }