diff --git a/command/src/command.proto b/command/src/command.proto
index f69db483d..63405ecda 100644
--- a/command/src/command.proto
+++ b/command/src/command.proto
@@ -572,6 +572,7 @@ message BackendMetrics {
   map<string, FilteredMetrics> metrics = 2;
 }
 
+// A metric, in a "filtered" format, which means: sendable to outside programs.
 message FilteredMetrics {
   oneof inner {
     uint64 gauge = 1;
@@ -579,6 +580,7 @@ message FilteredMetrics {
     uint64 time = 3;
     Percentiles percentiles = 4;
     FilteredTimeSerie time_serie = 5;
+    FilteredHistogram histogram = 6;
   }
 }
 
@@ -588,7 +590,6 @@ message FilteredTimeSerie {
   repeated uint32 last_hour = 3;
 }
 
-
 message Percentiles {
   required uint64 samples = 1;
   required uint64 p_50 = 2;
@@ -600,6 +601,20 @@ message Percentiles {
   required uint64 p_100 = 8;
 }
 
+// a histogram meant to be translated to prometheus
+message FilteredHistogram {
+  required uint64 sum = 1;
+  required uint64 count = 2;
+  repeated Bucket buckets = 3;
+}
+
+// a prometheus histogram bucket
+message Bucket {
+  required uint64 count = 1;
+  // upper range of the bucket (le = less or equal)
+  required uint64 le = 2;
+}
+
 message RequestCounts {
   map<string, int32> map = 1;
-}
\ No newline at end of file
+}
diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs
index 3b7f02b5c..d4d5c39ac 100644
--- a/command/src/proto/display.rs
+++ b/command/src/proto/display.rs
@@ -18,6 +18,8 @@ use crate::proto::{
     DisplayError,
 };
 
+use super::command::FilteredHistogram;
+
 impl Display for CertificateAndKey {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let versions = self.versions.iter().fold(String::new(), |acc, tls_v| {
@@ -232,6 +234,7 @@ fn print_proxy_metrics(proxy_metrics: &BTreeMap<String, FilteredMetrics>) {
     let filtered = filter_metrics(proxy_metrics);
     print_gauges_and_counts(&filtered);
     print_percentiles(&filtered);
+    print_histograms(&filtered);
 }
 
 fn print_worker_metrics(worker_metrics: &WorkerMetrics) -> Result<(), DisplayError> {
@@ -372,6 +375,37 @@ fn print_percentiles(filtered_metrics: &BTreeMap<String, FilteredMetrics>) {
     percentile_table.printstd();
 }
 
+fn print_histograms(filtered_metrics: &BTreeMap<String, FilteredMetrics>) {
+    let histograms: BTreeMap<String, FilteredHistogram> = filtered_metrics
+        .iter()
+        .filter_map(|(name, metric)| match metric.inner.clone() {
+            Some(filtered_metrics::Inner::Histogram(hist)) => Some((name.to_owned(), hist)),
+            _ => None,
+        })
+        .collect();
+
+    for (name, histogram) in histograms {
+        print_histogram(&name, &histogram);
+    }
+}
+
+fn print_histogram(metric_name: &str, hist: &FilteredHistogram) {
+    println!("{}", metric_name);
+    let mut first_row = Row::new(vec![cell!("sum"), cell!("count")]);
+    let mut value_row = Row::new(vec![cell!(hist.sum), cell!(hist.count)]);
+
+    for bucket in &hist.buckets {
+        first_row.add_cell(cell!(bucket.le));
+        value_row.add_cell(cell!(bucket.count));
+    }
+
+    let mut table = Table::new();
+    table.set_format(*prettytable::format::consts::FORMAT_BOX_CHARS);
+    table.add_row(first_row);
+    table.add_row(value_row);
+    table.printstd();
+}
+
 fn print_available_metrics(available_metrics: &AvailableMetrics) -> Result<(), DisplayError> {
     println!("Available metrics on the proxy level:");
     for metric_name in &available_metrics.proxy_metrics {
diff --git a/lib/src/metrics/local_drain.rs b/lib/src/metrics/local_drain.rs
index faf010915..fc9fe3e8f 100644
--- a/lib/src/metrics/local_drain.rs
+++ b/lib/src/metrics/local_drain.rs
@@ -1,21 +1,40 @@
+//! A local drain to accumulate metrics and return them in a protobuf format
+//!
+//! The metrics are stored following this hierarchy (pseudo-rust):
+//!
+//! ```plain
+//! LocalDrain {
+//!     proxy_metrics: MetricsMap {
+//!         map: BTreeMap<metric_name, AggregatedMetric>
+//!     },
+//!     cluster_metrics: BTreeMap<cluster_id, LocalClusterMetrics {
+//!         cluster: MetricsMap {
+//!             map: BTreeMap<metric_name, AggregatedMetric>
+//!         },
+//!         backends: Vec<LocalBackendMetrics {
+//!             backend_id,
+//!             metrics: MetricsMap {
+//!                 map: BTreeMap<metric_name, AggregatedMetric>
+//!             },
+//!         }>
+//!     }>
+//! }
+//! ```
+
 #![allow(dead_code)]
-use std::{
-    collections::{btree_map::Entry, BTreeMap},
-    str,
-    time::Instant,
-};
+use std::{collections::BTreeMap, str, time::Instant};
 
 use hdrhistogram::Histogram;
 
 use sozu_command::proto::command::{
-    filtered_metrics, response_content::ContentType, AvailableMetrics, BackendMetrics,
-    ClusterMetrics, FilteredMetrics, MetricsConfiguration, Percentiles, QueryMetricsOptions,
-    ResponseContent, WorkerMetrics,
+    filtered_metrics, response_content::ContentType, AvailableMetrics, BackendMetrics, Bucket,
+    ClusterMetrics, FilteredHistogram, FilteredMetrics, MetricsConfiguration, Percentiles,
+    QueryMetricsOptions, ResponseContent, WorkerMetrics,
 };
 
 use crate::metrics::{MetricError, MetricValue, Subscriber};
 
-/// This is how the metrics are stored in the local drain
+/// metrics as stored in the local drain
 #[derive(Debug, Clone)]
 pub enum AggregatedMetric {
     Gauge(usize),
@@ -101,45 +120,135 @@ pub fn histogram_to_percentiles(hist: &Histogram<u32>) -> Percentiles {
     }
 }
 
-#[derive(Copy, Clone, Debug, PartialEq)]
-enum MetricKind {
-    Gauge,
-    Count,
-    Time,
+/// convert a collected histogram to a prometheus-compatible format
+pub fn filter_histogram(hist: &Histogram<u32>) -> FilteredMetrics {
+    let count = hist.len();
+    let mean = hist.mean();
+    let min = hist.min();
+    let max = hist.max();
+    let sum = (count as f64 * mean) as u64;
+    let range = max - min;
+    let number_of_buckets = 10;
+    let mut buckets = Vec::new();
+
+    for i in 0..number_of_buckets {
+        let le = min + ((i + 1) * range) / number_of_buckets;
+        let count = hist.count_between(min, le);
+        buckets.push(Bucket { le, count });
+    }
+    FilteredMetrics {
+        inner: Some(filtered_metrics::Inner::Histogram(FilteredHistogram {
+            sum,
+            count,
+            buckets,
+        })),
+    }
 }
 
-#[derive(Clone, Debug, PartialEq)]
-enum MetricMeta {
-    Cluster,
-    ClusterBackend,
+/// a map of metric_name -> metric value
+#[derive(Debug, Clone)]
+pub struct MetricsMap {
+    map: BTreeMap<String, AggregatedMetric>,
+}
+
+impl MetricsMap {
+    fn new() -> Self {
+        Self {
+            map: BTreeMap::new(),
+        }
+    }
+
+    /// convert a metrics map to a map of filtered metrics,
+    /// perform a double conversion for time metrics: to percentiles and to histogram
+    fn to_filtered_metrics(
+        &self,
+        filter_by_names: &Vec<String>,
+    ) -> BTreeMap<String, FilteredMetrics> {
+        let mut to_filter = self.map.clone();
+        if !filter_by_names.is_empty() {
+            to_filter.retain(|key, _| filter_by_names.contains(key));
+        }
+
+        let mut filtered_metrics = BTreeMap::new();
+        for (name, metric) in to_filter {
+            filtered_metrics.insert(name.to_owned(), metric.to_filtered());
+            // convert time metrics to a histogram format, on top of percentiles
+            match metric {
+                AggregatedMetric::Time(ref hist) => {
+                    filtered_metrics.insert(format!("{}_hist", name), filter_histogram(hist));
+                }
+                _ => {}
+            }
+        }
+        debug!("filtered metrics: {:#?}", filtered_metrics);
+        filtered_metrics
+    }
+
+    fn metric_names(&self) -> Vec<String> {
+        self.map.keys().map(|k| k.to_owned()).collect()
+    }
 }
 
 /// local equivalent to proto::command::ClusterMetrics
 #[derive(Debug)]
 pub struct LocalClusterMetrics {
     /// metric_name -> metric value
-    cluster: BTreeMap<String, AggregatedMetric>,
-    ///
+    cluster: MetricsMap,
+    // cluster: BTreeMap<String, AggregatedMetric>,
     backends: Vec<LocalBackendMetrics>,
 }
 
 impl LocalClusterMetrics {
+    fn receive_metric(
+        &mut self,
+        metric_name: &str,
+        metric: MetricValue,
+    ) -> Result<(), MetricError> {
+        match self.cluster.map.get_mut(metric_name) {
+            Some(existing_metric) => existing_metric.update(metric_name, metric),
+            None => {
+                let aggregated_metric = AggregatedMetric::new(metric)?;
+                self.cluster
+                    .map
+                    .insert(metric_name.to_owned(), aggregated_metric);
+            }
+        }
+        Ok(())
+    }
+
+    fn receive_backend_metric(
+        &mut self,
+        metric_name: &str,
+        backend_id: &str,
+        metric: MetricValue,
+    ) -> Result<(), MetricError> {
+        for backend_metrics in self.backends.iter_mut() {
+            if backend_metrics.backend_id == backend_id {
+                if let Some(existing_metric) = backend_metrics.metrics.map.get_mut(metric_name) {
+                    existing_metric.update(metric_name, metric);
+                    return Ok(());
+                }
+            }
+        }
+
+        let mut metrics = MetricsMap::new();
+        metrics
+            .map
+            .insert(metric_name.to_owned(), AggregatedMetric::new(metric)?);
+        let backend = LocalBackendMetrics {
+            backend_id: backend_id.to_owned(),
+            metrics,
+        };
+
+        self.backends.push(backend);
+        Ok(())
+    }
+
     fn to_filtered_metrics(
         &self,
         metric_names: &Vec<String>,
     ) -> Result<ClusterMetrics, MetricError> {
-        let cluster = self
-            .cluster
-            .iter()
-            .filter(|(key, _)| {
-                if metric_names.is_empty() {
-                    true
-                } else {
-                    metric_names.contains(key)
-                }
-            })
-            .map(|(metric_name, metric)| (metric_name.to_owned(), metric.to_filtered()))
-            .collect();
+        let cluster = self.cluster.to_filtered_metrics(metric_names);
 
         let mut backends: Vec<BackendMetrics> = Vec::new();
         for backend in &self.backends {
@@ -149,7 +258,7 @@ impl LocalClusterMetrics {
     }
 
     fn metric_names(&self) -> Vec<String> {
-        let mut names: Vec<String> = self.cluster.keys().map(|k| k.to_owned()).collect();
+        let mut names: Vec<String> = self.cluster.metric_names();
 
         for backend in &self.backends {
             for name in backend.metrics_names() {
@@ -174,7 +283,7 @@ impl LocalClusterMetrics {
 pub struct LocalBackendMetrics {
     backend_id: String,
     /// metric_name -> value
-    metrics: BTreeMap<String, AggregatedMetric>,
+    metrics: MetricsMap,
 }
 
 impl LocalBackendMetrics {
@@ -182,18 +291,7 @@ impl LocalBackendMetrics {
         &self,
         metric_names: &Vec<String>,
     ) -> Result<BackendMetrics, MetricError> {
-        let filtered_backend_metrics = self
-            .metrics
-            .iter()
-            .filter(|(key, _)| {
-                if metric_names.is_empty() {
-                    true
-                } else {
-                    metric_names.contains(key)
-                }
-            })
-            .map(|(metric_name, value)| (metric_name.to_owned(), value.to_filtered()))
-            .collect::<BTreeMap<String, FilteredMetrics>>();
+        let filtered_backend_metrics = self.metrics.to_filtered_metrics(metric_names);
 
         Ok(BackendMetrics {
             backend_id: self.backend_id.to_owned(),
@@ -202,7 +300,7 @@ impl LocalBackendMetrics {
     }
 
     fn metrics_names(&self) -> Vec<String> {
-        self.metrics.keys().map(|k| k.to_owned()).collect()
+        self.metrics.metric_names()
     }
 }
 
@@ -213,7 +311,7 @@ pub struct LocalDrain {
     pub prefix: String,
     pub created: Instant,
     /// metrics of the proxy server (metric_name -> metric value)
-    pub proxy_metrics: BTreeMap<String, AggregatedMetric>,
+    pub proxy_metrics: MetricsMap,
     /// cluster_id -> cluster_metrics
     cluster_metrics: BTreeMap<String, LocalClusterMetrics>,
     use_tagged_metrics: bool,
@@ -226,7 +324,7 @@ impl LocalDrain {
         LocalDrain {
             prefix,
             created: Instant::now(),
-            proxy_metrics: BTreeMap::new(),
+            proxy_metrics: MetricsMap::new(),
             cluster_metrics: BTreeMap::new(),
             use_tagged_metrics: false,
             origin: String::from("x"),
@@ -273,7 +371,7 @@ impl LocalDrain {
     }
 
     fn list_all_metric_names(&self) -> Result<ResponseContent, MetricError> {
-        let proxy_metrics = self.proxy_metrics.keys().cloned().collect();
+        let proxy_metrics = self.proxy_metrics.metric_names();
 
         let mut cluster_metrics_names = Vec::new();
 
@@ -306,17 +404,8 @@ impl LocalDrain {
         &mut self,
         metric_names: &Vec<String>,
     ) -> BTreeMap<String, FilteredMetrics> {
-        self.proxy_metrics
-            .iter()
-            .filter(|(key, _)| {
-                if metric_names.is_empty() {
-                    true
-                } else {
-                    metric_names.contains(key)
-                }
-            })
-            .map(|(key, value)| (key.to_string(), value.to_filtered()))
-            .collect()
+        self.proxy_metrics.to_filtered_metrics(metric_names)
+        // filter_aggregated_metrics(&self.proxy_metrics, metric_names)
     }
 
     pub fn dump_cluster_metrics(
@@ -338,12 +427,12 @@ impl LocalDrain {
         cluster_id: &str,
         metric_names: &Vec<String>,
     ) -> Result<ClusterMetrics, MetricError> {
-        let aggregated = self
+        let local_cluster_metrics = self
             .cluster_metrics
             .get(cluster_id)
             .ok_or(MetricError::NoMetrics(cluster_id.to_owned()))?;
 
-        let filtered = aggregated.to_filtered_metrics(metric_names)?;
+        let filtered = local_cluster_metrics.to_filtered_metrics(metric_names)?;
 
         Ok(filtered)
     }
@@ -430,112 +519,65 @@ impl LocalDrain {
         })
     }
 
-    // TODO: implement this as a method of LocalClusterMetrics for readability
-    fn receive_cluster_metric_new(
+    fn receive_cluster_metric(
         &mut self,
         metric_name: &str,
         cluster_id: &str,
         metric: MetricValue,
-    ) {
+    ) -> Result<(), MetricError> {
         if self.disable_cluster_metrics {
-            return;
+            return Ok(());
         }
 
         let local_cluster_metric =
             self.cluster_metrics
                 .entry(cluster_id.to_owned())
                 .or_insert(LocalClusterMetrics {
-                    cluster: BTreeMap::new(),
+                    cluster: MetricsMap::new(),
                     backends: Vec::new(),
                 });
 
-        match local_cluster_metric.cluster.get_mut(metric_name) {
-            Some(existing_metric) => existing_metric.update(metric_name, metric),
-            None => {
-                let aggregated_metric = match AggregatedMetric::new(metric) {
-                    Ok(m) => m,
-                    Err(e) => {
-                        return error!("Could not aggregate metric: {}", e.to_string());
-                    }
-                };
-                local_cluster_metric
-                    .cluster
-                    .insert(metric_name.to_owned(), aggregated_metric);
-            }
-        }
+        local_cluster_metric.receive_metric(metric_name, metric)
     }
 
-    // TODO: implement this as a method of LocalBackendMetrics for readability
     fn receive_backend_metric(
         &mut self,
         metric_name: &str,
         cluster_id: &str,
         backend_id: &str,
         metric: MetricValue,
-    ) {
+    ) -> Result<(), MetricError> {
         if self.disable_cluster_metrics {
-            return;
+            return Ok(());
         }
 
-        let aggregated_metric = match AggregatedMetric::new(metric.clone()) {
-            Ok(m) => m,
-            Err(e) => {
-                return error!("Could not aggregate metric: {}", e.to_string());
-            }
-        };
-        match self.cluster_metrics.entry(cluster_id.to_owned()) {
-            Entry::Vacant(entry) => {
-                let mut metrics = BTreeMap::new();
-                metrics.insert(metric_name.to_owned(), aggregated_metric);
-                let backends = [LocalBackendMetrics {
-                    backend_id: backend_id.to_owned(),
-                    metrics,
-                }]
-                .to_vec();
-
-                entry.insert(LocalClusterMetrics {
-                    cluster: BTreeMap::new(),
-                    backends,
+        let local_cluster_metric =
+            self.cluster_metrics
+                .entry(cluster_id.to_owned())
+                .or_insert(LocalClusterMetrics {
+                    cluster: MetricsMap::new(),
+                    backends: Vec::new(),
                 });
-            }
-            Entry::Occupied(mut entry) => {
-                for backend_metrics in &mut entry.get_mut().backends {
-                    if backend_metrics.backend_id == backend_id {
-                        if let Some(existing_metric) = backend_metrics.metrics.get_mut(metric_name)
-                        {
-                            existing_metric.update(metric_name, metric);
-                            return;
-                        }
-                    }
-                }
-                let mut metrics = BTreeMap::new();
-                metrics.insert(metric_name.to_owned(), aggregated_metric);
-                let backend = LocalBackendMetrics {
-                    backend_id: backend_id.to_owned(),
-                    metrics,
-                };
-
-                entry.get_mut().backends.push(backend);
-            }
-        }
+        local_cluster_metric.receive_backend_metric(metric_name, backend_id, metric)
     }
 
-    fn receive_proxy_metric(&mut self, metric_name: &str, metric: MetricValue) {
-        match self.proxy_metrics.get_mut(metric_name) {
+    fn receive_proxy_metric(
+        &mut self,
+        metric_name: &str,
+        metric: MetricValue,
+    ) -> Result<(), MetricError> {
+        match self.proxy_metrics.map.get_mut(metric_name) {
             Some(stored_metric) => stored_metric.update(metric_name, metric),
             None => {
-                let aggregated_metric = match AggregatedMetric::new(metric) {
-                    Ok(m) => m,
-                    Err(e) => {
-                        return error!("Could not aggregate metric: {}", e.to_string());
-                    }
-                };
+                let aggregated_metric = AggregatedMetric::new(metric)?;
                 self.proxy_metrics
+                    .map
                     .insert(String::from(metric_name), aggregated_metric);
             }
         }
+        Ok(())
     }
 }
 
@@ -555,12 +597,16 @@ impl Subscriber for LocalDrain {
             metric
         );
 
-        match (cluster_id, backend_id) {
+        let receive_result = match (cluster_id, backend_id) {
             (Some(cluster_id), Some(backend_id)) => {
                 self.receive_backend_metric(key, cluster_id, backend_id, metric)
             }
-            (Some(cluster_id), None) => self.receive_cluster_metric_new(key, cluster_id, metric),
+            (Some(cluster_id), None) => self.receive_cluster_metric(key, cluster_id, metric),
             (None, _) => self.receive_proxy_metric(key, metric),
+        };
+
+        if let Err(e) = receive_result {
+            error!("Could not receive metric: {}", e.to_string());
         }
     }
 }