diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 3e3149fafc5d..36d15f1e687e 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -711,28 +711,27 @@ where // Send to peers we know are subscribed to the topic. let mut publish_failed = true; for peer_id in recipient_peers.iter() { - tracing::trace!(peer=%peer_id, "Sending message to peer"); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("The peer must be connected") - .sender; - - match sender.publish( - raw_message.clone(), - self.config.publish_queue_duration(), - self.metrics.as_mut(), - ) { - Ok(_) => publish_failed = false, - Err(_) => { - self.failed_messages.entry(*peer_id).or_default().priority += 1; - - tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer"); - // Downscore the peer due to failed message. - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { + tracing::trace!(peer=%peer_id, "Sending message to peer"); + match peer.sender.publish( + raw_message.clone(), + self.config.publish_queue_duration(), + self.metrics.as_mut(), + ) { + Ok(_) => publish_failed = false, + Err(_) => { + self.failed_messages.entry(*peer_id).or_default().priority += 1; + + tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer"); + // Downscore the peer due to failed message. + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } } } + } else { + tracing::error!(peer = %peer_id, + "Could not PUBLISH, peer doesn't exist in connected peer list"); } } @@ -1013,19 +1012,18 @@ where for peer_id in added_peers { // Send a GRAFT control message - tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer"); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(&peer_id, topic_hash.clone()); } - let sender = &mut self - .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; - - sender.graft(Graft { - topic_hash: topic_hash.clone(), - }); + if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) { + tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer"); + peer.sender.graft(Graft { + topic_hash: topic_hash.clone(), + }); + } else { + tracing::error!(peer = %peer_id, + "Could not GRAFT, peer doesn't exist in connected peer list"); + } // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( @@ -1117,17 +1115,14 @@ where } for peer_id in peers { // Send a PRUNE control message - tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer"); - let on_unsubscribe = true; - let prune = - self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe); - let sender = &mut self - .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; - - sender.prune(prune); + let prune = self.make_prune(topic_hash, &peer_id, self.config.do_px(), true); + if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) { + tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer"); + peer.sender.prune(prune); + } else { + tracing::error!(peer = %peer_id, + "Could not PRUNE, peer doesn't exist in connected peer list"); + } // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( @@ -1282,34 +1277,35 @@ where Instant::now() + self.config.iwant_followup_time(), ); } - tracing::trace!( - peer=%peer_id, - "IHAVE: Asking for the following messages from peer: {:?}", - iwant_ids_vec - ); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; + if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { + tracing::trace!( + peer=%peer_id, + "IHAVE: Asking for the following messages from peer: {:?}", + iwant_ids_vec + ); - if sender - .iwant(IWant { - message_ids: iwant_ids_vec, - }) - .is_err() - { - tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT"); + if peer + .sender + .iwant(IWant { + message_ids: iwant_ids_vec, + }) + .is_err() + { + tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT"); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; } - // Increment failed message count - self.failed_messages - .entry(*peer_id) - .or_default() - .non_priority += 1; + } else { + tracing::error!(peer = %peer_id, + "Could not IWANT, peer doesn't exist in connected peer list"); } } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); @@ -1345,30 +1341,30 @@ where "IWANT: Peer has asked for message too many times; ignoring request" ); } else { - tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; - - if sender - .forward( - msg, - self.config.forward_queue_duration(), - self.metrics.as_mut(), - ) - .is_err() - { - // Downscore the peer - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { + tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); + if peer + .sender + .forward( + msg, + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ) + .is_err() + { + // Downscore the peer + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment the failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; } - // Increment the failed message count - self.failed_messages - .entry(*peer_id) - .or_default() - .non_priority += 1; + } else { + tracing::error!(peer = %peer_id, + "Could not IWANT, peer doesn't exist in connected peer list"); } } } @@ -2016,13 +2012,13 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. - let sender = &mut self - .connected_peers - .get_mut(propagation_source) - .expect("Peer must be connected") - .sender; - for topic_hash in topics_to_graft.into_iter() { - sender.graft(Graft { topic_hash }); + if let Some(peer) = &mut self.connected_peers.get_mut(propagation_source) { + for topic_hash in topics_to_graft.into_iter() { + peer.sender.graft(Graft { topic_hash }); + } + } else { + tracing::error!(peer = %propagation_source, + "Could not GRAFT, peer doesn't exist in connected peer list"); } // Notify the application of the subscriptions @@ -2505,28 +2501,29 @@ where } // send an IHAVE message - let sender = &mut self - .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; - if sender - .ihave(IHave { - topic_hash: topic_hash.clone(), - message_ids: peer_message_ids, - }) - .is_err() - { - tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE"); + if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) { + if peer + .sender + .ihave(IHave { + topic_hash: topic_hash.clone(), + message_ids: peer_message_ids, + }) + .is_err() + { + tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE"); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(&peer_id); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(&peer_id); + } + // Increment failed message count + self.failed_messages + .entry(peer_id) + .or_default() + .non_priority += 1; } - // Increment failed message count - self.failed_messages - .entry(peer_id) - .or_default() - .non_priority += 1; + } else { + tracing::error!(peer = %peer_id, + "Could not IHAVE, peer doesn't exist in connected peer list"); } } } @@ -2607,13 +2604,12 @@ where self.config.do_px() && !no_px.contains(peer_id), false, ); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; - - sender.prune(prune); + if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) { + peer.sender.prune(prune); + } else { + tracing::error!(peer = %peer_id, + "Could not PRUNE, peer doesn't exist in connected peer list"); + } // inform the handler peer_removed_from_mesh( @@ -2679,29 +2675,30 @@ where // forward the message to peers if !recipient_peers.is_empty() { for peer_id in recipient_peers.iter() { - tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; - if sender - .forward( - message.clone(), - self.config.forward_queue_duration(), - self.metrics.as_mut(), - ) - .is_err() - { - // Downscore the peer - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + if let Some(peer) = self.connected_peers.get_mut(peer_id) { + tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); + if peer + .sender + .forward( + message.clone(), + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ) + .is_err() + { + // Downscore the peer + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment the failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; } - // Increment the failed message count - self.failed_messages - .entry(*peer_id) - .or_default() - .non_priority += 1; + } else { + tracing::error!(peer = %peer_id, + "Could not FORWARD, peer doesn't exist in connected peer list"); } } tracing::debug!("Completed forwarding message"); @@ -2850,14 +2847,13 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. - let sender = &mut self - .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; - - for topic_hash in self.mesh.clone().into_keys() { - sender.subscribe(topic_hash); + if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + for topic_hash in self.mesh.clone().into_keys() { + peer.sender.subscribe(topic_hash); + } + } else { + tracing::error!(peer = %peer_id, + "Could not SUBSCRIBE, peer doesn't exist in connected peer list"); } }