Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): remove control pool #554

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 11 additions & 43 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<ToSwarm<Event, HandlerIn>>,

/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<ControlAction>>,

/// Information used for publishing messages.
publish_config: PublishConfig,

Expand Down Expand Up @@ -439,7 +436,6 @@ where
Ok(Behaviour {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
Expand Down Expand Up @@ -994,12 +990,11 @@ where
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
Self::control_pool_add(
&mut self.control_pool,
self.send_message(
peer_id,
ControlAction::Graft {
RpcOut::Control(ControlAction::Graft {
topic_hash: topic_hash.clone(),
},
}),
);

// If the peer did not previously exist in any mesh, inform the handler
Expand Down Expand Up @@ -1098,7 +1093,7 @@ where
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);
self.send_message(peer, RpcOut::Control(control));

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -1269,12 +1264,11 @@ where
iwant_ids_vec
);

Self::control_pool_add(
&mut self.control_pool,
self.send_message(
*peer_id,
ControlAction::IWant {
RpcOut::Control(ControlAction::IWant {
message_ids: iwant_ids_vec,
},
}),
);
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
Expand Down Expand Up @@ -2387,9 +2381,6 @@ where
self.send_graft_prune(to_graft, to_prune, no_px);
}

// piggyback pooled control messages
self.flush_control_pool();

// shift the memcache
self.mcache.shift();

Expand All @@ -2404,7 +2395,7 @@ where
/// and fanout peers
fn emit_gossip(&mut self) {
let mut rng = thread_rng();
for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
for (topic_hash, peers) in self.mesh.clone().iter().chain(self.fanout.clone().iter()) {
let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
if message_ids.is_empty() {
continue;
Expand Down Expand Up @@ -2456,13 +2447,12 @@ where
}

// send an IHAVE message
Self::control_pool_add(
&mut self.control_pool,
self.send_message(
peer,
ControlAction::IHave {
RpcOut::Control(ControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
}),
);
}
}
Expand Down Expand Up @@ -2707,27 +2697,6 @@ where
}
}

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<ControlAction>>,
peer: PeerId,
control: ControlAction,
) {
control_pool.entry(peer).or_default().push(control);
}

/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
self.send_message(peer, RpcOut::Control(msg));
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) {
Expand Down Expand Up @@ -3349,7 +3318,6 @@ impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F
f.debug_struct("Behaviour")
.field("config", &self.config)
.field("events", &self.events.len())
.field("control_pool", &self.control_pool)
.field("publish_config", &self.publish_config)
.field("topic_peers", &self.topic_peers)
.field("peer_topics", &self.peer_topics)
Expand Down
97 changes: 49 additions & 48 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,24 +531,21 @@ fn test_join() {
"Should have added 6 nodes to the mesh"
);

fn collect_grafts(
mut collected_grafts: Vec<ControlAction>,
(_, controls): (&PeerId, &Vec<ControlAction>),
) -> Vec<ControlAction> {
for c in controls.iter() {
if let ControlAction::Graft { topic_hash: _ } = c {
collected_grafts.push(c.clone())
fn is_graft(message: &&ToSwarm<Event, HandlerIn>) -> bool {
matches!(
message,
ToSwarm::NotifyHandler {
event: HandlerIn::Message(RpcOut::Control(ControlAction::Graft { topic_hash: _ })),
..
}
}
collected_grafts
)
}

// there should be mesh_n GRAFT messages.
let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts);
let graft_messages = gs.events.iter().filter(is_graft).count();

assert_eq!(
graft_messages.len(),
6,
graft_messages, 6,
"There should be 6 grafts messages sent to peers"
);

Expand Down Expand Up @@ -594,10 +591,10 @@ fn test_join() {
}

// there should now be 12 graft messages to be sent
let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts);
let graft_messages = gs.events.iter().filter(is_graft).count();

assert!(
graft_messages.len() == 12,
assert_eq!(
graft_messages, 12,
"There should be 12 grafts messages sent to peers"
);
}
Expand Down Expand Up @@ -1139,15 +1136,18 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() {
);

// check that we sent an IWANT request for `unknown id`
let iwant_exists = match gs.control_pool.get(&peers[7]) {
Some(controls) => controls.iter().any(|c| match c {
ControlAction::IWant { message_ids } => message_ids
let iwant_exists = gs.events.iter().any(|message| {
matches!(
message,
ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(RpcOut::Control(ControlAction::IWant { message_ids })),
..
} if peer_id == &peers[7] && message_ids
.iter()
.any(|m| *m == MessageId::new(b"unknown id")),
_ => false,
}),
_ => false,
};
.any(|m| *m == MessageId::new(b"unknown id"))
)
});

assert!(
iwant_exists,
Expand Down Expand Up @@ -1311,25 +1311,20 @@ fn count_control_msgs<D: DataTransform, F: TopicSubscriptionFilter>(
gs: &Behaviour<D, F>,
mut filter: impl FnMut(&PeerId, &ControlAction) -> bool,
) -> usize {
gs.control_pool
gs.events
.iter()
.map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count())
.sum::<usize>()
+ gs.events
.iter()
.filter(|e| match e {
ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(RpcOut::Control(action)),
..
} => filter(peer_id, action),
_ => false,
})
.count()
.filter(|e| match e {
ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(RpcOut::Control(action)),
..
} => filter(peer_id, action),
_ => false,
})
.count()
}

fn flush_events<D: DataTransform, F: TopicSubscriptionFilter>(gs: &mut Behaviour<D, F>) {
gs.control_pool.clear();
gs.events.clear();
}

Expand Down Expand Up @@ -1673,16 +1668,22 @@ fn no_gossip_gets_sent_to_explicit_peers() {
}

//assert that no gossip gets sent to explicit peer
assert_eq!(
gs.control_pool
.get(&peers[0])
.unwrap_or(&Vec::new())
.iter()
.filter(|m| matches!(m, ControlAction::IHave { .. }))
.count(),
0,
"Gossip got emitted to explicit peer"
);
let gossip_count = gs
.events
.iter()
.filter(|message| {
matches!(
message,
ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(RpcOut::Control(ControlAction::IHave { .. })),
..
} if peer_id == &peers[0]
)
})
.count();

assert_eq!(gossip_count, 0, "Gossip got emitted to explicit peer");
}

// Tests the mesh maintenance addition
Expand Down
Loading