diff --git a/Cargo.lock b/Cargo.lock
index a58bc08594..53550910e3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1869,6 +1869,7 @@ dependencies = [
  "axum-extra",
  "axum-server",
  "base64 0.13.1",
+ "bumpalo",
  "bytes",
  "clap 4.5.17",
  "crypto-common",
@@ -1882,6 +1883,7 @@ dependencies = [
  "itertools 0.10.5",
  "kafka-protocol",
  "labels",
+ "lazy_static",
  "lz4_flex",
  "md5",
  "metrics",
diff --git a/crates/avro/src/lib.rs b/crates/avro/src/lib.rs
index ad35c0e0a8..78988c0404 100644
--- a/crates/avro/src/lib.rs
+++ b/crates/avro/src/lib.rs
@@ -42,19 +42,27 @@ pub enum Error {
     ParseFloat(String, #[source] std::num::ParseFloatError),
 }
 
+/// Map a [`doc::Shape`] and key pointers into its equivalent AVRO schema.
+pub fn shape_to_avro(
+    shape: doc::Shape,
+    key: &[doc::Pointer],
+) -> (apache_avro::Schema, apache_avro::Schema) {
+    (
+        schema::key_to_avro(key, shape.clone()),
+        schema::shape_to_avro(json::Location::Root, shape, true),
+    )
+}
+
 /// Map a JSON schema bundle and key pointers into its equivalent AVRO schema.
 pub fn json_schema_to_avro(
-    json_schema: &str,
+    schema: &str,
     key: &[doc::Pointer],
 ) -> Result<(apache_avro::Schema, apache_avro::Schema), Error> {
-    let json_schema = doc::validation::build_bundle(json_schema)?;
+    let json_schema = doc::validation::build_bundle(schema)?;
     let validator = doc::Validator::new(json_schema)?;
     let shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());
 
-    Ok((
-        schema::key_to_avro(key, shape.clone()),
-        schema::shape_to_avro(json::Location::Root, shape, true),
-    ))
+    Ok(shape_to_avro(shape, key))
 }
 
 /// Encode a document into a binary AVRO representation using the given schema.
diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml
index c33527eda7..292cecb1a3 100644
--- a/crates/dekaf/Cargo.toml
+++ b/crates/dekaf/Cargo.toml
@@ -22,12 +22,12 @@ ops = { path = "../ops" }
 proto-flow = { path = "../proto-flow" }
 proto-gazette = { path = "../proto-gazette" }
 simd-doc = { path = "../simd-doc" }
-
 anyhow = { workspace = true }
 axum = { workspace = true }
 axum-extra = { workspace = true }
 axum-server = { workspace = true }
 base64 = { workspace = true }
+bumpalo = { workspace = true }
 bytes = { workspace = true }
 clap = { workspace = true }
 crypto-common = { workspace = true }
@@ -37,6 +37,7 @@ hex = { workspace = true }
 hexdump = { workspace = true }
 itertools = { workspace = true }
 kafka-protocol = { workspace = true }
+lazy_static = { workspace = true }
 lz4_flex = { workspace = true }
 md5 = { workspace = true }
 metrics = { workspace = true }
diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs
index 4edcf44849..e945966814 100644
--- a/crates/dekaf/src/connector.rs
+++ b/crates/dekaf/src/connector.rs
@@ -7,13 +7,19 @@ use std::collections::BTreeMap;
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
 #[serde(rename_all = "snake_case")]
 pub enum DeletionMode {
-    Default,
-    Header,
+    // Handles deletions using the regular Kafka upsert envelope, where a deletion
+    // is represented by a record containing the key that was deleted, and a null value.
+    Kafka,
+    // Handles deletions by passing through the full deletion document as it exists
+    // in the source collection, as well as including a new field `_meta/is_deleted`
+    // which is defined as the number `1` on deletions, and `0` otherwise.
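Aside (not part of the patch): a quick check of the serde wire format these attributes produce. `rename_all = "snake_case"` would render the `CDC` variant as `"c_d_c"`, which is why it carries an explicit rename to `"cdc"`; `Kafka` lowercases cleanly on its own. A minimal, self-contained sketch:

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    #[serde(rename_all = "snake_case")]
    enum DeletionMode {
        Kafka,
        // Without the rename, snake_case would split the acronym into "c_d_c".
        #[serde(rename = "cdc")]
        CDC,
    }

    fn main() {
        // Round-trip both variants to confirm the names a task config would use.
        assert_eq!(serde_json::to_string(&DeletionMode::Kafka).unwrap(), r#""kafka""#);
        assert_eq!(serde_json::to_string(&DeletionMode::CDC).unwrap(), r#""cdc""#);
        assert_eq!(
            serde_json::from_str::<DeletionMode>(r#""cdc""#).unwrap(),
            DeletionMode::CDC
        );
    }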
+ #[serde(rename = "cdc")] + CDC, } impl Default for DeletionMode { fn default() -> Self { - Self::Default + Self::Kafka } } diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 3508b0c8d7..7bc259c0b9 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -1,8 +1,8 @@ use super::{Collection, Partition}; use crate::connector::DeletionMode; -use anyhow::bail; +use anyhow::{bail, Context}; use bytes::{Buf, BufMut, BytesMut}; -use doc::AsNode; +use doc::{heap::ArchivedNode, AsNode, HeapNode, OwnedArchivedNode}; use futures::StreamExt; use gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; @@ -10,6 +10,7 @@ use kafka_protocol::{ protocol::StrBytes, records::{Compression, TimestampType}, }; +use lazy_static::lazy_static; use lz4_flex::frame::BlockMode; pub struct Read { @@ -36,6 +37,7 @@ pub struct Read { offset_start: i64, deletes: DeletionMode, + alloc: bumpalo::Bump, pub(crate) rewrite_offsets_from: Option, } @@ -55,9 +57,9 @@ pub enum ReadTarget { Docs(usize), } -const DELETION_HEADER: &str = "_is_deleted"; -const DELETION_VAL_DELETED: &[u8] = &[1u8]; -const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8]; +lazy_static! { + static ref DELETION_INDICATOR_PTR: doc::Pointer = doc::Pointer::from_str("/_meta/is_deleted"); +} impl Read { pub fn new( @@ -103,6 +105,7 @@ impl Read { journal_name: partition.spec.name.clone(), rewrite_offsets_from, deletes, + alloc: Default::default(), offset_start: offset, } } @@ -267,12 +270,24 @@ impl Read { // Encode the value. let value = - if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Default)) { + if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Kafka)) { None } else { tmp.push(0); tmp.extend(self.value_schema_id.to_be_bytes()); - () = avro::encode(&mut tmp, &self.value_schema, root.get())?; + + if matches!(self.deletes, DeletionMode::CDC) { + let mut heap_node = HeapNode::from_node(root.get(), &self.alloc); + let foo = DELETION_INDICATOR_PTR + .create_heap_node(&mut heap_node, &self.alloc) + .context("Unable to add deletion meta indicator")?; + + *foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 }); + + () = avro::encode(&mut tmp, &self.value_schema, &heap_node)?; + } else { + () = avro::encode(&mut tmp, &self.value_schema, root.get())?; + } record_bytes += tmp.len(); buf.extend_from_slice(&tmp); @@ -303,21 +318,7 @@ impl Read { records.push(Record { control: is_control, - headers: if matches!(self.deletes, DeletionMode::Header) { - let deletion_val = if is_deletion { - DELETION_VAL_DELETED - } else { - DELETION_VAL_NOT_DELETED - }; - let mut headers = kafka_protocol::indexmap::IndexMap::new(); - headers.insert( - StrBytes::from_static_str(DELETION_HEADER), - Some(bytes::Bytes::from_static(&deletion_val)), - ); - headers - } else { - Default::default() - }, + headers: Default::default(), key, offset: kafka_offset, partition_leader_epoch: 1, diff --git a/crates/dekaf/src/registry.rs b/crates/dekaf/src/registry.rs index 45840e0897..aaf04f317c 100644 --- a/crates/dekaf/src/registry.rs +++ b/crates/dekaf/src/registry.rs @@ -73,8 +73,11 @@ async fn get_subject_latest( axum::extract::Path(subject): axum::extract::Path, ) -> Response { wrap(async move { - let Authenticated { client, .. } = - app.authenticate(auth.username(), auth.password()).await?; + let Authenticated { + client, + task_config, + .. 
diff --git a/crates/dekaf/src/registry.rs b/crates/dekaf/src/registry.rs
index 45840e0897..aaf04f317c 100644
--- a/crates/dekaf/src/registry.rs
+++ b/crates/dekaf/src/registry.rs
@@ -73,8 +73,11 @@ async fn get_subject_latest(
     axum::extract::Path(subject): axum::extract::Path<String>,
 ) -> Response {
     wrap(async move {
-        let Authenticated { client, .. } =
-            app.authenticate(auth.username(), auth.password()).await?;
+        let Authenticated {
+            client,
+            task_config,
+            ..
+        } = app.authenticate(auth.username(), auth.password()).await?;
 
         let (is_key, collection) = if subject.ends_with("-value") {
             (false, &subject[..subject.len() - 6])
@@ -89,6 +92,7 @@ async fn get_subject_latest(
             &from_downstream_topic_name(TopicName::from(StrBytes::from_string(
                 collection.to_string(),
             ))),
+            task_config.deletions,
         )
         .await
         .context("failed to fetch collection metadata")?
diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs
index 960fc19ded..e9cc5bdbb3 100644
--- a/crates/dekaf/src/session.rs
+++ b/crates/dekaf/src/session.rs
@@ -46,6 +46,7 @@ pub struct Session {
     secret: String,
     auth: Option<Authenticated>,
     data_preview_state: SessionDataPreviewState,
+    alloc: bumpalo::Bump,
     pub client_id: Option<StrBytes>,
 }
@@ -57,6 +58,7 @@ impl Session {
             auth: None,
             secret,
             client_id: None,
+            alloc: Default::default(),
             data_preview_state: SessionDataPreviewState::Unknown,
         }
     }
@@ -176,12 +178,13 @@ impl Session {
         &mut self,
         requests: Vec<messages::metadata_request::MetadataRequestTopic>,
     ) -> anyhow::Result<Vec<messages::metadata_response::MetadataResponseTopic>> {
-        let client = self
+        let auth = self
             .auth
             .as_mut()
-            .ok_or(anyhow::anyhow!("Session not authenticated"))?
-            .authenticated_client()
-            .await?;
+            .ok_or(anyhow::anyhow!("Session not authenticated"))?;
+
+        let deletions = auth.task_config.deletions.to_owned();
+        let client = auth.authenticated_client().await?;
 
         // Concurrently fetch Collection instances for all requested topics.
         let collections: anyhow::Result<Vec<(TopicName, Option<Collection>)>> =
@@ -192,6 +195,7 @@ impl Session {
                 let maybe_collection = Collection::new(
                     client,
                     from_downstream_topic_name(topic.name.to_owned().unwrap_or_default())
                         .as_str(),
+                    deletions,
                 )
                 .await?;
                 Ok((topic.name.unwrap_or_default(), maybe_collection))
@@ -263,12 +267,13 @@ impl Session {
         &mut self,
         request: messages::ListOffsetsRequest,
     ) -> anyhow::Result<messages::ListOffsetsResponse> {
-        let client = self
+        let auth = self
            .auth
            .as_mut()
-            .ok_or(anyhow::anyhow!("Session not authenticated"))?
-            .authenticated_client()
-            .await?;
+            .ok_or(anyhow::anyhow!("Session not authenticated"))?;
+
+        let deletions = auth.task_config.deletions.to_owned();
+        let client = auth.authenticated_client().await?;
 
         // Concurrently fetch Collection instances and offsets for all requested topics and partitions.
         // Map each "topic" into Vec<(Partition Index, Option<PartitionOffset>)>.
@@ -277,6 +282,7 @@ impl Session {
                 let maybe_collection = Collection::new(
                     client,
                     from_downstream_topic_name(topic.name.clone()).as_str(),
+                    deletions,
                 )
                 .await?;
 
@@ -480,7 +486,8 @@ impl Session {
             _ => {}
         }
 
-        let Some(collection) = Collection::new(&client, &key.0).await? else {
+        let Some(collection) = Collection::new(&client, &key.0, config.deletions).await?
+        else {
             metrics::counter!(
                 "dekaf_fetch_requests",
                 "topic_name" => key.0.to_string(),
@@ -1038,13 +1045,13 @@ impl Session {
             .connect_to_group_coordinator(req.group_id.as_str())
             .await?;
 
-        let flow_client = self
+        let auth = self
             .auth
             .as_mut()
-            .ok_or(anyhow::anyhow!("Session not authenticated"))?
-            .authenticated_client()
-            .await?
-            .clone();
+            .ok_or(anyhow::anyhow!("Session not authenticated"))?;
+
+        let deletions = auth.task_config.deletions.to_owned();
+        let flow_client = auth.authenticated_client().await?.clone();
 
         client
             .ensure_topics(
@@ -1061,10 +1068,11 @@ impl Session {
         for topic in resp.topics.iter_mut() {
             topic.name = self.decrypt_topic_name(topic.name.to_owned());
 
-            let collection_partitions = Collection::new(&flow_client, topic.name.as_str())
-                .await?
-                .context(format!("unable to look up partitions for {:?}", topic.name))?
-                .partitions;
+            let collection_partitions =
+                Collection::new(&flow_client, topic.name.as_str(), deletions)
+                    .await?
+                    .context(format!("unable to look up partitions for {:?}", topic.name))?
+                    .partitions;
 
             for partition in &topic.partitions {
                 if let Some(error) = partition.error_code.err() {
@@ -1258,17 +1266,18 @@ impl Session {
         partition: i32,
         fetch_offset: i64,
     ) -> anyhow::Result<Option<PartitionOffset>> {
-        let client = self
+        let auth = self
             .auth
             .as_mut()
-            .ok_or(anyhow::anyhow!("Session not authenticated"))?
-            .authenticated_client()
-            .await?;
+            .ok_or(anyhow::anyhow!("Session not authenticated"))?;
+
+        let deletions = auth.task_config.deletions.to_owned();
+        let client = auth.authenticated_client().await?;
 
         tracing::debug!(
             "Loading latest offset for this partition to check if session is data-preview"
         );
-        let collection = Collection::new(&client, collection_name.as_str())
+        let collection = Collection::new(&client, collection_name.as_str(), deletions)
             .await?
             .ok_or(anyhow::anyhow!("Collection {} not found", collection_name))?;
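Aside (not part of the patch): every handler in session.rs gets the same mechanical refactor. The old code chained `.authenticated_client()` straight off the authenticated state; the handlers now also need `task_config.deletions` from that same state, so the `&mut` borrow is bound once, the deletion mode is read out, and only then is the client fetched. A toy, self-contained sketch of the pattern; all types here are simplified stand-ins for Dekaf's, not its real API:

    // Stand-ins for Dekaf's authenticated state and task config.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum DeletionMode {
        Kafka,
        Cdc,
    }

    struct TaskConfig {
        deletions: DeletionMode,
    }

    struct Auth {
        task_config: TaskConfig,
        refreshes: u32,
    }

    impl Auth {
        // Takes &mut self, like authenticated_client(), which may refresh a token.
        fn authenticated_client(&mut self) -> u32 {
            self.refreshes += 1;
            self.refreshes
        }
    }

    fn main() {
        let mut auth = Auth {
            task_config: TaskConfig { deletions: DeletionMode::Cdc },
            refreshes: 0,
        };

        // Bind the borrow once, copy the field out, then call the &mut method.
        let deletions = auth.task_config.deletions;
        let client = auth.authenticated_client();

        assert_eq!(deletions, DeletionMode::Cdc);
        assert_eq!(client, 1);
    }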
+ .context(format!("unable to look up partitions for {:?}", topic.name))? + .partitions; for partition in &topic.partitions { if let Some(error) = partition.error_code.err() { @@ -1258,17 +1266,18 @@ impl Session { partition: i32, fetch_offset: i64, ) -> anyhow::Result> { - let client = self + let auth = self .auth .as_mut() - .ok_or(anyhow::anyhow!("Session not authenticated"))? - .authenticated_client() - .await?; + .ok_or(anyhow::anyhow!("Session not authenticated"))?; + + let deletions = auth.task_config.deletions.to_owned(); + let client = auth.authenticated_client().await?; tracing::debug!( "Loading latest offset for this partition to check if session is data-preview" ); - let collection = Collection::new(&client, collection_name.as_str()) + let collection = Collection::new(&client, collection_name.as_str(), deletions) .await? .ok_or(anyhow::anyhow!("Collection {} not found", collection_name))?; diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 8b69c5d2ce..8948365af3 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -1,3 +1,4 @@ +use crate::connector::DeletionMode; use anyhow::Context; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use gazette::{broker, journal, uuid}; @@ -61,6 +62,7 @@ impl Collection { pub async fn new( client: &flow_client::Client, collection: &str, + deletion_mode: DeletionMode, ) -> anyhow::Result> { let not_before = uuid::Clock::default(); let pg_client = client.pg_client(); @@ -87,7 +89,16 @@ impl Collection { } else { &spec.read_schema_json }; - let (key_schema, value_schema) = avro::json_schema_to_avro(json_schema, &key_ptr)?; + + let json_schema = doc::validation::build_bundle(json_schema)?; + let validator = doc::Validator::new(json_schema)?; + let mut shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index()); + + if matches!(deletion_mode, DeletionMode::CDC) { + shape.widen(&serde_json::json!({"_meta":{"is_deleted":1}})); + } + + let (key_schema, value_schema) = avro::shape_to_avro(shape, &key_ptr); tracing::debug!( collection,