diff --git a/Cargo.lock b/Cargo.lock index 3664200f89..38384916d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3437,7 +3437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -3566,18 +3566,19 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lz4" -version = "1.27.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a231296ca742e418c43660cb68e082486ff2538e8db432bc818580f3965025ed" +checksum = "958b4caa893816eea05507c20cfe47574a43d9a697138a7872990bba8a0ece68" dependencies = [ + "libc", "lz4-sys", ] [[package]] name = "lz4-sys" -version = "1.11.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb44a01837a858d47e5a630d2ccf304c8efcc4b83b8f9f75b7a9ee4fcc6e57d" +checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" dependencies = [ "cc", "libc", @@ -6330,9 +6331,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", @@ -6412,14 +6413,16 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] @@ -6666,7 +6669,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index ea190b75a8..61dcd1dfe7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -175,7 +175,7 @@ tokio = { version = "1", features = [ "sync", "time", ] } -tokio-util = { version = "0.7", features = ["io", "compat"] } +tokio-util = { version = "0.7", features = ["io", "compat", "rt"] } tonic = { version = "0.12", features = ["tls", "tls-roots"] } hyper-util = "0.1" tower = { version = "0.5", features = ["util"] } diff --git a/crates/dekaf/src/api_client.rs b/crates/dekaf/src/api_client.rs index 80636fd80b..bab79307a6 100644 --- a/crates/dekaf/src/api_client.rs +++ b/crates/dekaf/src/api_client.rs @@ -7,11 +7,11 @@ use kafka_protocol::{ protocol::{self, Decodable, Encodable, Request}, }; use rsasl::{config::SASLConfig, mechname::Mechname, prelude::SASLClient}; -use std::{boxed::Box, collections::HashMap, fmt::Debug, io, time::Duration}; +use std::{boxed::Box, cell::Cell, collections::HashMap, fmt::Debug, io, time::Duration}; use std::{io::BufWriter, pin::Pin, sync::Arc}; use tokio::sync::RwLock; use tokio_rustls::rustls; -use tokio_util::codec; +use tokio_util::{codec, task::AbortOnDropHandle}; use tracing::instrument; use url::Url; @@ -239,6 +239,25 @@ async fn sasl_auth( Ok(()) } +async fn get_versions( + conn: &mut BoxedKafkaConnection, +) -> anyhow::Result { + let versions = send_request( + conn, + messages::ApiVersionsRequest::default() + .with_client_software_name(protocol::StrBytes::from_static_str("Dekaf")) + .with_client_software_version(protocol::StrBytes::from_static_str("1.0")), + None, + ) + .await?; + match versions.error_code.err() { + None => {} + Some(e) => bail!("Error connecting to broker: {e}"), + }; + + Ok(versions) +} + #[derive(Clone)] struct KafkaConnectionParams { broker_url: String, @@ -260,10 +279,16 @@ impl deadpool::managed::Manager for KafkaConnectionParams { async fn recycle( &self, - _: &mut BoxedKafkaConnection, + conn: &mut BoxedKafkaConnection, _: &deadpool::managed::Metrics, ) -> deadpool::managed::RecycleResult { - Ok(()) + // Other than auth, Kafka connections themselves are stateless + // so the only thing we need to do when recycling a connection + // is to confirm that it's still connected. + get_versions(conn).await.map(|_| ()).map_err(|e| { + tracing::warn!(err=?e, broker=self.broker_url, "Connection failed healthcheck"); + deadpool::managed::RecycleError::Backend(e) + }) } } @@ -285,6 +310,7 @@ pub struct KafkaApiClient { // and should be propagated to newly created clients // when a new broker address is encounted. clients: Arc>>, + _pool_connection_reaper: Arc>, } impl KafkaApiClient { @@ -323,6 +349,44 @@ impl KafkaApiClient { }) .build()?; + // Close idle connections, and any free connection older than 30m. + // It seems that after running for a while, connections can get into + // a broken state where every response returns an error. This, plus + // the healthcheck when recycling a connection solves that problem. + let reap_interval = Duration::from_secs(30); + let max_age = Duration::from_secs(60 * 30); + let max_idle = Duration::from_secs(60); + let reaper = tokio_util::task::AbortOnDropHandle::new(tokio::spawn({ + let pool = pool.clone(); + let broker_url = broker_url.to_string(); + async move { + loop { + let pool_state = pool.status(); + + metrics::gauge!("pool_size", "upstream_broker" => broker_url.to_owned()) + .set(pool_state.size as f64); + metrics::gauge!("pool_available", "upstream_broker" => broker_url.to_owned()) + .set(pool_state.available as f64); + metrics::gauge!("pool_waiting", "upstream_broker" => broker_url.to_owned()) + .set(pool_state.waiting as f64); + + let age_sum = Cell::new(Duration::ZERO); + let idle_sum = Cell::new(Duration::ZERO); + let connections = Cell::new(0); + tokio::time::sleep(reap_interval).await; + pool.retain(|_, metrics: deadpool::managed::Metrics| { + age_sum.set(age_sum.get() + metrics.age()); + idle_sum.set(idle_sum.get() + metrics.last_used()); + connections.set(connections.get() + 1); + metrics.age() < max_age && metrics.last_used() < max_idle + }); + + metrics::gauge!("pool_connection_avg_age", "upstream_broker" => broker_url.to_owned()).set(if connections.get() > 0 { age_sum.get()/connections.get() } else { Duration::ZERO }); + metrics::gauge!("pool_connection_avg_idle", "upstream_broker" => broker_url.to_owned()).set(if connections.get() > 0 { idle_sum.get()/connections.get() } else { Duration::ZERO }); + } + } + })); + let mut conn = match pool.get().await { Ok(c) => c, Err(deadpool::managed::PoolError::Backend(e)) => return Err(e), @@ -331,19 +395,7 @@ impl KafkaApiClient { } }; - let versions = send_request( - conn.as_mut(), - messages::ApiVersionsRequest::default() - .with_client_software_name(protocol::StrBytes::from_static_str("Dekaf")) - .with_client_software_version(protocol::StrBytes::from_static_str("1.0")), - None, - ) - .await?; - match versions.error_code.err() { - None => {} - Some(e) => bail!("Error connecting to broker: {e}"), - }; - tracing::debug!(versions=?versions,"Got supported versions"); + let versions = get_versions(conn.as_mut()).await?; drop(conn); Ok(Self { @@ -352,6 +404,7 @@ impl KafkaApiClient { sasl_config: sasl_config, versions, clients: Arc::new(RwLock::new(HashMap::new())), + _pool_connection_reaper: Arc::new(reaper), }) } diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 66be12d691..538b6da59a 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -15,8 +15,8 @@ use read::Read; mod session; pub use session::Session; +pub mod metrics_server; pub mod registry; -pub mod metrics; mod api_client; pub use api_client::KafkaApiClient; @@ -26,6 +26,7 @@ use itertools::Itertools; use percent_encoding::{percent_decode_str, utf8_percent_encode}; use serde::{Deserialize, Serialize}; use serde_json::de; +use std::time::SystemTime; pub struct App { /// Anonymous API client for the Estuary control plane. @@ -178,7 +179,7 @@ async fn handle_api( frame: bytes::BytesMut, out: &mut bytes::BytesMut, ) -> anyhow::Result<()> { - tracing::trace!("Handling request"); + let start_time = SystemTime::now(); use messages::*; let ret = match api_key { ApiKey::ApiVersionsKey => { @@ -321,7 +322,10 @@ async fn handle_api( */ _ => anyhow::bail!("unsupported request type {api_key:?}"), }; - tracing::trace!("Response sent"); + let handle_duration = SystemTime::now().duration_since(start_time)?; + + metrics::histogram!("api_call_time", "api_key" => format!("{:?}",api_key)) + .record(handle_duration.as_millis() as f64); ret } diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 8cb79219af..19286dd661 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -153,7 +153,7 @@ async fn main() -> anyhow::Result<()> { .await .context("failed to bind server port")?; - let metrics_router = dekaf::metrics::build_router(app.clone()); + let metrics_router = dekaf::metrics_server::build_router(app.clone()); let metrics_server_task = axum_server::bind(metrics_addr).serve(metrics_router.into_make_service()); tokio::spawn(async move { metrics_server_task.await.unwrap() }); diff --git a/crates/dekaf/src/metrics.rs b/crates/dekaf/src/metrics_server.rs similarity index 100% rename from crates/dekaf/src/metrics.rs rename to crates/dekaf/src/metrics_server.rs