diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index f2d483d407..bc32e56ad9 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -847,6 +847,14 @@ impl Session { .connect_to_group_coordinator(req.group_id.as_str()) .await?; + let flow_client = self + .auth + .as_mut() + .ok_or(anyhow::anyhow!("Session not authenticated"))? + .authenticated_client() + .await? + .clone(); + client .ensure_topics( mutated_req @@ -861,6 +869,42 @@ impl Session { for topic in resp.topics.iter_mut() { topic.name = self.decrypt_topic_name(topic.name.to_owned()); + + let collection_partitions = Collection::new(&flow_client, topic.name.as_str()) + .await? + .context(format!("unable to look up partitions for {:?}", topic.name))? + .partitions; + + for partition in &topic.partitions { + if let Some(error) = partition.error_code.err() { + tracing::warn!(topic=?topic.name,partition=partition.partition_index,?error,"Got error from upstream Kafka when trying to commit offsets"); + } else { + let journal_name = collection_partitions + .get(partition.partition_index as usize) + .context(format!( + "unable to find partition {} in collection {:?}", + partition.partition_index, topic.name + ))? + .spec + .name + .to_owned(); + + let committed_offset = req + .topics + .iter() + .find(|req_topic| req_topic.name == topic.name) + .context(format!("unable to find topic in request {:?}", topic.name))? + .partitions + .get(partition.partition_index as usize) + .context(format!( + "unable to find partition {}", + partition.partition_index + ))? + .committed_offset; + + metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64); + } + } } Ok(resp)