From 1f5d6ab62cdbcfbb76e9f2785dc795e97793fbff Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 5 Dec 2023 16:20:44 +1100 Subject: [PATCH 1/2] Add metrics for queue lengths --- protocols/gossipsub/src/behaviour.rs | 10 +++++++++ protocols/gossipsub/src/metrics.rs | 31 ++++++++++++++++++++++++++++ protocols/gossipsub/src/types.rs | 10 +++++++++ 3 files changed, 51 insertions(+) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 8f72acb63bf..8732682a8ee 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -2051,6 +2051,16 @@ where tracing::debug!("Starting heartbeat"); let start = Instant::now(); + // Every heartbeat we sample the send queues to add to our metrics. We do this intentionally + // before we add all the gossip from this heartbeat in order to gain a true measure of + // steady-state size of the queues. + if let Some(m) = &mut self.metrics { + for sender_queue in self.handler_send_queues.values() { + m.observe_priority_queue_size(sender_queue.priority_len()); + m.observe_non_priority_queue_size(sender_queue.non_priority_len()); + } + } + self.heartbeat_ticks += 1; let mut to_graft = HashMap::new(); diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 6b21b0685f8..a3679a044ea 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -178,6 +178,11 @@ pub(crate) struct Metrics { /// The number of times we have decided that an IWANT control message is required for this /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, + + /// The size of the priority queue. + priority_queue_size: Histogram, + /// The size of the non-priority queue. 
+ non_priority_queue_size: Histogram, } impl Metrics { @@ -316,6 +321,20 @@ impl Metrics { metric }; + let priority_queue_size = Histogram::new(linear_buckets(0.0, 2500.0, 100)); + registry.register( + "priority_queue_size", + "Histogram of observed priority queue sizes", + priority_queue_size.clone(), + ); + + let non_priority_queue_size = Histogram::new(linear_buckets(0.0, 2500.0, 100)); + registry.register( + "non_priority_queue_size", + "Histogram of observed non-priority queue sizes", + non_priority_queue_size.clone(), + ); + Self { max_topics, max_never_subscribed_topics, @@ -343,6 +362,8 @@ impl Metrics { heartbeat_duration, memcache_misses, topic_iwant_msgs, + priority_queue_size, + non_priority_queue_size, } } @@ -532,6 +553,16 @@ impl Metrics { self.heartbeat_duration.observe(millis as f64); } + /// Observes a priority queue size. + pub(crate) fn observe_priority_queue_size(&mut self, len: usize) { + self.priority_queue_size.observe(len as f64); + } + + /// Observes a non-priority queue size. + pub(crate) fn observe_non_priority_queue_size(&mut self, len: usize) { + self.non_priority_queue_size.observe(len as f64); + } + /// Observe a score of a mesh peer. pub(crate) fn observe_mesh_peers_score(&mut self, topic: &TopicHash, score: f64) { if self.register_topic(topic).is_ok() { diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f6438687960..18d6877defb 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -641,6 +641,16 @@ impl RpcSender { m.msg_sent(&message.topic, message.raw_protobuf_len()); } } + + /// Returns the current size of the priority queue. + pub(crate) fn priority_len(&self) -> usize { + self.len.load(Ordering::Relaxed) + } + + /// Returns the current size of the non-priority queue. + pub(crate) fn non_priority_len(&self) -> usize { + self.non_priority.len() + } } /// `RpcOut` sender that is priority aware. 
From f6c944a1bccd0fe216102b44a04835443bcd9253 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 5 Dec 2023 16:21:21 +1100 Subject: [PATCH 2/2] Add changelog --- protocols/gossipsub/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b8c16942b86..a6559f70c35 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,6 @@ ## 0.46.1 - unreleased +- Adds metrics for priority and non-priority queue lengths. + - Implement publish and forward message dropping. - Implement backpressure by diferentiating between priority and non priority messages.