Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dekaf: Track latest committed offset by group and journal #1715

Merged
merged 1 commit into from
Oct 17, 2024

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Oct 17, 2024

Looks like this:

# TYPE dekaf_committed_offset gauge
dekaf_committed_offset{group_id="joseph-dekaf-testing-1100-1",journal_name="demo/bots-humans-edits/pivot=00"} 73173569244
dekaf_committed_offset{group_id="joseph-dekaf-testing-1100-0",journal_name="demo/wikipedia/recentchange/pivot=00"} 2036875838205

In order to correlate this with the journal:gazette_write_head:max metric, we need the exact journal name. Unfortunately, Kafka only represents topic partitions by their index, so in order to expose a useful metric we have to go and look up the exact journal name given the collection name and partition index. Fortunately, OffsetCommit is a fairly infrequent API call, and is not expected to respond particularly quickly, so I don't see this as a huge issue. If it is, we could cache this mapping from the various places it's fetched elsewhere.


This change is Reviewable

Looks like this:

```
dekaf_committed_offset{group_id="joseph-dekaf-testing-1100-1",journal_name="demo/bots-humans-edits/pivot=00"} 73173569244
dekaf_committed_offset{group_id="joseph-dekaf-testing-1100-0",journal_name="demo/wikipedia/recentchange/pivot=00"} 2036875838205
```
Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?
.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this clone needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, it's needed either here or inside authenticated_client(). authenticated_client()'s signature desugars to

pub async fn authenticated_client<'a>(&'a mut self) -> anyhow::Result<&'a flow_client::Client>

AFAICT, because the returned reference &'a flow_client::Client, has the same lifetime as the mutable reference to self, it too represents a mutable borrow. This causes problems later on in offset_commit where we call self.decrypt_topic_name which takes a &self, which is disallowed because the flow_client holds a mut reference with lifetime >= &mut self, and we get the error

cannot borrow `*self` as immutable because it is also borrowed as mutable
immutable borrow occurs here // call to self.decrypt_topic_name()
session.rs(850, 27): mutable borrow occurs here // call to self.auth.authenticated_client()
session.rs(873, 57): mutable borrow later used here // call to Collection::new(&flow_client)

I could change authenticated_client() to return anyhow::Result<flow_client::Client>, but that would just mean .clone()ing the client in there instead. Maybe that's better ergonomically?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, this is good. I just didn't recall that it already returned a reference. IMO it's better to let the caller decide whether to take a clone or use a reference, rather than taking that choice from them.

@jshearer jshearer merged commit 8c398b3 into master Oct 17, 2024
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants