From db73eeba913fa52abb7ad5c4d6426dbcd5283877 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 18 Oct 2024 13:38:20 -0400 Subject: [PATCH] dekaf: Fix sanity check to also bail out if a data-preview session is then used to ask for documents newer than the write-head --- crates/dekaf/src/session.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 4a33e2c853..a28e7a5fd6 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -19,11 +19,11 @@ use kafka_protocol::{ }, protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes}, }; +use std::{cmp::max, sync::Arc, time::Duration}; use std::{ collections::{hash_map::Entry, HashMap}, time::{SystemTime, UNIX_EPOCH}, }; -use std::{sync::Arc, time::Duration, cmp::max}; use tracing::instrument; struct PendingRead { @@ -430,7 +430,9 @@ impl Session { // so long as the request is still a data preview request. If not, bail out Entry::Occupied(entry) => { let data_preview_state = entry.get(); - if data_preview_state.offset - fetch_offset > 12 { + if fetch_offset > data_preview_state.offset + || data_preview_state.offset - fetch_offset > 12 + { bail!("Session was used for fetching preview data, cannot be used for fetching non-preview data.") } Some(data_preview_state.to_owned())