Skip to content

Commit

Permalink
fix AggregatedMetrics::merge_metrics
Browse files Browse the repository at this point in the history
the method would insert non-mergeable backend metrics.
  • Loading branch information
Keksoj committed Jul 11, 2024
1 parent 64b558e commit 15ac8eb
Showing 1 changed file with 32 additions and 45 deletions.
77 changes: 32 additions & 45 deletions command/src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use command::{
filtered_metrics::Inner, AggregatedMetrics, Bucket, ClusterMetrics, FilteredHistogram,
filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, Bucket, FilteredHistogram,
FilteredMetrics,
};
use prost::DecodeError;
Expand Down Expand Up @@ -56,63 +56,50 @@ impl AggregatedMetrics {

for (_worker_id, worker) in workers {
for (metric_name, new_value) in worker.proxy {
if !new_value.is_mergeable() {
continue;
if new_value.is_mergeable() {
self.proxying
.entry(metric_name)
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value);
}
self.proxying
.entry(metric_name)
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value);
}

for (cluster_id, mut cluster_metrics) in worker.clusters {
for (metric_name, new_value) in cluster_metrics.cluster {
if !new_value.is_mergeable() {
continue;
if new_value.is_mergeable() {
let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();

cluster
.cluster
.entry(metric_name)
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value);
}
self.clusters
.entry(cluster_id.to_owned())
.and_modify(|cluster| {
cluster
.cluster
.entry(metric_name.clone())
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value.clone());
})
.or_insert(ClusterMetrics {
cluster: BTreeMap::from([(metric_name, new_value)]),
backends: Vec::new(),
});
}

for backend in cluster_metrics.backends.drain(..) {
for (metric_name, new_value) in &backend.metrics {
if !new_value.is_mergeable() {
continue;
}
self.clusters
.entry(cluster_id.to_owned())
.and_modify(|cluster| {
let found_backend = cluster
.backends
.iter_mut()
.find(|present| present.backend_id == backend.backend_id);

let Some(existing_backend) = found_backend else {
cluster.backends.push(backend.clone());
return;
};
for (metric_name, new_value) in backend.metrics {
if new_value.is_mergeable() {
let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();

let found_backend = cluster
.backends
.iter_mut()
.find(|present| &present.backend_id == &backend.backend_id);

if let Some(existing_backend) = found_backend {
let _ = existing_backend
.metrics
.entry(metric_name.clone())
.entry(metric_name)
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value.to_owned());
})
.or_insert(ClusterMetrics {
cluster: BTreeMap::new(),
backends: vec![backend.clone()],
});
.or_insert(new_value);
} else {
cluster.backends.push(BackendMetrics {
backend_id: backend.backend_id.clone(),
metrics: BTreeMap::from([(metric_name, new_value)]),
});
};
}
}
}
}
Expand Down

0 comments on commit 15ac8eb

Please sign in to comment.