Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dekaf: Track latest committed offset by group and journal #1715

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,14 @@ impl Session {
.connect_to_group_coordinator(req.group_id.as_str())
.await?;

let flow_client = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?
.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this clone needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, it's needed either here or inside authenticated_client(). authenticated_client()'s signature desugars to

pub async fn authenticated_client<'a>(&'a mut self) -> anyhow::Result<&'a flow_client::Client>

AFAICT, because the returned reference &'a flow_client::Client, has the same lifetime as the mutable reference to self, it too represents a mutable borrow. This causes problems later on in offset_commit where we call self.decrypt_topic_name which takes a &self, which is disallowed because the flow_client holds a mut reference with lifetime >= &mut self, and we get the error

cannot borrow `*self` as immutable because it is also borrowed as mutable
immutable borrow occurs here // call to self.decrypt_topic_name()
session.rs(850, 27): mutable borrow occurs here // call to self.auth.authenticated_client()
session.rs(873, 57): mutable borrow later used here // call to Collection::new(&flow_client)

I could change authenticated_client() to return anyhow::Result<flow_client::Client>, but that would just mean .clone()ing the client in there instead. Maybe that's better ergonomically?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, this is good. I just didn't recall that it already returned a reference. IMO it's better to let the caller decide whether to take a clone or use a reference, rather than taking that choice from them.


client
.ensure_topics(
mutated_req
Expand All @@ -861,6 +869,42 @@ impl Session {

for topic in resp.topics.iter_mut() {
topic.name = self.decrypt_topic_name(topic.name.to_owned());

let collection_partitions = Collection::new(&flow_client, topic.name.as_str())
.await?
.context(format!("unable to look up partitions for {:?}", topic.name))?
.partitions;

for partition in &topic.partitions {
if let Some(error) = partition.error_code.err() {
tracing::warn!(topic=?topic.name,partition=partition.partition_index,?error,"Got error from upstream Kafka when trying to commit offsets");
} else {
let journal_name = collection_partitions
.get(partition.partition_index as usize)
.context(format!(
"unable to find partition {} in collection {:?}",
partition.partition_index, topic.name
))?
.spec
.name
.to_owned();

let committed_offset = req
.topics
.iter()
.find(|req_topic| req_topic.name == topic.name)
.context(format!("unable to find topic in request {:?}", topic.name))?
.partitions
.get(partition.partition_index as usize)
.context(format!(
"unable to find partition {}",
partition.partition_index
))?
.committed_offset;

metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64);
}
}
}

Ok(resp)
Expand Down
Loading