diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index d68e0899e8..90e53a46ce 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -569,30 +569,45 @@ impl Session { hit_timeout = true } - partition_responses.push( - PartitionData::default() - .with_partition_index(partition_request.partition) - // `kafka-protocol` encodes None here using a length of -1, but librdkafka client library - // complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1` - // An empty Bytes will get encoded with a length of 0, which works fine. - .with_records(batch.or(Some(Bytes::new())).to_owned()) - .with_high_watermark(pending.last_write_head) // Map to kafka cursor. - .with_last_stable_offset(pending.last_write_head), - ); - - if read.rewrite_offsets_from.is_none() { - pending.offset = read.offset; - pending.last_write_head = read.last_write_head; - pending.handle = - tokio_util::task::AbortOnDropHandle::new(tokio::spawn(read.next_batch( - crate::read::ReadTarget::Bytes( - partition_request.partition_max_bytes as usize, + let mut partition_data = PartitionData::default() + .with_partition_index(partition_request.partition) + // `kafka-protocol` encodes None here using a length of -1, but librdkafka client library + // complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1` + // An empty Bytes will get encoded with a length of 0, which works fine. + .with_records(batch.or(Some(Bytes::new())).to_owned()); + + match &self.data_preview_state { + SessionDataPreviewState::Unknown => { + unreachable!("Must have already determined data-preview status of session") + } + SessionDataPreviewState::NotDataPreview => { + partition_data = partition_data + .with_high_watermark(pending.last_write_head) // Map to kafka cursor. + .with_last_stable_offset(pending.last_write_head); + + pending.offset = read.offset; + pending.last_write_head = read.last_write_head; + pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( + read.next_batch( + crate::read::ReadTarget::Bytes( + partition_request.partition_max_bytes as usize, + ), + std::time::Instant::now() + timeout, ), - std::time::Instant::now() + timeout, - ))); - } else { - self.reads.remove(&key); + )); + } + SessionDataPreviewState::DataPreview(data_preview_states) => { + let data_preview_state = data_preview_states + .get(&key) + .expect("should be able to find data preview state by this point"); + partition_data = partition_data + .with_high_watermark(data_preview_state.offset) // Map to kafka cursor. + .with_last_stable_offset(data_preview_state.offset); + self.reads.remove(&key); + } } + + partition_responses.push(partition_data); } topic_responses.push(