From b500f999d27309b00377f8e7062c53fa3d1e2223 Mon Sep 17 00:00:00 2001 From: taikulawo Date: Sun, 3 Nov 2024 15:18:18 +0800 Subject: [PATCH 01/11] 1 --- metrics-exporter-prometheus/Cargo.toml | 6 +- .../src/exporter/mod.rs | 5 + .../src/exporter/remote_write.rs | 69 +++++ .../src/exporter/remote_write_proto.rs | 275 ++++++++++++++++++ 4 files changed, 354 insertions(+), 1 deletion(-) create mode 100644 metrics-exporter-prometheus/src/exporter/remote_write.rs create mode 100644 metrics-exporter-prometheus/src/exporter/remote_write_proto.rs diff --git a/metrics-exporter-prometheus/Cargo.toml b/metrics-exporter-prometheus/Cargo.toml index 434e336f..21c23385 100644 --- a/metrics-exporter-prometheus/Cargo.toml +++ b/metrics-exporter-prometheus/Cargo.toml @@ -17,11 +17,12 @@ categories = ["development-tools::debugging"] keywords = ["metrics", "telemetry", "prometheus"] [features] -default = ["http-listener", "push-gateway"] +default = ["http-listener", "push-gateway","remote-write"] async-runtime = ["tokio", "hyper-util/tokio"] http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"] uds-listener = ["http-listener"] push-gateway = ["async-runtime", "tracing", "_hyper-client"] +remote-write = ["_hyper-client","async-runtime","dep:prost","dep:snap","dep:prometheus-parse","dep:prost"] _hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"] _hyper-client = [ "http-body-util", @@ -48,7 +49,10 @@ metrics-util = { version = "^0.18", path = "../metrics-util", default-features = "registry", "summary", ] } +prometheus-parse = {version = "0.2.5", optional = true} +prost = {workspace = true, optional = true} quanta = { workspace = true } +snap = { version = "1.1.1", optional = true} thiserror = { workspace = true } tokio = { workspace = true, optional = true } tracing = { workspace = true, optional = true } diff --git a/metrics-exporter-prometheus/src/exporter/mod.rs b/metrics-exporter-prometheus/src/exporter/mod.rs index d10c0336..490daa64 100644 --- a/metrics-exporter-prometheus/src/exporter/mod.rs +++ b/metrics-exporter-prometheus/src/exporter/mod.rs @@ -71,4 +71,9 @@ mod http_listener; #[cfg(feature = "push-gateway")] mod push_gateway; +#[cfg(feature = "remote-write")] +mod remote_write; +#[cfg(feature = "remote-write")] +mod remote_write_proto; + pub(crate) mod builder; diff --git a/metrics-exporter-prometheus/src/exporter/remote_write.rs b/metrics-exporter-prometheus/src/exporter/remote_write.rs new file mode 100644 index 00000000..a0ce0544 --- /dev/null +++ b/metrics-exporter-prometheus/src/exporter/remote_write.rs @@ -0,0 +1,69 @@ +use std::time::Duration; + +use http_body_util::{BodyExt, Collected, Full}; +use hyper::{body::Bytes, Method, Request, Uri}; +use hyper_util::{client::legacy::Client, rt::TokioExecutor}; +use tracing::error; + +use crate::PrometheusHandle; + +use super::ExporterFuture; + +// Creates an ExporterFuture implementing a push gateway. +pub(super) fn new_remote_write( + endpoint: Uri, + interval: Duration, + handle: PrometheusHandle, +) -> ExporterFuture { + Box::pin(async move { + let https = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .expect("no native root CA certificates found") + .https_or_http() + .enable_http1() + .build(); + let client: Client<_, Full> = Client::builder(TokioExecutor::new()) + .pool_idle_timeout(Duration::from_secs(30)) + .build(https); + + loop { + // Sleep for `interval` amount of time, and then do a push. + tokio::time::sleep(interval).await; + + let mut builder = Request::builder(); + + let output = handle.render(); + let result = builder.method(Method::PUT).uri(endpoint.clone()).body(Full::from(output)); + let req = match result { + Ok(req) => req, + Err(e) => { + error!("failed to build push gateway request: {}", e); + continue; + } + }; + + match client.request(req).await { + Ok(response) => { + if !response.status().is_success() { + let status = response.status(); + let status = status.canonical_reason().unwrap_or_else(|| status.as_str()); + let body = response + .into_body() + .collect() + .await + .map(Collected::to_bytes) + .map_err(|_| ()) + .and_then(|b| String::from_utf8(b[..].to_vec()).map_err(|_| ())) + .unwrap_or_else(|()| String::from("")); + error!( + message = "unexpected status after pushing metrics to push gateway", + status, + %body, + ); + } + } + Err(e) => error!("error sending request to push gateway: {:?}", e), + } + } + }) +} diff --git a/metrics-exporter-prometheus/src/exporter/remote_write_proto.rs b/metrics-exporter-prometheus/src/exporter/remote_write_proto.rs new file mode 100644 index 00000000..09333fa8 --- /dev/null +++ b/metrics-exporter-prometheus/src/exporter/remote_write_proto.rs @@ -0,0 +1,275 @@ +//! Types and utilities for calling Prometheus remote write API endpoints. + +use hyper::{header, Method, Request, Uri}; + +/// Special label for the name of a metric. +pub const LABEL_NAME: &str = "__name__"; +pub const CONTENT_TYPE: &str = "application/x-protobuf"; +pub const HEADER_NAME_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version"; +pub const REMOTE_WRITE_VERSION_01: &str = "0.1.0"; + +/// A write request. +/// +/// .proto: +/// ```protobuf +/// message WriteRequest { +/// repeated TimeSeries timeseries = 1; +/// // Cortex uses this field to determine the source of the write request. +/// // We reserve it to avoid any compatibility issues. +/// reserved 2; + +/// // Prometheus uses this field to send metadata, but this is +/// // omitted from v1 of the spec as it is experimental. +/// reserved 3; +/// } +/// ``` +#[derive(prost::Message, Clone, PartialEq)] +pub struct WriteRequest { + #[prost(message, repeated, tag = "1")] + pub timeseries: Vec, +} + +impl WriteRequest { + /// Prepare the write request for sending. + /// + /// Ensures that the request conforms to the specification. + /// See https://prometheus.io/docs/concepts/remote_write_spec. + pub fn sort(&mut self) { + for series in &mut self.timeseries { + series.sort_labels_and_samples(); + } + } + + pub fn sorted(mut self) -> Self { + self.sort(); + self + } + + /// Encode this write request as a protobuf message. + /// + /// NOTE: The API requires snappy compression, not a raw protobuf message. + pub fn encode_proto3(self) -> Vec { + prost::Message::encode_to_vec(&self.sorted()) + } + + pub fn encode_compressed(self) -> Result, snap::Error> { + snap::raw::Encoder::new().compress_vec(&self.encode_proto3()) + } + + /// Parse metrics from the Prometheus text format, and convert them into a + /// [`WriteRequest`]. + + pub fn from_text_format( + text: String, + ) -> Result> { + fn samples_to_timeseries( + samples: Vec, + ) -> Result, Box> { + let mut all_series = std::collections::HashMap::::new(); + + for sample in &samples { + let mut labels = + sample.labels.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect::>(); + + labels.push((LABEL_NAME, sample.metric.as_str())); + + labels.sort_by(|a, b| a.0.cmp(b.0)); + + let mut ident = sample.metric.clone(); + ident.push_str("_$$_"); + for (k, v) in &labels { + ident.push_str(k); + ident.push('='); + ident.push_str(v); + } + + let series = all_series.entry(ident).or_insert_with(|| { + let labels = labels + .iter() + .map(|(k, v)| Label { name: k.to_string(), value: v.to_string() }) + .collect::>(); + + TimeSeries { labels, samples: vec![] } + }); + + let value = match sample.value { + prometheus_parse::Value::Counter(v) => v, + prometheus_parse::Value::Gauge(v) => v, + prometheus_parse::Value::Histogram(_) => { + Err("histogram not supported yet".to_string())? + } + prometheus_parse::Value::Summary(_) => { + Err("summary not supported yet".to_string())? + } + prometheus_parse::Value::Untyped(v) => v, + }; + + series + .samples + .push(Sample { value, timestamp: sample.timestamp.timestamp_millis() }); + } + + Ok(all_series.into_values().collect()) + } + + let iter = text.trim().lines().map(|x| Ok(x.to_string())); + let parsed = prometheus_parse::Scrape::parse(iter) + .map_err(|err| format!("could not parse input as Prometheus text format: {err}"))?; + + let mut series = samples_to_timeseries(parsed.samples)?; + series.sort_by(|a, b| { + let name_a = a.labels.iter().find(|x| x.name == LABEL_NAME).unwrap(); + let name_b = b.labels.iter().find(|x| x.name == LABEL_NAME).unwrap(); + name_a.value.cmp(&name_b.value) + }); + + let s = Self { timeseries: series }; + + Ok(s.sorted()) + } + + /// Build a fully prepared HTTP request that an be sent to a remote write endpoint. + pub fn build_http_request( + self, + endpoint: &Uri, + user_agent: &str, + ) -> Result>, Box> { + let req = Request::builder() + .method(Method::POST) + .uri(endpoint) + .header(header::CONTENT_TYPE, CONTENT_TYPE) + .header(HEADER_NAME_REMOTE_WRITE_VERSION, REMOTE_WRITE_VERSION_01) + .header(header::CONTENT_ENCODING, "snappy") + .header(header::USER_AGENT, user_agent) + .body(self.encode_compressed()?)?; + + Ok(req) + } +} + +/// A time series. +/// +/// .proto: +/// ```protobuf +/// message TimeSeries { +/// repeated Label labels = 1; +/// repeated Sample samples = 2; +/// } +/// ``` +#[derive(prost::Message, Clone, PartialEq)] +pub struct TimeSeries { + #[prost(message, repeated, tag = "1")] + pub labels: Vec