Skip to content

Commit

Permalink
dekaf: respond with correct high_water_mark/last_stable_offset when s…
Browse files Browse the repository at this point in the history
…erving data preview responses
  • Loading branch information
jshearer committed Oct 17, 2024
1 parent d5ddea2 commit 2b5d961
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,30 +569,45 @@ impl Session {
hit_timeout = true
}

partition_responses.push(
PartitionData::default()
.with_partition_index(partition_request.partition)
// `kafka-protocol` encodes None here using a length of -1, but librdkafka client library
// complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1`
// An empty Bytes will get encoded with a length of 0, which works fine.
.with_records(batch.or(Some(Bytes::new())).to_owned())
.with_high_watermark(pending.last_write_head) // Map to kafka cursor.
.with_last_stable_offset(pending.last_write_head),
);

if read.rewrite_offsets_from.is_none() {
pending.offset = read.offset;
pending.last_write_head = read.last_write_head;
pending.handle =
tokio_util::task::AbortOnDropHandle::new(tokio::spawn(read.next_batch(
crate::read::ReadTarget::Bytes(
partition_request.partition_max_bytes as usize,
let mut partition_data = PartitionData::default()
.with_partition_index(partition_request.partition)
// `kafka-protocol` encodes None here using a length of -1, but librdkafka client library
// complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1`
// An empty Bytes will get encoded with a length of 0, which works fine.
.with_records(batch.or(Some(Bytes::new())).to_owned());

match &self.data_preview_state {
SessionDataPreviewState::Unknown => {
unreachable!("Must have already determined data-preview status of session")
}
SessionDataPreviewState::NotDataPreview => {
partition_data = partition_data
.with_high_watermark(pending.last_write_head) // Map to kafka cursor.
.with_last_stable_offset(pending.last_write_head);

pending.offset = read.offset;
pending.last_write_head = read.last_write_head;
pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
read.next_batch(
crate::read::ReadTarget::Bytes(
partition_request.partition_max_bytes as usize,
),
std::time::Instant::now() + timeout,
),
std::time::Instant::now() + timeout,
)));
} else {
self.reads.remove(&key);
));
}
SessionDataPreviewState::DataPreview(data_preview_states) => {
let data_preview_state = data_preview_states
.get(&key)
.expect("should be able to find data preview state by this point");
partition_data = partition_data
.with_high_watermark(data_preview_state.offset) // Map to kafka cursor.
.with_last_stable_offset(data_preview_state.offset);
self.reads.remove(&key);
}
}

partition_responses.push(partition_data);
}

topic_responses.push(
Expand Down

0 comments on commit 2b5d961

Please sign in to comment.