Skip to content

Commit

Permalink
dekaf: Introduce IDLE_SESSION_TIMEOUT_MS to allow configuring the i…
Browse files Browse the repository at this point in the history
…dle session timeout
  • Loading branch information
jshearer committed Nov 13, 2024
1 parent 9e9899f commit 5ef5d67
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ axum = { workspace = true }
axum-extra = { workspace = true }
axum-server = { workspace = true }
base64 = { workspace = true }
humantime = { workspace = true }
bumpalo = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
Expand Down
13 changes: 8 additions & 5 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub struct Cli {
#[arg(long, env = "BROKER_CONNECTION_POOL_SIZE", default_value = "20")]
broker_connection_pool_size: usize,

/// How long to wait for a message before closing an idle connection
#[arg(long, env = "IDLE_SESSION_TIMEOUT", value_parser = humantime::parse_duration, default_value = "30s")]
idle_session_timeout: std::time::Duration,

#[command(flatten)]
tls: Option<TlsArgs>,
}
Expand Down Expand Up @@ -219,7 +223,7 @@ async fn main() -> anyhow::Result<()> {
continue
};

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, cli.idle_session_timeout_ms, stop.clone()));
}
_ = &mut stop => break,
}
Expand All @@ -240,7 +244,7 @@ async fn main() -> anyhow::Result<()> {
};
socket.set_nodelay(true)?;

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, cli.idle_session_timeout_ms, stop.clone()));
}
_ = &mut stop => break,
}
Expand All @@ -255,6 +259,7 @@ async fn serve<S>(
mut session: Session,
socket: S,
addr: std::net::SocketAddr,
idle_timeout: std::time::Duration,
_stop: impl futures::Future<Output = ()>, // TODO(johnny): stop.
) -> anyhow::Result<()>
where
Expand All @@ -278,7 +283,7 @@ where

let result = async {
loop {
let Some(frame) = tokio::time::timeout(SESSION_TIMEOUT, r.try_next())
let Some(frame) = tokio::time::timeout(idle_timeout, r.try_next())
.await
.context("timeout waiting for next session request")?
.context("failed to read next session request")?
Expand Down Expand Up @@ -336,5 +341,3 @@ fn validate_certificate_name(
}
return Ok(false);
}

const SESSION_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

0 comments on commit 5ef5d67

Please sign in to comment.