Skip to content

Commit

Permalink
dekaf: Refactor DeletionMode to emit an additional field `/_meta/is…
Browse files Browse the repository at this point in the history
…_deleted` for deletions

Also needed to add this field to the generated Avro schemas
  • Loading branch information
jshearer committed Oct 31, 2024
1 parent 777344a commit 34e8d07
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 58 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions crates/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,27 @@ pub enum Error {
ParseFloat(String, #[source] std::num::ParseFloatError),
}

/// Map a [`doc::Shape`] and key pointers into its equivalent AVRO schema.
pub fn shape_to_avro(
shape: doc::Shape,
key: &[doc::Pointer],
) -> (apache_avro::Schema, apache_avro::Schema) {
(
schema::key_to_avro(key, shape.clone()),
schema::shape_to_avro(json::Location::Root, shape, true),
)
}

/// Map a JSON schema bundle and key pointers into its equivalent AVRO schema.
pub fn json_schema_to_avro(
json_schema: &str,
schema: &str,
key: &[doc::Pointer],
) -> Result<(apache_avro::Schema, apache_avro::Schema), Error> {
let json_schema = doc::validation::build_bundle(json_schema)?;
let json_schema = doc::validation::build_bundle(schema)?;
let validator = doc::Validator::new(json_schema)?;
let shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());

Ok((
schema::key_to_avro(key, shape.clone()),
schema::shape_to_avro(json::Location::Root, shape, true),
))
Ok(shape_to_avro(shape, key))
}

/// Encode a document into a binary AVRO representation using the given schema.
Expand Down
3 changes: 2 additions & 1 deletion crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ ops = { path = "../ops" }
proto-flow = { path = "../proto-flow" }
proto-gazette = { path = "../proto-gazette" }
simd-doc = { path = "../simd-doc" }

anyhow = { workspace = true }
axum = { workspace = true }
axum-extra = { workspace = true }
axum-server = { workspace = true }
base64 = { workspace = true }
bumpalo = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
crypto-common = { workspace = true }
Expand All @@ -37,6 +37,7 @@ hex = { workspace = true }
hexdump = { workspace = true }
itertools = { workspace = true }
kafka-protocol = { workspace = true }
lazy_static = { workspace = true }
lz4_flex = { workspace = true }
md5 = { workspace = true }
metrics = { workspace = true }
Expand Down
12 changes: 9 additions & 3 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ use std::collections::BTreeMap;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
#[serde(rename_all = "snake_case")]
pub enum DeletionMode {
Default,
Header,
// Handles deletions using the regular Kafka upsert envelope, where a deletion
// is represented by a record containing the key that was deleted, and a null value.
Kafka,
// Handles deletions by passing through the full deletion document as it exists
// in the source collection, as well as including a new field `_meta/is_deleted`
// which is defined as the number `1` on deletions, and `0` otherwise.
#[serde(rename = "cdc")]
CDC,
}

impl Default for DeletionMode {
fn default() -> Self {
Self::Default
Self::Kafka
}
}

Expand Down
45 changes: 23 additions & 22 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use super::{Collection, Partition};
use crate::connector::DeletionMode;
use anyhow::bail;
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, BytesMut};
use doc::AsNode;
use doc::{heap::ArchivedNode, AsNode, HeapNode, OwnedArchivedNode};
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::{
protocol::StrBytes,
records::{Compression, TimestampType},
};
use lazy_static::lazy_static;
use lz4_flex::frame::BlockMode;

pub struct Read {
Expand All @@ -36,6 +37,7 @@ pub struct Read {
offset_start: i64,

deletes: DeletionMode,
alloc: bumpalo::Bump,

pub(crate) rewrite_offsets_from: Option<i64>,
}
Expand All @@ -55,9 +57,9 @@ pub enum ReadTarget {
Docs(usize),
}

const DELETION_HEADER: &str = "_is_deleted";
const DELETION_VAL_DELETED: &[u8] = &[1u8];
const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8];
lazy_static! {
static ref DELETION_INDICATOR_PTR: doc::Pointer = doc::Pointer::from_str("/_meta/is_deleted");
}

impl Read {
pub fn new(
Expand Down Expand Up @@ -103,6 +105,7 @@ impl Read {
journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
deletes,
alloc: Default::default(),
offset_start: offset,
}
}
Expand Down Expand Up @@ -267,12 +270,24 @@ impl Read {

// Encode the value.
let value =
if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Default)) {
if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Kafka)) {
None
} else {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;

if matches!(self.deletes, DeletionMode::CDC) {
let mut heap_node = HeapNode::from_node(root.get(), &self.alloc);
let foo = DELETION_INDICATOR_PTR
.create_heap_node(&mut heap_node, &self.alloc)
.context("Unable to add deletion meta indicator")?;

*foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 });

() = avro::encode(&mut tmp, &self.value_schema, &heap_node)?;
} else {
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;
}

record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
Expand Down Expand Up @@ -303,21 +318,7 @@ impl Read {

records.push(Record {
control: is_control,
headers: if matches!(self.deletes, DeletionMode::Header) {
let deletion_val = if is_deletion {
DELETION_VAL_DELETED
} else {
DELETION_VAL_NOT_DELETED
};
let mut headers = kafka_protocol::indexmap::IndexMap::new();
headers.insert(
StrBytes::from_static_str(DELETION_HEADER),
Some(bytes::Bytes::from_static(&deletion_val)),
);
headers
} else {
Default::default()
},
headers: Default::default(),
key,
offset: kafka_offset,
partition_leader_epoch: 1,
Expand Down
8 changes: 6 additions & 2 deletions crates/dekaf/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ async fn get_subject_latest(
axum::extract::Path(subject): axum::extract::Path<String>,
) -> Response {
wrap(async move {
let Authenticated { client, .. } =
app.authenticate(auth.username(), auth.password()).await?;
let Authenticated {
client,
task_config,
..
} = app.authenticate(auth.username(), auth.password()).await?;

let (is_key, collection) = if subject.ends_with("-value") {
(false, &subject[..subject.len() - 6])
Expand All @@ -89,6 +92,7 @@ async fn get_subject_latest(
&from_downstream_topic_name(TopicName::from(StrBytes::from_string(
collection.to_string(),
))),
task_config.deletions,
)
.await
.context("failed to fetch collection metadata")?
Expand Down
55 changes: 32 additions & 23 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct Session {
secret: String,
auth: Option<Authenticated>,
data_preview_state: SessionDataPreviewState,
alloc: bumpalo::Bump,
pub client_id: Option<String>,
}

Expand All @@ -57,6 +58,7 @@ impl Session {
auth: None,
secret,
client_id: None,
alloc: Default::default(),
data_preview_state: SessionDataPreviewState::Unknown,
}
}
Expand Down Expand Up @@ -176,12 +178,13 @@ impl Session {
&mut self,
requests: Vec<messages::metadata_request::MetadataRequestTopic>,
) -> anyhow::Result<Vec<MetadataResponseTopic>> {
let client = self
let auth = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?;
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

let deletions = auth.task_config.deletions.to_owned();
let client = auth.authenticated_client().await?;

// Concurrently fetch Collection instances for all requested topics.
let collections: anyhow::Result<Vec<(TopicName, Option<Collection>)>> =
Expand All @@ -192,6 +195,7 @@ impl Session {
client,
from_downstream_topic_name(topic.name.to_owned().unwrap_or_default())
.as_str(),
deletions,
)
.await?;
Ok((topic.name.unwrap_or_default(), maybe_collection))
Expand Down Expand Up @@ -263,12 +267,13 @@ impl Session {
&mut self,
request: messages::ListOffsetsRequest,
) -> anyhow::Result<messages::ListOffsetsResponse> {
let client = self
let auth = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?;
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

let deletions = auth.task_config.deletions.to_owned();
let client = auth.authenticated_client().await?;

// Concurrently fetch Collection instances and offsets for all requested topics and partitions.
// Map each "topic" into Vec<(Partition Index, Option<PartitionOffset>.
Expand All @@ -277,6 +282,7 @@ impl Session {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.clone()).as_str(),
deletions,
)
.await?;

Expand Down Expand Up @@ -480,7 +486,8 @@ impl Session {
_ => {}
}

let Some(collection) = Collection::new(&client, &key.0).await? else {
let Some(collection) = Collection::new(&client, &key.0, config.deletions).await?
else {
metrics::counter!(
"dekaf_fetch_requests",
"topic_name" => key.0.to_string(),
Expand Down Expand Up @@ -1038,13 +1045,13 @@ impl Session {
.connect_to_group_coordinator(req.group_id.as_str())
.await?;

let flow_client = self
let auth = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?
.clone();
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

let deletions = auth.task_config.deletions.to_owned();
let flow_client = auth.authenticated_client().await?.clone();

client
.ensure_topics(
Expand All @@ -1061,10 +1068,11 @@ impl Session {
for topic in resp.topics.iter_mut() {
topic.name = self.decrypt_topic_name(topic.name.to_owned());

let collection_partitions = Collection::new(&flow_client, topic.name.as_str())
.await?
.context(format!("unable to look up partitions for {:?}", topic.name))?
.partitions;
let collection_partitions =
Collection::new(&flow_client, topic.name.as_str(), deletions)
.await?
.context(format!("unable to look up partitions for {:?}", topic.name))?
.partitions;

for partition in &topic.partitions {
if let Some(error) = partition.error_code.err() {
Expand Down Expand Up @@ -1258,17 +1266,18 @@ impl Session {
partition: i32,
fetch_offset: i64,
) -> anyhow::Result<Option<PartitionOffset>> {
let client = self
let auth = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?;
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

let deletions = auth.task_config.deletions.to_owned();
let client = auth.authenticated_client().await?;

tracing::debug!(
"Loading latest offset for this partition to check if session is data-preview"
);
let collection = Collection::new(&client, collection_name.as_str())
let collection = Collection::new(&client, collection_name.as_str(), deletions)
.await?
.ok_or(anyhow::anyhow!("Collection {} not found", collection_name))?;

Expand Down
13 changes: 12 additions & 1 deletion crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::connector::DeletionMode;
use anyhow::Context;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use gazette::{broker, journal, uuid};
Expand Down Expand Up @@ -61,6 +62,7 @@ impl Collection {
pub async fn new(
client: &flow_client::Client,
collection: &str,
deletion_mode: DeletionMode,
) -> anyhow::Result<Option<Self>> {
let not_before = uuid::Clock::default();
let pg_client = client.pg_client();
Expand All @@ -87,7 +89,16 @@ impl Collection {
} else {
&spec.read_schema_json
};
let (key_schema, value_schema) = avro::json_schema_to_avro(json_schema, &key_ptr)?;

let json_schema = doc::validation::build_bundle(json_schema)?;
let validator = doc::Validator::new(json_schema)?;
let mut shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());

if matches!(deletion_mode, DeletionMode::CDC) {
shape.widen(&serde_json::json!({"_meta":{"is_deleted":1}}));
}

let (key_schema, value_schema) = avro::shape_to_avro(shape, &key_ptr);

tracing::debug!(
collection,
Expand Down

0 comments on commit 34e8d07

Please sign in to comment.