Skip to content

Commit

Permalink
dekaf: Track and report additional metrics, and improve connection st…
Browse files Browse the repository at this point in the history
…ability/freshness

I observed some concerning behavior when switching `dekaf.estuary.dev` to advertise `dekaf.estuary-data.com`, namely all API requests to the upstream MSK broker were returning with error codes. A restart of Dekaf "fixed" the problem, and I ended up realizing that those connections were probably sitting open and idle for days.

The theory is that even though we set TCP keepalive, the connections had gotten into a bad state. So I introduced a couple of different mitigations:

* All idle connections (that is, connections that were opened but have since been returned to the pool) get closed after a short period of time (60s in this case)
* All idle connections that were opened over 30m ago get closed. The stateless usage pattern of connections should mean that this results in no connection lasting longer than roughly 30m.
* A connection is "exercised" every time it's returned to the pool to check whether it's still open and able to communicate with the broker.

New metrics:
* `pool_size` (by broker): The number of connections open to this broker
* `pool_available` (by broker): The number of open, free connections available for use
* `pool_waiting` (by broker): The number of requests blocked waiting for a connection to become available
* `pool_connection_avg_age` (by broker): The average age of a pool connection to this broker
* `pool_connection_avg_idle` (by broker): The average time a connection to this broker has been idle
* `api_call_time` (by api method): A histogram of the time it takes to serve a kafka protocol request
  • Loading branch information
jshearer committed Sep 30, 2024
1 parent af95f7b commit 42e173f
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 32 deletions.
23 changes: 13 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ tokio = { version = "1", features = [
"sync",
"time",
] }
tokio-util = { version = "0.7", features = ["io", "compat"] }
tokio-util = { version = "0.7", features = ["io", "compat", "rt"] }
tonic = { version = "0.12", features = ["tls", "tls-roots"] }
hyper-util = "0.1"
tower = { version = "0.5", features = ["util"] }
Expand Down
87 changes: 70 additions & 17 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use kafka_protocol::{
protocol::{self, Decodable, Encodable, Request},
};
use rsasl::{config::SASLConfig, mechname::Mechname, prelude::SASLClient};
use std::{boxed::Box, collections::HashMap, fmt::Debug, io, time::Duration};
use std::{boxed::Box, cell::Cell, collections::HashMap, fmt::Debug, io, time::Duration};
use std::{io::BufWriter, pin::Pin, sync::Arc};
use tokio::sync::RwLock;
use tokio_rustls::rustls;
use tokio_util::codec;
use tokio_util::{codec, task::AbortOnDropHandle};
use tracing::instrument;
use url::Url;

Expand Down Expand Up @@ -239,6 +239,25 @@ async fn sasl_auth(
Ok(())
}

async fn get_versions(
conn: &mut BoxedKafkaConnection,
) -> anyhow::Result<messages::ApiVersionsResponse> {
let versions = send_request(
conn,
messages::ApiVersionsRequest::default()
.with_client_software_name(protocol::StrBytes::from_static_str("Dekaf"))
.with_client_software_version(protocol::StrBytes::from_static_str("1.0")),
None,
)
.await?;
match versions.error_code.err() {
None => {}
Some(e) => bail!("Error connecting to broker: {e}"),
};

Ok(versions)
}

#[derive(Clone)]
struct KafkaConnectionParams {
broker_url: String,
Expand All @@ -260,10 +279,16 @@ impl deadpool::managed::Manager for KafkaConnectionParams {

async fn recycle(
&self,
_: &mut BoxedKafkaConnection,
conn: &mut BoxedKafkaConnection,
_: &deadpool::managed::Metrics,
) -> deadpool::managed::RecycleResult<anyhow::Error> {
Ok(())
// Other than auth, Kafka connections themselves are stateless
// so the only thing we need to do when recycling a connection
// is to confirm that it's still connected.
get_versions(conn).await.map(|_| ()).map_err(|e| {
tracing::warn!(err=?e, broker=self.broker_url, "Connection failed healthcheck");
deadpool::managed::RecycleError::Backend(e)
})
}
}

Expand All @@ -285,6 +310,7 @@ pub struct KafkaApiClient {
// and should be propagated to newly created clients
// when a new broker address is encountered.
clients: Arc<RwLock<HashMap<String, KafkaApiClient>>>,
_pool_connection_reaper: Arc<AbortOnDropHandle<()>>,
}

impl KafkaApiClient {
Expand Down Expand Up @@ -323,6 +349,44 @@ impl KafkaApiClient {
})
.build()?;

// Close idle connections, and any free connection older than 30m.
// It seems that after running for a while, connections can get into
// a broken state where every response returns an error. This, plus
// the healthcheck when recycling a connection solves that problem.
let reap_interval = Duration::from_secs(30);
let max_age = Duration::from_secs(60 * 30);
let max_idle = Duration::from_secs(60);
let reaper = tokio_util::task::AbortOnDropHandle::new(tokio::spawn({
let pool = pool.clone();
let broker_url = broker_url.to_string();
async move {
loop {
let pool_state = pool.status();

metrics::gauge!("pool_size", "upstream_broker" => broker_url.to_owned())
.set(pool_state.size as f64);
metrics::gauge!("pool_available", "upstream_broker" => broker_url.to_owned())
.set(pool_state.available as f64);
metrics::gauge!("pool_waiting", "upstream_broker" => broker_url.to_owned())
.set(pool_state.waiting as f64);

let age_sum = Cell::new(Duration::ZERO);
let idle_sum = Cell::new(Duration::ZERO);
let connections = Cell::new(0);
tokio::time::sleep(reap_interval).await;
pool.retain(|_, metrics: deadpool::managed::Metrics| {
age_sum.set(age_sum.get() + metrics.age());
idle_sum.set(idle_sum.get() + metrics.last_used());
connections.set(connections.get() + 1);
metrics.age() < max_age && metrics.last_used() < max_idle
});

metrics::gauge!("pool_connection_avg_age", "upstream_broker" => broker_url.to_owned()).set(if connections.get() > 0 { age_sum.get()/connections.get() } else { Duration::ZERO });
metrics::gauge!("pool_connection_avg_idle", "upstream_broker" => broker_url.to_owned()).set(if connections.get() > 0 { idle_sum.get()/connections.get() } else { Duration::ZERO });
}
}
}));

let mut conn = match pool.get().await {
Ok(c) => c,
Err(deadpool::managed::PoolError::Backend(e)) => return Err(e),
Expand All @@ -331,19 +395,7 @@ impl KafkaApiClient {
}
};

let versions = send_request(
conn.as_mut(),
messages::ApiVersionsRequest::default()
.with_client_software_name(protocol::StrBytes::from_static_str("Dekaf"))
.with_client_software_version(protocol::StrBytes::from_static_str("1.0")),
None,
)
.await?;
match versions.error_code.err() {
None => {}
Some(e) => bail!("Error connecting to broker: {e}"),
};
tracing::debug!(versions=?versions,"Got supported versions");
let versions = get_versions(conn.as_mut()).await?;
drop(conn);

Ok(Self {
Expand All @@ -352,6 +404,7 @@ impl KafkaApiClient {
sasl_config: sasl_config,
versions,
clients: Arc::new(RwLock::new(HashMap::new())),
_pool_connection_reaper: Arc::new(reaper),
})
}

Expand Down
10 changes: 7 additions & 3 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use read::Read;
mod session;
pub use session::Session;

pub mod metrics_server;
pub mod registry;
pub mod metrics;

mod api_client;
pub use api_client::KafkaApiClient;
Expand All @@ -26,6 +26,7 @@ use itertools::Itertools;
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use serde::{Deserialize, Serialize};
use serde_json::de;
use std::time::SystemTime;

pub struct App {
/// Anonymous API client for the Estuary control plane.
Expand Down Expand Up @@ -178,7 +179,7 @@ async fn handle_api(
frame: bytes::BytesMut,
out: &mut bytes::BytesMut,
) -> anyhow::Result<()> {
tracing::trace!("Handling request");
let start_time = SystemTime::now();
use messages::*;
let ret = match api_key {
ApiKey::ApiVersionsKey => {
Expand Down Expand Up @@ -321,7 +322,10 @@ async fn handle_api(
*/
_ => anyhow::bail!("unsupported request type {api_key:?}"),
};
tracing::trace!("Response sent");
let handle_duration = SystemTime::now().duration_since(start_time)?;

metrics::histogram!("api_call_time", "api_key" => format!("{:?}",api_key))
.record(handle_duration.as_millis() as f64);

ret
}
Expand Down
2 changes: 1 addition & 1 deletion crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to bind server port")?;

let metrics_router = dekaf::metrics::build_router(app.clone());
let metrics_router = dekaf::metrics_server::build_router(app.clone());
let metrics_server_task =
axum_server::bind(metrics_addr).serve(metrics_router.into_make_service());
tokio::spawn(async move { metrics_server_task.await.unwrap() });
Expand Down
File renamed without changes.

0 comments on commit 42e173f

Please sign in to comment.