Skip to content

Commit

Permalink
Adds Proof Of Concept HTTP unix domain socket listener
Browse files Browse the repository at this point in the history
  • Loading branch information
scottopell committed Jul 8, 2024
1 parent f84efc4 commit c4c4848
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 5 deletions.
37 changes: 32 additions & 5 deletions metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,47 @@ keywords = ["metrics", "telemetry", "prometheus"]
default = ["http-listener", "push-gateway"]
async-runtime = ["tokio", "hyper-util/tokio"]
http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"]
uds-listener = ["http-listener"]
push-gateway = ["async-runtime", "tracing", "_hyper-client"]
_hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"]
_hyper-client = ["http-body-util", "hyper/client", "hyper-util/client", "hyper-util/http1", "hyper-util/client-legacy", "hyper-rustls"]
_hyper-client = [
"http-body-util",
"hyper/client",
"hyper-util/client",
"hyper-util/http1",
"hyper-util/client-legacy",
"hyper-rustls",
]

[dependencies]
metrics = { version = "^0.23", path = "../metrics" }
metrics-util = { version = "^0.17", path = "../metrics-util", default-features = false, features = ["recency", "registry", "summary"] }
metrics-util = { version = "^0.17", path = "../metrics-util", default-features = false, features = [
"recency",
"registry",
"summary",
] }
thiserror = { version = "1", default-features = false }
quanta = { version = "0.12", default-features = false }
indexmap = { version = "2.1", default-features = false, features = ["std"] }
base64 = { version = "0.22.0", default-features = false, features = ["std"] }

# Optional
hyper = { version = "1.1", features = [ "server", "client" ], optional = true }
hyper-util = { version="0.1.3", features = [ "tokio", "service", "client", "client-legacy", "http1" ], optional = true }
hyper = { version = "1.1", features = ["server", "client"], optional = true }
hyper-util = { version = "0.1.3", features = [
"tokio",
"service",
"client",
"client-legacy",
"http1",
], optional = true }
http-body-util = { version = "0.1.0", optional = true }
ipnet = { version = "2", optional = true }
tokio = { version = "1", features = ["rt", "net", "time", "rt-multi-thread"], optional = true }
tokio = { version = "1", features = [
"rt",
"net",
"time",
"rt-multi-thread",
], optional = true }
tracing = { version = "0.1.26", optional = true }
hyper-rustls = { version = "0.27.2", optional = true }

Expand All @@ -55,6 +78,10 @@ required-features = ["push-gateway"]
name = "prometheus_server"
required-features = ["http-listener"]

[[example]]
name = "prometheus_uds_server"
required-features = ["uds-listener"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
62 changes: 62 additions & 0 deletions metrics-exporter-prometheus/examples/prometheus_uds_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::thread;
use std::time::Duration;

use metrics::{counter, describe_counter, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;

use quanta::Clock;
use rand::{thread_rng, Rng};

fn main() {
tracing_subscriber::fmt::init();

let builder = PrometheusBuilder::new().with_http_uds_listener("/tmp/metrics.sock");
builder
.idle_timeout(
MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM,
Some(Duration::from_secs(10)),
)
.install()
.expect("failed to install Prometheus recorder");

// We register these metrics, which gives us a chance to specify a description for them. The
// Prometheus exporter records this description and adds it as HELP text when the endpoint is
// scraped.
//
// Registering metrics ahead of using them is not required, but is the only way to specify the
// description of a metric.
describe_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far.");
describe_histogram!(
"tcp_server_loop_delta_secs",
"The time taken for iterations of the TCP server event loop."
);

let clock = Clock::new();
let mut last = None;

counter!("idle_metric").increment(1);
gauge!("testing").set(42.0);

// Loop over and over, pretending to do some work.
loop {
counter!("tcp_server_loops", "system" => "foo").increment(1);

if let Some(t) = last {
let delta: Duration = clock.now() - t;
histogram!("tcp_server_loop_delta_secs", "system" => "foo").record(delta);
}

let increment_gauge = thread_rng().gen_bool(0.75);
let gauge = gauge!("lucky_iterations");
if increment_gauge {
gauge.increment(1.0);
} else {
gauge.decrement(1.0);
}

last = Some(clock.now());

thread::sleep(Duration::from_millis(750));
}
}
30 changes: 30 additions & 0 deletions metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct PrometheusBuilder {
exporter_config: ExporterConfig,
#[cfg(feature = "http-listener")]
allowed_addresses: Option<Vec<IpNet>>,
#[cfg(feature = "uds-listener")]
listen_path: std::path::PathBuf,
quantiles: Vec<Quantile>,
bucket_duration: Option<Duration>,
bucket_count: Option<NonZeroU32>,
Expand All @@ -61,12 +63,17 @@ impl PrometheusBuilder {
#[cfg(not(feature = "http-listener"))]
let exporter_config = ExporterConfig::Unconfigured;

#[cfg(feature = "uds-listener")]
let listen_path = std::path::PathBuf::from("/tmp/metrics.sock");

let upkeep_timeout = Duration::from_secs(5);

Self {
exporter_config,
#[cfg(feature = "http-listener")]
allowed_addresses: None,
#[cfg(feature = "uds-listener")]
listen_path,
quantiles,
bucket_duration: None,
bucket_count: None,
Expand Down Expand Up @@ -133,6 +140,24 @@ impl PrometheusBuilder {
Ok(self)
}

/// Configures the exporter to expose an HTTP Unix Domain Socket listener that functions as a [scrape endpoint].
///
/// The HTTP listener that is spawned will respond to GET requests on any request path.
///
/// Running in HTTP listener mode is mutually exclusive with the push gateway i.e. enabling the
/// HTTP listener will disable the push gateway, and vise versa.
///
/// Defaults to disabled, if enabled, default listening at `/tmp/metrics.sock`
///
/// [scrape endpoint]: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
#[cfg(feature = "uds-listener")]
#[cfg_attr(docsrs, doc(cfg(feature = "uds-listener")))]
#[must_use]
pub fn with_http_uds_listener(mut self, addr: impl Into<std::path::PathBuf>) -> Self {
self.exporter_config = ExporterConfig::UdsListener { listen_path: addr.into() };
self
}

/// Adds an IP address or subnet to the allowlist for the scrape endpoint.
///
/// If a client makes a request to the scrape endpoint and their IP is not present in the
Expand Down Expand Up @@ -457,6 +482,11 @@ impl PrometheusBuilder {
endpoint, interval, username, password, handle,
)
}

#[cfg(feature = "uds-listener")]
ExporterConfig::UdsListener { listen_path } => {
super::uds_listener::new_http_uds_listener(handle, listen_path)?
}
},
))
}
Expand Down
8 changes: 8 additions & 0 deletions metrics-exporter-prometheus/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ enum ExporterConfig {
#[cfg(feature = "http-listener")]
HttpListener { listen_address: SocketAddr },

#[cfg(feature = "uds-listener")]
UdsListener { listen_path: std::path::PathBuf },

// Run a push gateway task sending to the given `endpoint` after `interval` time has elapsed,
// infinitely.
#[cfg(feature = "push-gateway")]
Expand All @@ -43,6 +46,8 @@ impl ExporterConfig {
#[cfg(feature = "push-gateway")]
Self::PushGateway { .. } => "push-gateway",
Self::Unconfigured => "unconfigured,",
#[cfg(feature = "uds-listener")]
Self::UdsListener { .. } => "uds-listener",
}
}
}
Expand All @@ -53,4 +58,7 @@ mod http_listener;
#[cfg(feature = "push-gateway")]
mod push_gateway;

#[cfg(feature = "uds-listener")]
mod uds_listener;

pub(crate) mod builder;
96 changes: 96 additions & 0 deletions metrics-exporter-prometheus/src/exporter/uds_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::net::SocketAddr;

use http_body_util::Full;
use hyper::{
body::{self, Bytes, Incoming},
server::conn::http1::Builder as HyperHttpBuilder,
service::service_fn,
Request, Response, StatusCode,
};
use hyper_util::rt::TokioIo;
use ipnet::IpNet;
use std::path::PathBuf;
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tracing::warn;

use crate::{common::BuildError, ExporterFuture, PrometheusHandle};

struct UnixListeningExporter {
handle: PrometheusHandle,
}

impl UnixListeningExporter {
async fn serve(&self, listener: UnixListener) -> Result<(), hyper::Error> {
loop {
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Err(e) => {
warn!(error = ?e, "Error accepting connection. Ignoring request.");
continue;
}
};

self.process_stream(stream).await;
}
}

async fn process_stream(&self, stream: UnixStream) {
let handle = self.handle.clone();
let service = service_fn(move |req: Request<body::Incoming>| {
let handle = handle.clone();
async move { Ok::<_, hyper::Error>(Self::handle_http_request(&handle, &req)) }
});

tokio::spawn(async move {
if let Err(err) =
HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await
{
warn!(error = ?err, "Error serving connection.");
};
});
}

fn handle_http_request(
handle: &PrometheusHandle,
req: &Request<Incoming>,
) -> Response<Full<Bytes>> {
Response::new(match req.uri().path() {
"/health" => "OK".into(),
_ => handle.render().into(),
})
}

fn new_forbidden_response() -> Response<Full<Bytes>> {
// This unwrap should not fail because we don't use any function that
// can assign an Err to it's inner such as `Builder::header``. A unit test
// will have to suffice to detect if this fails to hold true.
Response::builder().status(StatusCode::FORBIDDEN).body(Full::<Bytes>::default()).unwrap()
}
}

/// Creates an `ExporterFuture` implementing a http listener that servies prometheus metrics.
///
/// # Errors
/// Will return Err if it cannot bind to the listen address
pub(crate) fn new_http_uds_listener(
handle: PrometheusHandle,
listen_path: PathBuf,
) -> Result<ExporterFuture, BuildError> {
let listener = UnixListener::bind(listen_path)
.and_then(|listener| Ok(listener))
.map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?;

let exporter = UnixListeningExporter { handle };

Ok(Box::pin(async move { exporter.serve(listener).await }))
}

#[cfg(test)]
mod tests {
use crate::exporter::uds_listener::UnixListeningExporter;

#[test]
fn new_forbidden_response_always_succeeds() {
UnixListeningExporter::new_forbidden_response(); // doesn't panic
}
}

0 comments on commit c4c4848

Please sign in to comment.