diff --git a/Cargo.lock b/Cargo.lock
index 5bad5e497c..f39624b317 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -124,6 +124,7 @@ dependencies = [
  "jsonwebtoken",
  "labels",
  "lazy_static",
+ "log",
  "md5",
  "models",
  "ops",
diff --git a/Cargo.toml b/Cargo.toml
index f7aa5b2dc6..c1b96f0b35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -81,6 +81,7 @@ librocksdb-sys = { version = "0.16.0", default-features = false, features = [
     "snappy",
     "rtti",
 ] }
+log = "0.4" # only used to configure logging of dependencies
 lz4 = "1.24.0"
 lz4_flex = "0.11.0"
 mime = "0.3"
diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml
index dd7e7ec103..3013d36cf4 100644
--- a/crates/agent/Cargo.toml
+++ b/crates/agent/Cargo.toml
@@ -46,6 +46,7 @@ humantime-serde = { workspace = true }
 itertools = { workspace = true }
 jsonwebtoken = { workspace = true }
 lazy_static = { workspace = true }
+log = { workspace = true }
 rand = { workspace = true }
 regex = { workspace = true }
 reqwest = { workspace = true }
diff --git a/crates/agent/src/main.rs b/crates/agent/src/main.rs
index aef6b40ef5..6b03a3852e 100644
--- a/crates/agent/src/main.rs
+++ b/crates/agent/src/main.rs
@@ -7,6 +7,7 @@ use clap::Parser;
 use derivative::Derivative;
 use futures::FutureExt;
 use rand::Rng;
+use sqlx::{ConnectOptions, Connection};
 
 /// Agent is a daemon which runs server-side tasks of the Flow control-plane.
 #[derive(Derivative, Parser)]
@@ -96,12 +97,15 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
         .into_string()
         .expect("os path must be utf8");
 
+    // The HOSTNAME variable will be set to the name of the pod in k8s
+    let application_name = std::env::var("HOSTNAME").unwrap_or_else(|_| "agent".to_string());
     let mut pg_options = args
         .database_url
         .as_str()
         .parse::<sqlx::postgres::PgConnectOptions>()
         .context("parsing database URL")?
-        .application_name("agent");
+        .application_name(&application_name);
+    pg_options.log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_secs(10));
 
     // If a database CA was provided, require that we use TLS with full cert verification.
     if let Some(ca) = &args.database_ca {
@@ -113,10 +117,49 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
         pg_options = pg_options.ssl_mode(sqlx::postgres::PgSslMode::Prefer);
     }
 
-    let pg_pool = sqlx::postgres::PgPool::connect_with(pg_options)
+    let pg_pool = sqlx::postgres::PgPoolOptions::new()
+        .acquire_timeout(std::time::Duration::from_secs(5))
+        .after_release(|conn, meta| {
+            let fut = async move {
+                // Ping the connection before it goes back into the pool. The ping future
+                // must be awaited: an un-awaited future does nothing, so the check would
+                // always pass and the timeout would never fire.
+                let ping =
+                    tokio::time::timeout(std::time::Duration::from_secs(5), conn.ping()).await;
+                match ping {
+                    Ok(Ok(())) => Ok(true), // connection is good
+                    Ok(Err(err)) => {
+                        tracing::warn!(error = ?err, conn_meta = ?meta, "connection was put back in a bad state, removing from the pool");
+                        Ok(false)
+                    }
+                    Err(err) => {
+                        tracing::warn!(error = ?err, conn_meta = ?meta, "connection was put back in a bad state, removing from the pool");
+                        Ok(false)
+                    }
+                }
+            };
+            fut.boxed()
+        })
+        .connect_with(pg_options)
         .await
         .context("connecting to database")?;
 
+    // Periodically log information about the connection pool to aid in debugging.
+    let pool_copy = pg_pool.clone();
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(std::time::Duration::from_secs(120));
+        loop {
+            interval.tick().await;
+            let total_connections = pool_copy.size();
+            let idle_connections = pool_copy.num_idle();
+            tracing::info!(
+                total_connections,
+                idle_connections,
+                "db connection pool stats"
+            );
+        }
+    });
+
     let system_user_id = agent_sql::get_user_id_for_email(&args.accounts_email, &pg_pool)
         .await
         .context("querying for agent user id")?;