diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index bd76504baf..9d6e816560 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -41,7 +41,7 @@ enum SessionDataPreviewState { pub struct Session { app: Arc, - reads: HashMap<(TopicName, i32), PendingRead>, + reads: HashMap<(TopicName, i32), (PendingRead, std::time::Instant)>, secret: String, auth: Option, data_preview_state: SessionDataPreviewState, @@ -380,7 +380,7 @@ impl Session { .. } = request; - let client = self + let mut client = self .auth .as_mut() .ok_or(anyhow::anyhow!("Session not authenticated"))? @@ -454,16 +454,34 @@ impl Session { } }; - if matches!(self.reads.get(&key), Some(pending) if pending.offset == fetch_offset) { - metrics::counter!( - "dekaf_fetch_requests", - "topic_name" => key.0.to_string(), - "partition_index" => key.1.to_string(), - "state" => "read_pending" - ) - .increment(1); - continue; // Common case: fetch is at the pending offset. + match self.reads.get(&key) { + Some((_, started_at)) + if started_at.elapsed() > std::time::Duration::from_secs(60 * 5) => + { + metrics::counter!( + "dekaf_fetch_requests", + "topic_name" => key.0.to_string(), + "partition_index" => key.1.to_string(), + "state" => "read_expired" + ) + .increment(1); + tracing::debug!(lifetime=?started_at.elapsed(), topic_name=?key.0,partition_index=?key.1, "Restarting expired Read"); + self.reads.remove(&key); + client = client.with_fresh_gazette_client(); + } + Some(_) => { + metrics::counter!( + "dekaf_fetch_requests", + "topic_name" => key.0.to_string(), + "partition_index" => key.1.to_string(), + "state" => "read_pending" + ) + .increment(1); + continue; // Common case: fetch is at the pending offset. + } + _ => {} } + let Some(collection) = Collection::new(&client, &key.0).await? else { metrics::counter!( "dekaf_fetch_requests", @@ -566,12 +584,16 @@ impl Session { "started read", ); - if let Some(old) = self.reads.insert(key.clone(), pending) { + if let Some((old, started_at)) = self + .reads + .insert(key.clone(), (pending, std::time::Instant::now())) + { tracing::warn!( topic = topic_request.topic.as_str(), partition = partition_request.partition, old_offset = old.offset, new_offset = fetch_offset, + read_lifetime = ?started_at.elapsed(), "discarding pending read due to offset jump", ); } @@ -588,7 +610,7 @@ impl Session { for partition_request in &topic_request.partitions { key.1 = partition_request.partition; - let Some(pending) = self.reads.get_mut(&key) else { + let Some((pending, _)) = self.reads.get_mut(&key) else { partition_responses.push( PartitionData::default() .with_partition_index(partition_request.partition)