Skip to content

Commit

Permalink
Merge joao's changes
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Dec 19, 2023
2 parents b68d509 + b9214e9 commit a0f6a6b
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 153 deletions.
2 changes: 0 additions & 2 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

- Removes the control pool and sends control messages on demand.

- Implement publish and forward message dropping.

- Implement backpressure by differentiating between priority and non priority messages.
Drop `Publish` and `Forward` messages when the queue becomes full.
See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914)
Expand Down
173 changes: 81 additions & 92 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,11 +547,11 @@ where
// send subscription request to all peers
for peer_id in self.peer_topics.keys() {
tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

sender.subscribe(topic_hash.clone());
}
Expand Down Expand Up @@ -579,11 +579,11 @@ where
// announce to all peers
for peer_id in self.peer_topics.keys() {
tracing::debug!(%peer_id, "Sending UNSUBSCRIBE to peer");
let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer should be connected")
.send_queue();
.sender;

sender.unsubscribe(topic_hash.clone());
}
Expand Down Expand Up @@ -694,7 +694,7 @@ where
}
}

// Explicit peers that are apart of the topic
// Explicit peers that are part of the topic
for peer in &self.explicit_peers {
if peers_on_topic.contains(peer) {
recipient_peers.insert(*peer);
Expand Down Expand Up @@ -732,35 +732,34 @@ where
}

// Send to peers we know are subscribed to the topic.
let mut errors = 0;
let mut publish_failed = true;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("The peer must be connected")
.send_queue();

if sender
.publish(
raw_message.clone(),
self.config.publish_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
self.failed_messages.entry(*peer_id).or_default().priority += 1;

tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
.sender;

match sender.publish(
raw_message.clone(),
self.config.publish_queue_duration(),
self.metrics.as_mut(),
) {
Ok(_) => publish_failed = false,
Err(_) => {
self.failed_messages.entry(*peer_id).or_default().priority += 1;

tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
}

errors += 1;
}
}
if errors == recipient_peers.len() {

if publish_failed {
return Err(PublishError::InsufficientPeers);
}

Expand Down Expand Up @@ -1042,11 +1041,11 @@ where
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
let sender = self
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

sender.graft(Graft {
topic_hash: topic_hash.clone(),
Expand Down Expand Up @@ -1148,11 +1147,11 @@ where
let on_unsubscribe = true;
let prune =
self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
let sender = self
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

sender.prune(prune);

Expand Down Expand Up @@ -1316,11 +1315,11 @@ where
iwant_ids_vec
);

let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

if sender
.iwant(IWant {
Expand Down Expand Up @@ -1374,11 +1373,11 @@ where
);
} else {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

if sender
.forward(
Expand Down Expand Up @@ -1554,7 +1553,7 @@ where
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.send_queue()
.sender
.clone();

for prune in to_prune_topics
Expand Down Expand Up @@ -2056,16 +2055,13 @@ where

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
// NOTE: There could be a potential race condition where the peer disconnects before we
// handle this message. We therefore ignore sending grafts if the peer is not connected.
if let Some(sender) = self
let sender = &mut self
.connected_peers
.get_mut(propagation_source)
.map(|connected_peer| connected_peer.send_queue())
{
for topic_hash in topics_to_graft.into_iter() {
sender.graft(Graft { topic_hash });
}
.expect("Peer must be connected")
.sender;
for topic_hash in topics_to_graft.into_iter() {
sender.graft(Graft { topic_hash });
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2100,7 +2096,7 @@ where
// before we add all the gossip from this heartbeat in order to gain a true measure of
// steady-state size of the queues.
if let Some(m) = &mut self.metrics {
for sender_queue in self.connected_peers.values_mut().map(|v| v.send_queue()) {
for sender_queue in self.connected_peers.values_mut().map(|v| &v.sender) {
m.observe_priority_queue_size(sender_queue.priority_len());
m.observe_non_priority_queue_size(sender_queue.non_priority_len());
}
Expand Down Expand Up @@ -2570,11 +2566,11 @@ where
}

// send an IHAVE message
let sender = self
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;
if sender
.ihave(IHave {
topic_hash: topic_hash.clone(),
Expand Down Expand Up @@ -2635,7 +2631,7 @@ where
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.send_queue()
.sender
.clone();

// The following prunes are not due to unsubscribing.
Expand Down Expand Up @@ -2673,11 +2669,11 @@ where
self.config.do_px() && !no_px.contains(peer_id),
false,
);
let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

sender.prune(prune);

Expand Down Expand Up @@ -2714,33 +2710,31 @@ where
tracing::debug!(message=%msg_id, "Forwarding message");
let mut recipient_peers = HashSet::new();

{
// Populate the recipient peers mapping

// Add explicit peers
for peer_id in &self.explicit_peers {
if let Some(topics) = self.peer_topics.get(peer_id) {
if Some(peer_id) != propagation_source
&& !originating_peers.contains(peer_id)
&& Some(peer_id) != message.source.as_ref()
&& topics.contains(&message.topic)
{
recipient_peers.insert(*peer_id);
}
// Populate the recipient peers mapping

// Add explicit peers
for peer_id in &self.explicit_peers {
if let Some(topics) = self.peer_topics.get(peer_id) {
if Some(peer_id) != propagation_source
&& !originating_peers.contains(peer_id)
&& Some(peer_id) != message.source.as_ref()
&& topics.contains(&message.topic)
{
recipient_peers.insert(*peer_id);
}
}
}

// add mesh peers
let topic = &message.topic;
// mesh
if let Some(mesh_peers) = self.mesh.get(topic) {
for peer_id in mesh_peers {
if Some(peer_id) != propagation_source
&& !originating_peers.contains(peer_id)
&& Some(peer_id) != message.source.as_ref()
{
recipient_peers.insert(*peer_id);
}
// add mesh peers
let topic = &message.topic;
// mesh
if let Some(mesh_peers) = self.mesh.get(topic) {
for peer_id in mesh_peers {
if Some(peer_id) != propagation_source
&& !originating_peers.contains(peer_id)
&& Some(peer_id) != message.source.as_ref()
{
recipient_peers.insert(*peer_id);
}
}
}
Expand All @@ -2749,11 +2743,11 @@ where
if !recipient_peers.is_empty() {
for peer_id in recipient_peers.iter() {
tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
let sender = self
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;
if sender
.forward(
message.clone(),
Expand Down Expand Up @@ -2922,11 +2916,11 @@ where

tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
let sender = self
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.send_queue();
.sender;

for topic_hash in self.mesh.clone().into_keys() {
sender.subscribe(topic_hash);
Expand Down Expand Up @@ -2965,7 +2959,6 @@ where
.position(|v| v == &connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
connections.handler_send_queue.remove(index);

// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
Expand Down Expand Up @@ -3121,9 +3114,6 @@ where
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let handler_send_queue = RpcSender::new(self.config.connection_handler_queue_len());
let reciever = handler_send_queue.new_receiver();

// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
Expand All @@ -3135,14 +3125,15 @@ where
.or_insert(PeerConnections {
kind: PeerKind::Floodsub,
connections: vec![],
handler_send_queue: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
});
// Add the new connection
connected_peer.connections.push(connection_id);
// Add the send_queue_handler associated with the connection id.
connected_peer.handler_send_queue.push(handler_send_queue);

Ok(Handler::new(self.config.protocol_config(), reciever))
Ok(Handler::new(
self.config.protocol_config(),
connected_peer.sender.new_receiver(),
))
}

fn handle_established_outbound_connection(
Expand All @@ -3152,9 +3143,6 @@ where
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
let handler_send_queue = RpcSender::new(self.config.connection_handler_queue_len());
let reciever = handler_send_queue.new_receiver();

// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
Expand All @@ -3166,14 +3154,15 @@ where
.or_insert(PeerConnections {
kind: PeerKind::Floodsub,
connections: vec![],
handler_send_queue: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
});
// Add the new connection
connected_peer.connections.push(connection_id);
// Add the send_queue_handler associated with the connection id.
connected_peer.handler_send_queue.push(handler_send_queue);

Ok(Handler::new(self.config.protocol_config(), reciever))
Ok(Handler::new(
self.config.protocol_config(),
connected_peer.sender.new_receiver(),
))
}

fn on_connection_handler_event(
Expand Down
Loading

0 comments on commit a0f6a6b

Please sign in to comment.