From 1b71f874bc8ba295e5a528e80509160f93e74cbe Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Tue, 15 Oct 2024 17:02:17 -0400 Subject: [PATCH] dekaf: Add support for reading backwards in order to properly handle data preview UIs --- crates/dekaf/src/lib.rs | 1 + crates/dekaf/src/main.rs | 12 +- crates/dekaf/src/read.rs | 475 +++++++++++++++++++++++++++--------- crates/dekaf/src/session.rs | 63 ++++- 4 files changed, 429 insertions(+), 122 deletions(-) diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 328864d77e..c81c484962 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -194,6 +194,7 @@ async fn handle_api( // https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823 let (header, request) = dec_request(frame, version)?; tracing::debug!(client_id=?header.client_id, "Got client ID!"); + session.client_id = header.client_id.clone().map(|id| id.to_string()); Ok(enc_resp(out, &header, session.api_versions(request).await?)) } ApiKey::SaslHandshakeKey => { diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 20fac234ce..04d87b3049 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -12,12 +12,16 @@ use futures::{FutureExt, TryStreamExt}; use rsasl::config::SASLConfig; use rustls::pki_types::CertificateDer; use std::{ + collections::HashMap, fs::File, io, path::{Path, PathBuf}, sync::Arc, }; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::{ + io::{split, AsyncRead, AsyncWrite, AsyncWriteExt}, + sync::RwLock, +}; use tracing_subscriber::{filter::LevelFilter, EnvFilter}; use url::Url; @@ -113,6 +117,8 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); tracing::info!("Starting dekaf"); + let offset_map = Arc::new(RwLock::new(HashMap::new())); + let (api_endpoint, api_key) = if cli.local { (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string()) } else { @@ -219,7 +225,7 @@ async fn main() -> anyhow::Result<()> { continue }; - tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone())); + tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone())); } _ = &mut stop => break, } @@ -240,7 +246,7 @@ async fn main() -> anyhow::Result<()> { }; socket.set_nodelay(true)?; - tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone())); + tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone())); } _ = &mut stop => break, } diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 8fa4e2dff7..669e82ad91 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -1,13 +1,18 @@ use super::{Collection, Partition}; use anyhow::bail; use bytes::{Buf, BufMut, BytesMut}; -use doc::AsNode; -use futures::StreamExt; +use doc::{AsNode, OwnedArchivedNode}; +use futures::{Future, FutureExt, Stream, StreamExt}; use gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; -use kafka_protocol::records::Compression; +use itertools::Itertools; +use kafka_protocol::records::{Compression, Record, TimestampType}; use lz4_flex::frame::BlockMode; -use std::time::{Duration, Instant}; +use std::{ + cmp::{max, min}, + pin::Pin, + time::{Duration, Instant}, +}; pub struct Read { /// Journal offset to be served by this Read. 
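// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the main.rs hunks above build a
// single offset map and hand a clone of it to every Session. The generic
// parameters are elided in this diff, so the key/value types below are
// assumptions inferred from how session.rs populates the map (keyed by topic
// name + partition index, storing the last offset returned to a ListOffsets
// request); the real key likely uses kafka_protocol's TopicName rather than
// String, and the helper names are hypothetical.
// ---------------------------------------------------------------------------
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

type PartitionOffsets = Arc<RwLock<HashMap<(String, i32), i64>>>;

async fn record_listed_offset(map: &PartitionOffsets, topic: String, partition: i32, offset: i64) {
    // ListOffsets path: remember the offset we just reported to this client.
    map.write().await.insert((topic, partition), offset);
}

async fn last_listed_offset(map: &PartitionOffsets, topic: &str, partition: i32) -> Option<i64> {
    // Fetch path: compare the requested fetch_offset against this value.
    map.read().await.get(&(topic.to_string(), partition)).copied()
}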
@@ -28,6 +33,7 @@ pub struct Read { // Keep these details around so we can create a new ReadRequest if we need to skip forward journal_name: String, + journal_client: journal::Client, } pub enum BatchResult { @@ -39,6 +45,14 @@ pub enum BatchResult { TimeoutNoData, } +#[derive(Copy, Clone)] +pub enum ReadTarget { + Bytes(usize), + Docs(usize), +} + +const READ_BACKWARDS_CHUNK_SIZE: usize = 2 << 21; // 4mb + impl Read { pub fn new( client: journal::Client, @@ -79,17 +93,226 @@ impl Read { value_schema_id, journal_name: partition.spec.name.clone(), + journal_client: client, } } #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))] - pub async fn next_batch( + pub async fn next_batch_backward( + mut self, + target: ReadTarget, + timeout: Instant, + transform_offsets: bool, + ) -> anyhow::Result<(Self, BatchResult)> { + use kafka_protocol::records::{Compression, RecordBatchEncoder, RecordEncodeOptions}; + + let starting_offset = self.offset; + + let mut records = vec![]; + let mut records_bytes = 0; + let (not_before_sec, _) = self.not_before.to_unix(); + + let mut transient_errors = 0; + + let mut tmp = Vec::new(); + let mut buf = bytes::BytesMut::new(); + + let timeout = tokio::time::sleep_until(timeout.into()); + let timeout = futures::future::maybe_done(timeout); + tokio::pin!(timeout); + + let mut did_timeout = false; + + while match target { + ReadTarget::Bytes(target_bytes) => records_bytes < target_bytes, + ReadTarget::Docs(target_docs) => records.len() < target_docs, + } { + if self.offset <= 0 || did_timeout { + break; + } + + let mut chunk_lines = vec![]; + let mut chunk_bytes = 0; + self.offset -= READ_BACKWARDS_CHUNK_SIZE as i64; + + // Ensure offset doesn't go negative + if self.offset < 0 { + self.offset = 0; + } + + // Treat self.offset as the upper bound + let mut stream = self.journal_client.clone().read_json_lines( + broker::ReadRequest { + offset: self.offset, + block: false, + journal: self.journal_name.clone(), + begin_mod_time: not_before_sec as i64, + ..Default::default() + }, + 0, + ); + + loop { + if chunk_bytes > READ_BACKWARDS_CHUNK_SIZE { + tracing::debug!("Reached the end of this chunk, moving backwards"); + break; + } + match tokio::select! { + biased; // Attempt to read before yielding. + + read = stream.next() => read, + + _ = &mut timeout => { + did_timeout = true; + tracing::debug!("Timed out"); + break; // Yield if we reach a timeout + }, + } { + Some(Ok(ReadJsonLine::Meta(resp))) + if matches!( + resp.status(), + proto_gazette::broker::Status::OffsetNotYetAvailable + ) => + { + tracing::debug!("Reached the end of this collection, moving backwards"); + break; + } + Some(Ok(ReadJsonLine::Meta(resp))) => { + chunk_bytes += resp.content.len(); + chunk_lines.push(ReadJsonLine::Meta(resp)); + } + Some(Ok(ReadJsonLine::Doc { root, next_offset })) => { + chunk_bytes += root.bytes().len(); + chunk_lines.push(ReadJsonLine::Doc { root, next_offset }); + } + Some(Err(err)) if err.is_transient() && transient_errors < 5 => { + use rand::Rng; + + transient_errors = transient_errors + 1; + + tracing::warn!(error = ?err, "Retrying transient read error"); + let delay = Duration::from_millis(rand::thread_rng().gen_range(300..2000)); + tokio::time::sleep(delay).await; + // We can retry transient errors just by continuing to poll the stream + continue; + } + Some(Err(gazette::Error::Parsing { .. })) => { + continue; + } + Some(Err(err)) => return Err(err.into()), + // Is this true? 
+ None => bail!("gazette client read never returns EOF"), + } + } + + tracing::debug!(lines = chunk_lines.len(), "Got lines before filter"); + let new_lines = match target { + ReadTarget::Bytes(_) => chunk_lines, + ReadTarget::Docs(target_docs) => { + let max_lines = max(0, target_docs - records.len()); + let max_new_lines = min(chunk_lines.len(), max_lines); + let start_offset = chunk_lines.len() - max_new_lines; + + tracing::debug!( + target_docs, + max_lines, + max_new_lines, + start_offset, + "Filtering" + ); + + if start_offset > 0 { + chunk_lines.drain(0..start_offset); + } + + chunk_lines + } + }; + tracing::debug!(lines = new_lines.len(), "Got lines after filter"); + + let mut temp_records = vec![]; + for line in new_lines.into_iter().rev() { + let (root, next_offset) = match line { + ReadJsonLine::Meta(_) => { + continue; + } + ReadJsonLine::Doc { root, next_offset } => (root, next_offset), + }; + let (record, size) = match self.to_record(&mut tmp, &mut buf, root, next_offset)? { + Some(resp) => resp, + None => { + continue; + } + }; + + temp_records.push(record); + records_bytes += size; + if matches!(target, ReadTarget::Bytes(target_bytes) if records_bytes > target_bytes) + { + break; + } + } + temp_records.reverse(); + temp_records.append(&mut records); + records = temp_records; + } + + let opts = RecordEncodeOptions { + compression: Compression::None, + version: 2, + }; + + let records = if transform_offsets { + let record_count = records.len(); + records.iter_mut().enumerate().map(|(idx, record)| { + *record.offset = record_count - idx; + record + }) + } else { + records.iter() + }; + + RecordBatchEncoder::encode(&mut buf, records, &opts, Some(compressor)) + .expect("record encoding cannot fail"); + + tracing::debug!( + count = records.len(), + first_offset = records.first().map(|r| r.offset).unwrap_or_default(), + last_offset = records.last().map(|r| r.offset).unwrap_or_default(), + last_write_head = self.last_write_head, + ratio = buf.len() as f64 / (records_bytes + 1) as f64, + records_bytes, + "returning records" + ); + + metrics::counter!("dekaf_documents_read", "journal_name" => self.journal_name.to_owned()) + .increment(records.len() as u64); + metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned()) + .increment(records_bytes as u64); + + let frozen = buf.freeze(); + + Ok(( + self, + match (records.len() > 0, did_timeout) { + (false, true) => BatchResult::TimeoutNoData, + (true, true) => BatchResult::TimeoutExceededBeforeTarget(frozen), + (true, false) => BatchResult::TargetExceededBeforeTimeout(frozen), + (false, false) => { + unreachable!("shouldn't be able see no documents, and also not timeout") + } + }, + )) + } + + #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))] + pub async fn next_batch_forward( mut self, - target_bytes: usize, + target: ReadTarget, timeout: Instant, ) -> anyhow::Result<(Self, BatchResult)> { use kafka_protocol::records::{ - Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType, + Compression, Record, RecordBatchEncoder, RecordEncodeOptions, }; let mut records: Vec = Vec::new(); @@ -109,7 +332,10 @@ impl Read { let mut did_timeout = false; - while records_bytes < target_bytes { + while match target { + ReadTarget::Bytes(target_bytes) => records_bytes < target_bytes, + ReadTarget::Docs(target_docs) => records.len() < target_docs, + } { let read = match tokio::select! { biased; // Attempt to read before yielding. 
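// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): next_batch_backward above walks
// the journal in fixed-size chunks from the current offset toward zero, reads
// each chunk forward, keeps only the newest documents, and prepends them so the
// batch stays in ascending offset order. This simplified version runs over an
// in-memory slice of (offset, doc) pairs and ignores gazette streaming,
// transient-error retries, and the fetch timeout handled by the real code.
// ---------------------------------------------------------------------------
const CHUNK: i64 = 2 << 21; // 4 MiB, matching READ_BACKWARDS_CHUNK_SIZE

fn read_last_docs(docs: &[(i64, String)], mut upper: i64, target_docs: usize) -> Vec<(i64, String)> {
    let mut out: Vec<(i64, String)> = Vec::new();
    while out.len() < target_docs && upper > 0 {
        // Step the window back one chunk, clamping at the start of the journal.
        let lower = (upper - CHUNK).max(0);
        // Read the window forward, as the real code does with read_json_lines.
        let mut chunk: Vec<(i64, String)> = docs
            .iter()
            .filter(|(off, _)| *off >= lower && *off < upper)
            .cloned()
            .collect();
        // Keep only as many trailing documents as are still needed...
        let needed = target_docs - out.len();
        if chunk.len() > needed {
            chunk.drain(0..chunk.len() - needed);
        }
        // ...and prepend them to what later (higher-offset) chunks produced.
        chunk.extend(out);
        out = chunk;
        upper = lower;
    }
    out
}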
@@ -169,110 +395,15 @@ impl Read { ReadJsonLine::Doc { root, next_offset } => (root, next_offset), }; - let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else { - let serialized_doc = root.get().to_debug_json_value(); - anyhow::bail!( - "document at offset {} does not have a valid UUID: {:?}", - self.offset, - serialized_doc - ); - }; - let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?; - - if clock < self.not_before { - continue; - } - - // Is this a non-content control document, such as a transaction ACK? - let is_control = flags.is_ack(); - // Is this a deletion? - let is_deletion = matches!( - self.meta_op_ptr.query(root.get()), - Some(doc::ArchivedNode::String(op)) if op.as_str() == "d", - ); - - tmp.reserve(root.bytes().len()); // Avoid small allocations. - let (unix_seconds, unix_nanos) = clock.to_unix(); - - // Encode the key. - let key = if is_control { - // From https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging - // Also from https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit - // Control messages will always have a non-null key, which is used to - // indicate the type of control message type with the following schema: - // ControlMessageKey => Version ControlMessageType - // Version => int16 - // ControlMessageType => int16 - // Skip control messages with version != 0: - // if (ctrl_data.Version != 0) { - // rd_kafka_buf_skip_to(rkbuf, message_end); - // return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */ - // } - // https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L777-L824 - - // Control Message keys are always 4 bytes: - // Version: Any value != 0: i16 - buf.put_i16(9999); - // ControlMessageType: unused: i16 - buf.put_i16(9999); - records_bytes += 4; - Some(buf.split().freeze()) - } else { - tmp.push(0); - tmp.extend(self.key_schema_id.to_be_bytes()); - () = avro::encode_key(&mut tmp, &self.key_schema, root.get(), &self.key_ptr)?; - - records_bytes += tmp.len(); - buf.extend_from_slice(&tmp); - tmp.clear(); - Some(buf.split().freeze()) - }; - - // Encode the value. - let value = if is_control || is_deletion { - None - } else { - tmp.push(0); - tmp.extend(self.value_schema_id.to_be_bytes()); - () = avro::encode(&mut tmp, &self.value_schema, root.get())?; - - records_bytes += tmp.len(); - buf.extend_from_slice(&tmp); - tmp.clear(); - Some(buf.split().freeze()) + let (record, size) = match self.to_record(&mut tmp, &mut buf, root, next_offset)? { + Some(resp) => resp, + None => { + continue; + } }; - self.offset = next_offset; - - // Map documents into a Kafka offset which is their last - // inclusive byte index within the document. - // - // Kafka adds one for its next fetch_offset, and this behavior - // means its next fetch will be a valid document begin offset. - // - // This behavior also lets us subtract one from the journal - // write head or a fragment end offset to arrive at a - // logically correct Kafka high water mark which a client - // can expect to read through. - // - // Note that sequence must increment at the same rate - // as offset for efficient record batch packing. 
- let kafka_offset = next_offset - 1; - - records.push(Record { - control: is_control, - headers: Default::default(), - key, - offset: kafka_offset, - partition_leader_epoch: 1, - producer_epoch: 1, - producer_id: producer.as_i64(), - sequence: kafka_offset as i32, - timestamp: unix_seconds as i64 * 1000 + unix_nanos as i64 / 1_000_000, // Map into millis. - timestamp_type: TimestampType::LogAppend, - transactional: false, - value, - }); + records.push(record); + records_bytes += size; } let opts = RecordEncodeOptions { @@ -297,18 +428,138 @@ impl Read { metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned()) .increment(records_bytes as u64); + let frozen = buf.freeze(); + Ok(( self, match (records.len() > 0, did_timeout) { (false, true) => BatchResult::TimeoutNoData, - (true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()), - (true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()), + (true, true) => BatchResult::TimeoutExceededBeforeTarget(frozen), + (true, false) => BatchResult::TargetExceededBeforeTimeout(frozen), (false, false) => { unreachable!("shouldn't be able see no documents, and also not timeout") } }, )) } + + fn to_record( + &mut self, + tmp: &mut Vec, + buf: &mut BytesMut, + root: OwnedArchivedNode, + next_offset: i64, + ) -> anyhow::Result> { + let mut record_bytes: usize = 0; + + let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else { + let serialized_doc = root.get().to_debug_json_value(); + anyhow::bail!( + "document at offset {} does not have a valid UUID: {:?}", + self.offset, + serialized_doc + ); + }; + let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?; + + if clock < self.not_before { + return Ok(None); + } + + // Is this a non-content control document, such as a transaction ACK? + let is_control = flags.is_ack(); + // Is this a deletion? + let is_deletion = matches!( + self.meta_op_ptr.query(root.get()), + Some(doc::ArchivedNode::String(op)) if op.as_str() == "d", + ); + + tmp.reserve(root.bytes().len()); // Avoid small allocations. + let (unix_seconds, unix_nanos) = clock.to_unix(); + + // Encode the key. + let key = if is_control { + // From https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging + // Also from https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit + // Control messages will always have a non-null key, which is used to + // indicate the type of control message type with the following schema: + // ControlMessageKey => Version ControlMessageType + // Version => int16 + // ControlMessageType => int16 + // Skip control messages with version != 0: + // if (ctrl_data.Version != 0) { + // rd_kafka_buf_skip_to(rkbuf, message_end); + // return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */ + // } + // https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L777-L824 + + // Control Message keys are always 4 bytes: + // Version: Any value != 0: i16 + buf.put_i16(9999); + // ControlMessageType: unused: i16 + buf.put_i16(9999); + record_bytes += 4; + Some(buf.split().freeze()) + } else { + tmp.push(0); + tmp.extend(self.key_schema_id.to_be_bytes()); + () = avro::encode_key(tmp, &self.key_schema, root.get(), &self.key_ptr)?; + + record_bytes += tmp.len(); + buf.extend_from_slice(&tmp); + tmp.clear(); + Some(buf.split().freeze()) + }; + + // Encode the value. 
+ let value = if is_control || is_deletion { + None + } else { + tmp.push(0); + tmp.extend(self.value_schema_id.to_be_bytes()); + () = avro::encode(tmp, &self.value_schema, root.get())?; + + record_bytes += tmp.len(); + buf.extend_from_slice(&tmp); + tmp.clear(); + Some(buf.split().freeze()) + }; + + self.offset = next_offset; + + // Map documents into a Kafka offset which is their last + // inclusive byte index within the document. + // + // Kafka adds one for its next fetch_offset, and this behavior + // means its next fetch will be a valid document begin offset. + // + // This behavior also lets us subtract one from the journal + // write head or a fragment end offset to arrive at a + // logically correct Kafka high water mark which a client + // can expect to read through. + // + // Note that sequence must increment at the same rate + // as offset for efficient record batch packing. + let kafka_offset = next_offset - 1; + + Ok(Some(( + Record { + control: is_control, + headers: Default::default(), + key, + offset: kafka_offset, + partition_leader_epoch: 1, + producer_epoch: 1, + producer_id: producer.as_i64(), + sequence: kafka_offset as i32, + timestamp: unix_seconds as i64 * 1000 + unix_nanos as i64 / 1_000_000, // Map into millis. + timestamp_type: TimestampType::LogAppend, + transactional: false, + value, + }, + record_bytes, + ))) + } } fn compressor( diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index f2d483d407..57c56da831 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -22,6 +22,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use std::{sync::Arc, time::Duration}; +use tokio::sync::RwLock; use tracing::instrument; struct PendingRead { @@ -35,15 +36,23 @@ pub struct Session { reads: HashMap<(TopicName, i32), PendingRead>, secret: String, auth: Option, + partition_offsets: Arc>>, + pub client_id: Option, } impl Session { - pub fn new(app: Arc, secret: String) -> Self { + pub fn new( + app: Arc, + secret: String, + offsets: Arc>>, + ) -> Self { Self { app, reads: HashMap::new(), auth: None, secret, + partition_offsets: offsets, + client_id: None, } } @@ -311,6 +320,8 @@ impl Session { ListOffsetsPartitionResponse, ListOffsetsTopicResponse, }; + let mut new_offsets = HashMap::new(); + // Map topics, partition indices, and fetched offsets into a comprehensive response. 
let response = collections .into_iter() @@ -324,6 +335,8 @@ impl Session { .with_error_code(ResponseError::UnknownTopicOrPartition.code()); }; + new_offsets.insert((topic_name.to_owned(), partition_index), offset); + ListOffsetsPartitionResponse::default() .with_partition_index(partition_index) .with_offset(offset) @@ -337,6 +350,8 @@ impl Session { }) .collect(); + self.partition_offsets.write().await.extend(new_offsets); + Ok(messages::ListOffsetsResponse::default().with_topics(response)) } @@ -363,8 +378,7 @@ impl Session { .authenticated_client() .await?; - let timeout_at = - std::time::Instant::now() + std::time::Duration::from_millis(max_wait_ms as u64); + let timeout = std::time::Duration::from_millis(max_wait_ms as u64); let mut hit_timeout = false; @@ -373,6 +387,19 @@ impl Session { let mut key = (from_downstream_topic_name(topic_request.topic.clone()), 0); for partition_request in &topic_request.partitions { + let read_guard = self.partition_offsets.read().await; + let fetched_offset = read_guard + .get(&(key.0.to_owned(), partition_request.partition)) + .copied(); + + drop(read_guard); + + let diff = if let Some(offset) = fetched_offset { + Some(offset - partition_request.fetch_offset) + } else { + None + }; + key.1 = partition_request.partition; let fetch_offset = partition_request.fetch_offset; @@ -406,9 +433,26 @@ impl Session { let pending = PendingRead { offset: fetch_offset, last_write_head: fetch_offset, - handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn( - read.next_batch(partition_request.partition_max_bytes as usize, timeout_at), - )), + handle: tokio_util::task::AbortOnDropHandle::new(match diff { + // Startree + Some(d) if d == 0 => tokio::spawn(read.next_batch_backward( + crate::read::ReadTarget::Docs(1), + std::time::Instant::now() + timeout * 2, + false, + )), + // Startree + Some(d) if d == 12 => tokio::spawn(read.next_batch_backward( + crate::read::ReadTarget::Docs(12), + std::time::Instant::now() + timeout * 2, + true, + )), + _ => tokio::spawn(read.next_batch_forward( + crate::read::ReadTarget::Bytes( + partition_request.partition_max_bytes as usize, + ), + std::time::Instant::now() + timeout, + )), + }), }; tracing::info!( @@ -454,7 +498,12 @@ impl Session { pending.offset = read.offset; pending.last_write_head = read.last_write_head; pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( - read.next_batch(partition_request.partition_max_bytes as usize, timeout_at), + read.next_batch_forward( + crate::read::ReadTarget::Bytes( + partition_request.partition_max_bytes as usize, + ), + std::time::Instant::now() + timeout, + ), )); let (timeout, batch) = match batch {
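// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the fetch path above compares the
// requested fetch_offset against the offset most recently returned by ListOffsets
// for that partition, and switches to a backward read for the two request shapes
// produced by data-preview UIs (the "Startree" comments in the patch): asking for
// the high-water mark itself, or for exactly 12 documents behind it, in which case
// the records' offsets are rewritten into a contiguous range. The thresholds and
// the doubled timeout come from the patch; the enum and function names below are
// hypothetical, for illustration only.
// ---------------------------------------------------------------------------
enum ReadPlan {
    BackwardDocs { docs: usize, renumber_offsets: bool },
    ForwardBytes { max_bytes: usize },
}

fn choose_read_plan(latest_listed: Option<i64>, fetch_offset: i64, max_bytes: usize) -> ReadPlan {
    match latest_listed.map(|latest| latest - fetch_offset) {
        // Preview UI asked for the high-water mark itself: serve the last document.
        Some(0) => ReadPlan::BackwardDocs { docs: 1, renumber_offsets: false },
        // Preview UI asked for exactly 12 documents back: serve the last 12 and
        // renumber their offsets so they read as a contiguous range.
        Some(12) => ReadPlan::BackwardDocs { docs: 12, renumber_offsets: true },
        // Anything else is an ordinary consumer: keep reading forward by bytes.
        _ => ReadPlan::ForwardBytes { max_bytes },
    }
}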