From c4c4848b7a31adaa7d5b0847017600e7fd8649f0 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Mon, 8 Jul 2024 13:43:30 -0400 Subject: [PATCH] Adds Proof Of Concept HTTP unix domain socket listener --- metrics-exporter-prometheus/Cargo.toml | 37 ++++++- .../examples/prometheus_uds_server.rs | 62 ++++++++++++ .../src/exporter/builder.rs | 30 ++++++ .../src/exporter/mod.rs | 8 ++ .../src/exporter/uds_listener.rs | 96 +++++++++++++++++++ 5 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 metrics-exporter-prometheus/examples/prometheus_uds_server.rs create mode 100644 metrics-exporter-prometheus/src/exporter/uds_listener.rs diff --git a/metrics-exporter-prometheus/Cargo.toml b/metrics-exporter-prometheus/Cargo.toml index c2372fac..d3b669e0 100644 --- a/metrics-exporter-prometheus/Cargo.toml +++ b/metrics-exporter-prometheus/Cargo.toml @@ -20,24 +20,47 @@ keywords = ["metrics", "telemetry", "prometheus"] default = ["http-listener", "push-gateway"] async-runtime = ["tokio", "hyper-util/tokio"] http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"] +uds-listener = ["http-listener"] push-gateway = ["async-runtime", "tracing", "_hyper-client"] _hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"] -_hyper-client = ["http-body-util", "hyper/client", "hyper-util/client", "hyper-util/http1", "hyper-util/client-legacy", "hyper-rustls"] +_hyper-client = [ + "http-body-util", + "hyper/client", + "hyper-util/client", + "hyper-util/http1", + "hyper-util/client-legacy", + "hyper-rustls", +] [dependencies] metrics = { version = "^0.23", path = "../metrics" } -metrics-util = { version = "^0.17", path = "../metrics-util", default-features = false, features = ["recency", "registry", "summary"] } +metrics-util = { version = "^0.17", path = "../metrics-util", default-features = false, features = [ + "recency", + "registry", + "summary", +] } thiserror = { version = "1", default-features = false } quanta = { version = "0.12", default-features = false } indexmap = { version = "2.1", default-features = false, features = ["std"] } base64 = { version = "0.22.0", default-features = false, features = ["std"] } # Optional -hyper = { version = "1.1", features = [ "server", "client" ], optional = true } -hyper-util = { version="0.1.3", features = [ "tokio", "service", "client", "client-legacy", "http1" ], optional = true } +hyper = { version = "1.1", features = ["server", "client"], optional = true } +hyper-util = { version = "0.1.3", features = [ + "tokio", + "service", + "client", + "client-legacy", + "http1", +], optional = true } http-body-util = { version = "0.1.0", optional = true } ipnet = { version = "2", optional = true } -tokio = { version = "1", features = ["rt", "net", "time", "rt-multi-thread"], optional = true } +tokio = { version = "1", features = [ + "rt", + "net", + "time", + "rt-multi-thread", +], optional = true } tracing = { version = "0.1.26", optional = true } hyper-rustls = { version = "0.27.2", optional = true } @@ -55,6 +78,10 @@ required-features = ["push-gateway"] name = "prometheus_server" required-features = ["http-listener"] +[[example]] +name = "prometheus_uds_server" +required-features = ["uds-listener"] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/metrics-exporter-prometheus/examples/prometheus_uds_server.rs b/metrics-exporter-prometheus/examples/prometheus_uds_server.rs new file mode 100644 index 00000000..770c07ff --- /dev/null +++ b/metrics-exporter-prometheus/examples/prometheus_uds_server.rs @@ -0,0 +1,62 @@ +use std::thread; +use std::time::Duration; + +use metrics::{counter, describe_counter, describe_histogram, gauge, histogram}; +use metrics_exporter_prometheus::PrometheusBuilder; +use metrics_util::MetricKindMask; + +use quanta::Clock; +use rand::{thread_rng, Rng}; + +fn main() { + tracing_subscriber::fmt::init(); + + let builder = PrometheusBuilder::new().with_http_uds_listener("/tmp/metrics.sock"); + builder + .idle_timeout( + MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM, + Some(Duration::from_secs(10)), + ) + .install() + .expect("failed to install Prometheus recorder"); + + // We register these metrics, which gives us a chance to specify a description for them. The + // Prometheus exporter records this description and adds it as HELP text when the endpoint is + // scraped. + // + // Registering metrics ahead of using them is not required, but is the only way to specify the + // description of a metric. + describe_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far."); + describe_histogram!( + "tcp_server_loop_delta_secs", + "The time taken for iterations of the TCP server event loop." + ); + + let clock = Clock::new(); + let mut last = None; + + counter!("idle_metric").increment(1); + gauge!("testing").set(42.0); + + // Loop over and over, pretending to do some work. + loop { + counter!("tcp_server_loops", "system" => "foo").increment(1); + + if let Some(t) = last { + let delta: Duration = clock.now() - t; + histogram!("tcp_server_loop_delta_secs", "system" => "foo").record(delta); + } + + let increment_gauge = thread_rng().gen_bool(0.75); + let gauge = gauge!("lucky_iterations"); + if increment_gauge { + gauge.increment(1.0); + } else { + gauge.decrement(1.0); + } + + last = Some(clock.now()); + + thread::sleep(Duration::from_millis(750)); + } +} diff --git a/metrics-exporter-prometheus/src/exporter/builder.rs b/metrics-exporter-prometheus/src/exporter/builder.rs index 309ed8f2..a6668975 100644 --- a/metrics-exporter-prometheus/src/exporter/builder.rs +++ b/metrics-exporter-prometheus/src/exporter/builder.rs @@ -38,6 +38,8 @@ pub struct PrometheusBuilder { exporter_config: ExporterConfig, #[cfg(feature = "http-listener")] allowed_addresses: Option>, + #[cfg(feature = "uds-listener")] + listen_path: std::path::PathBuf, quantiles: Vec, bucket_duration: Option, bucket_count: Option, @@ -61,12 +63,17 @@ impl PrometheusBuilder { #[cfg(not(feature = "http-listener"))] let exporter_config = ExporterConfig::Unconfigured; + #[cfg(feature = "uds-listener")] + let listen_path = std::path::PathBuf::from("/tmp/metrics.sock"); + let upkeep_timeout = Duration::from_secs(5); Self { exporter_config, #[cfg(feature = "http-listener")] allowed_addresses: None, + #[cfg(feature = "uds-listener")] + listen_path, quantiles, bucket_duration: None, bucket_count: None, @@ -133,6 +140,24 @@ impl PrometheusBuilder { Ok(self) } + /// Configures the exporter to expose an HTTP Unix Domain Socket listener that functions as a [scrape endpoint]. + /// + /// The HTTP listener that is spawned will respond to GET requests on any request path. + /// + /// Running in HTTP listener mode is mutually exclusive with the push gateway i.e. enabling the + /// HTTP listener will disable the push gateway, and vise versa. + /// + /// Defaults to disabled, if enabled, default listening at `/tmp/metrics.sock` + /// + /// [scrape endpoint]: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format + #[cfg(feature = "uds-listener")] + #[cfg_attr(docsrs, doc(cfg(feature = "uds-listener")))] + #[must_use] + pub fn with_http_uds_listener(mut self, addr: impl Into) -> Self { + self.exporter_config = ExporterConfig::UdsListener { listen_path: addr.into() }; + self + } + /// Adds an IP address or subnet to the allowlist for the scrape endpoint. /// /// If a client makes a request to the scrape endpoint and their IP is not present in the @@ -457,6 +482,11 @@ impl PrometheusBuilder { endpoint, interval, username, password, handle, ) } + + #[cfg(feature = "uds-listener")] + ExporterConfig::UdsListener { listen_path } => { + super::uds_listener::new_http_uds_listener(handle, listen_path)? + } }, )) } diff --git a/metrics-exporter-prometheus/src/exporter/mod.rs b/metrics-exporter-prometheus/src/exporter/mod.rs index eb25ceed..6bf3cde5 100644 --- a/metrics-exporter-prometheus/src/exporter/mod.rs +++ b/metrics-exporter-prometheus/src/exporter/mod.rs @@ -20,6 +20,9 @@ enum ExporterConfig { #[cfg(feature = "http-listener")] HttpListener { listen_address: SocketAddr }, + #[cfg(feature = "uds-listener")] + UdsListener { listen_path: std::path::PathBuf }, + // Run a push gateway task sending to the given `endpoint` after `interval` time has elapsed, // infinitely. #[cfg(feature = "push-gateway")] @@ -43,6 +46,8 @@ impl ExporterConfig { #[cfg(feature = "push-gateway")] Self::PushGateway { .. } => "push-gateway", Self::Unconfigured => "unconfigured,", + #[cfg(feature = "uds-listener")] + Self::UdsListener { .. } => "uds-listener", } } } @@ -53,4 +58,7 @@ mod http_listener; #[cfg(feature = "push-gateway")] mod push_gateway; +#[cfg(feature = "uds-listener")] +mod uds_listener; + pub(crate) mod builder; diff --git a/metrics-exporter-prometheus/src/exporter/uds_listener.rs b/metrics-exporter-prometheus/src/exporter/uds_listener.rs new file mode 100644 index 00000000..5aadddc2 --- /dev/null +++ b/metrics-exporter-prometheus/src/exporter/uds_listener.rs @@ -0,0 +1,96 @@ +use std::net::SocketAddr; + +use http_body_util::Full; +use hyper::{ + body::{self, Bytes, Incoming}, + server::conn::http1::Builder as HyperHttpBuilder, + service::service_fn, + Request, Response, StatusCode, +}; +use hyper_util::rt::TokioIo; +use ipnet::IpNet; +use std::path::PathBuf; +use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; +use tracing::warn; + +use crate::{common::BuildError, ExporterFuture, PrometheusHandle}; + +struct UnixListeningExporter { + handle: PrometheusHandle, +} + +impl UnixListeningExporter { + async fn serve(&self, listener: UnixListener) -> Result<(), hyper::Error> { + loop { + let stream = match listener.accept().await { + Ok((stream, _)) => stream, + Err(e) => { + warn!(error = ?e, "Error accepting connection. Ignoring request."); + continue; + } + }; + + self.process_stream(stream).await; + } + } + + async fn process_stream(&self, stream: UnixStream) { + let handle = self.handle.clone(); + let service = service_fn(move |req: Request| { + let handle = handle.clone(); + async move { Ok::<_, hyper::Error>(Self::handle_http_request(&handle, &req)) } + }); + + tokio::spawn(async move { + if let Err(err) = + HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await + { + warn!(error = ?err, "Error serving connection."); + }; + }); + } + + fn handle_http_request( + handle: &PrometheusHandle, + req: &Request, + ) -> Response> { + Response::new(match req.uri().path() { + "/health" => "OK".into(), + _ => handle.render().into(), + }) + } + + fn new_forbidden_response() -> Response> { + // This unwrap should not fail because we don't use any function that + // can assign an Err to it's inner such as `Builder::header``. A unit test + // will have to suffice to detect if this fails to hold true. + Response::builder().status(StatusCode::FORBIDDEN).body(Full::::default()).unwrap() + } +} + +/// Creates an `ExporterFuture` implementing a http listener that servies prometheus metrics. +/// +/// # Errors +/// Will return Err if it cannot bind to the listen address +pub(crate) fn new_http_uds_listener( + handle: PrometheusHandle, + listen_path: PathBuf, +) -> Result { + let listener = UnixListener::bind(listen_path) + .and_then(|listener| Ok(listener)) + .map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?; + + let exporter = UnixListeningExporter { handle }; + + Ok(Box::pin(async move { exporter.serve(listener).await })) +} + +#[cfg(test)] +mod tests { + use crate::exporter::uds_listener::UnixListeningExporter; + + #[test] + fn new_forbidden_response_always_succeeds() { + UnixListeningExporter::new_forbidden_response(); // doesn't panic + } +}