dekaf: Bump kafka-protocol to 0.13.0 #1740

Closed
wants to merge 4 commits
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 2 additions & 4 deletions Cargo.toml
@@ -59,7 +59,7 @@ flate2 = "1.0"
 futures = "0.3"
 futures-core = "0.3"
 futures-util = "0.3"
-fxhash = "0.2" # Used in `json` crate. Replace with xxhash.
+fxhash = "0.2" # Used in `json` crate. Replace with xxhash.
 hex = "0.4.3"
 hexdump = "0.1"
 humantime = "2.1"
@@ -72,9 +72,7 @@ jemalloc-ctl = "0.3"
 json-patch = "0.3"
 jsonwebtoken = { version = "9", default-features = false }
 js-sys = "0.3.60"
-# TODO(jshearer): Swap back after 0.13.0 is released, which includes
-# https://github.com/tychedelia/kafka-protocol-rs/pull/81
-kafka-protocol = { git = "https://github.com/tychedelia/kafka-protocol-rs.git", rev = "cabe835" }
+kafka-protocol = "0.13.0"
 lazy_static = "1.4"
 libc = "0.2"
 librocksdb-sys = { version = "0.16.0", default-features = false, features = [
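Why a version bump needs code changes at all: kafka-protocol 0.13.0 (which ships the change the git pin above was waiting on) moves its generated request/response collections from keyed IndexMaps to plain Vecs whose elements carry their own identifiers, so keyed lookups in the hunks below become linear scans. A minimal sketch of the pattern, using a stand-in struct rather than the real generated types:

    // Stand-in for a generated response type; in kafka-protocol 0.13.0 the
    // broker list is a Vec and each entry embeds its own id.
    struct Broker {
        node_id: i32,
        host: String,
    }

    // Pre-0.13.0 this was roughly `brokers.get(&controller_id)` on an IndexMap;
    // now we scan the Vec for the entry whose embedded id matches.
    fn find_controller(brokers: &[Broker], controller_id: i32) -> Option<&Broker> {
        brokers.iter().find(|b| b.node_id == controller_id)
    }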
28 changes: 16 additions & 12 deletions crates/dekaf/src/api_client.rs
@@ -513,7 +513,8 @@ impl KafkaApiClient {

         let controller = resp
             .brokers
-            .get(&resp.controller_id)
+            .iter()
+            .find(|broker| broker.node_id == resp.controller_id)
             .context("Failed to find controller")?;

         let controller_url = format!("tcp://{}:{}", controller.host.to_string(), controller.port);
@@ -529,7 +530,8 @@ impl KafkaApiClient {
         let version = self
             .versions
             .api_keys
-            .get(&api_key)
+            .iter()
+            .find(|version| version.api_key == api_key)
             .context(format!("Unknown API key {api_key}"))?;

         Ok(version.to_owned())
@@ -553,18 +555,20 @@ impl KafkaApiClient {
         let resp = coord.send_request(req, None).await?;
         tracing::debug!(metadata=?resp, "Got metadata response");

-        if resp
-            .topics
-            .iter()
-            .all(|(name, topic)| topic_names.contains(&name) && topic.error_code == 0)
-        {
+        if resp.topics.iter().all(|topic| {
+            topic
+                .name
+                .as_ref()
+                .map(|topic_name| topic_names.contains(topic_name) && topic.error_code == 0)
+                .unwrap_or(false)
+        }) {
             return Ok(());
         } else {
-            let mut topics_map = kafka_protocol::indexmap::IndexMap::new();
+            let mut topics_map = vec![];
             for topic_name in topic_names.into_iter() {
-                topics_map.insert(
-                    topic_name,
+                topics_map.push(
                     messages::create_topics_request::CreatableTopic::default()
+                        .with_name(topic_name)
                         .with_replication_factor(2)
                         .with_num_partitions(-1),
                 );
@@ -573,11 +577,11 @@ impl KafkaApiClient {
             let create_resp = coord.send_request(create_req, None).await?;
             tracing::debug!(create_response=?create_resp, "Got create response");

-            for (name, topic) in create_resp.topics {
+            for topic in create_resp.topics {
                 if topic.error_code > 0 {
                     let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
                     tracing::warn!(
-                        topic = name.to_string(),
+                        topic = topic.name.to_string(),
                         error = ?err,
                         message = topic.error_message.map(|m|m.to_string()),
                         "Failed to create topic"
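The CreateTopics hunks follow the same migration: the request's topics collection is now a Vec of entries that name themselves. A hedged sketch of building such a request under 0.13.0; `with_topics` is an assumption here, mirroring the generated `with_*` setters the diff already uses on CreatableTopic:

    use kafka_protocol::messages::{self, TopicName};

    fn build_create_topics(names: Vec<TopicName>) -> messages::CreateTopicsRequest {
        let topics = names
            .into_iter()
            .map(|name| {
                messages::create_topics_request::CreatableTopic::default()
                    .with_name(name) // 0.13.0: the topic names itself; no map key
                    .with_replication_factor(2)
                    .with_num_partitions(-1) // -1 defers to the broker default
            })
            .collect();
        // Assumed setter, by analogy with the with_* calls above.
        messages::CreateTopicsRequest::default().with_topics(topics)
    }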
18 changes: 18 additions & 0 deletions crates/dekaf/src/connector.rs
@@ -4,6 +4,19 @@ use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;

+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
+#[serde(rename_all = "snake_case")]
+pub enum DeletionMode {
+    Default,
+    Header,
+}
+
+impl Default for DeletionMode {
+    fn default() -> Self {
+        Self::Default
+    }
+}
+
 /// Configures the behavior of a whole dekaf task
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
 pub struct DekafConfig {
@@ -15,6 +28,11 @@ pub struct DekafConfig {
     // #[schemars(extend("secret" = true))]
     #[schemars(schema_with = "token_secret")]
     pub token: String,
+    /// How to handle deletion events. "Default" emits them as regular Kafka
+    /// tombstones with null values, and "Header" emits them as a Kafka document
+    /// with an empty string and an `_is_deleted` header set to `1`. Setting this
+    /// mode also gives all non-deletion documents an `_is_deleted` header of `0`.
+    pub deletions: DeletionMode,
 }

 /// Configures a particular binding in a Dekaf-type materialization
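A quick note on the serde shape of DeletionMode: with `rename_all = "snake_case"`, the variants appear on the wire as "default" and "header", and the manual Default impl is what backs the `#[serde(default)]` attribute used in lib.rs below. A sketch of the expected behavior, assuming serde_json is available in the test context:

    #[test]
    fn deletion_mode_serde_round_trip() {
        // "header" (snake_case) maps to DeletionMode::Header...
        let mode: DeletionMode = serde_json::from_str(r#""header""#).unwrap();
        assert_eq!(mode, DeletionMode::Header);

        // ...and an absent field combined with #[serde(default)] (as on
        // DeprecatedConfigOptions in lib.rs) falls back to DeletionMode::Default.
        assert_eq!(DeletionMode::default(), DeletionMode::Default);
    }

On Rust 1.62+, the manual impl could also be written as `#[derive(Default)]` with `#[default]` on the Default variant; the explicit impl works on any toolchain.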
12 changes: 9 additions & 3 deletions crates/dekaf/src/lib.rs
@@ -23,7 +23,7 @@ mod api_client;
 pub use api_client::KafkaApiClient;

 use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
-use connector::DekafConfig;
+use connector::{DekafConfig, DeletionMode};
 use flow_client::client::{refresh_authorizations, RefreshToken};
 use percent_encoding::{percent_decode_str, utf8_percent_encode};
 use serde::{Deserialize, Serialize};
@@ -42,10 +42,13 @@ pub struct App {
     pub client_base: flow_client::Client,
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Copy)]
 #[serde(deny_unknown_fields)]
 pub struct DeprecatedConfigOptions {
     #[serde(default = "bool::<false>")]
     pub strict_topic_names: bool,
+    #[serde(default)]
+    pub deletions: DeletionMode,
 }

 pub struct Authenticated {
@@ -103,7 +106,9 @@ impl App {

         let claims = flow_client::client::client_claims(&client)?;

-        if models::Materialization::regex().is_match(username.as_ref()) {
+        if models::Materialization::regex().is_match(username.as_ref())
+            && !username.starts_with("{")
+        {
             Ok(Authenticated {
                 client,
                 access_token: access,
@@ -119,6 +124,7 @@ impl App {
                 client,
                 task_config: DekafConfig {
                     strict_topic_names: config.strict_topic_names,
+                    deletions: config.deletions,
                     token: "".to_string(),
                 },
                 access_token: access,
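The new `!username.starts_with("{")` guard exists because legacy task configs arrive as a JSON object in the username field, and such a string could conceivably also match the materialization-name regex. A stand-in sketch of the dispatch it implements (function name and sample values are illustrative only):

    // A username routes to the task-name auth path only when it looks like a
    // materialization name and is not a legacy JSON-object config.
    fn takes_task_name_path(username: &str, matches_name_regex: bool) -> bool {
        matches_name_regex && !username.starts_with('{')
    }

    fn main() {
        assert!(takes_task_name_path("acmeCo/my-materialization", true));
        // Legacy configs are JSON objects, so they fall through to the
        // DeprecatedConfigOptions path even if the regex happens to match.
        assert!(!takes_task_name_path(r#"{"strict_topic_names": true}"#, true));
    }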
61 changes: 43 additions & 18 deletions crates/dekaf/src/read.rs
@@ -1,11 +1,15 @@
 use super::{Collection, Partition};
+use crate::connector::DeletionMode;
 use anyhow::bail;
 use bytes::{Buf, BufMut, BytesMut};
 use doc::AsNode;
 use futures::StreamExt;
 use gazette::journal::{ReadJsonLine, ReadJsonLines};
 use gazette::{broker, journal, uuid};
-use kafka_protocol::records::{Compression, TimestampType};
+use kafka_protocol::{
+    protocol::StrBytes,
+    records::{Compression, TimestampType},
+};
 use lz4_flex::frame::BlockMode;

 pub struct Read {
@@ -31,6 +35,8 @@ pub struct Read {
     // Offset before which no documents should be emitted
     offset_start: i64,

+    deletes: DeletionMode,
+
     pub(crate) rewrite_offsets_from: Option<i64>,
 }

@@ -49,7 +55,9 @@ pub enum ReadTarget {
     Docs(usize),
 }

-const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document max size
+const DELETION_HEADER: &str = "_is_deleted";
+const DELETION_VAL_DELETED: &[u8] = &[1u8];
+const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8];

 impl Read {
     pub fn new(
@@ -60,13 +68,13 @@ impl Read {
         key_schema_id: u32,
         value_schema_id: u32,
         rewrite_offsets_from: Option<i64>,
+        deletes: DeletionMode,
     ) -> Self {
         let (not_before_sec, _) = collection.not_before.to_unix();

         let stream = client.clone().read_json_lines(
             broker::ReadRequest {
-                // Start reading at least 1 document in the past
-                offset: std::cmp::max(0, offset - OFFSET_READBACK),
+                offset,
                 block: true,
                 journal: partition.spec.name.clone(),
                 begin_mod_time: not_before_sec as i64,
@@ -94,6 +102,7 @@ impl Read {

             journal_name: partition.spec.name.clone(),
             rewrite_offsets_from,
+            deletes,
             offset_start: offset,
         }
     }
@@ -257,18 +266,19 @@ impl Read {
         };

         // Encode the value.
-        let value = if is_control || is_deletion {
-            None
-        } else {
-            tmp.push(0);
-            tmp.extend(self.value_schema_id.to_be_bytes());
-            () = avro::encode(&mut tmp, &self.value_schema, root.get())?;
-
-            record_bytes += tmp.len();
-            buf.extend_from_slice(&tmp);
-            tmp.clear();
-            Some(buf.split().freeze())
-        };
+        let value =
+            if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Default)) {
+                None
+            } else {
+                tmp.push(0);
+                tmp.extend(self.value_schema_id.to_be_bytes());
+                () = avro::encode(&mut tmp, &self.value_schema, root.get())?;
+
+                record_bytes += tmp.len();
+                buf.extend_from_slice(&tmp);
+                tmp.clear();
+                Some(buf.split().freeze())
+            };

         self.offset = next_offset;
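For reference, the `tmp.push(0)` plus `to_be_bytes` sequence in the hunk above is Confluent-style schema-registry framing: one zero magic byte, the schema id as four big-endian bytes, then the Avro-encoded body. A standalone sketch of the same layout (names are illustrative; the real code writes into a reused buffer):

    // Frame an already-encoded Avro body the way the branch above does.
    fn frame_value(schema_id: u32, avro_body: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(1 + 4 + avro_body.len());
        out.push(0); // magic byte
        out.extend_from_slice(&schema_id.to_be_bytes()); // 4-byte big-endian id
        out.extend_from_slice(avro_body);
        out
    }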

@@ -293,7 +303,21 @@

             records.push(Record {
                 control: is_control,
-                headers: Default::default(),
+                headers: if matches!(self.deletes, DeletionMode::Header) {
+                    let deletion_val = if is_deletion {
+                        DELETION_VAL_DELETED
+                    } else {
+                        DELETION_VAL_NOT_DELETED
+                    };
+                    let mut headers = kafka_protocol::indexmap::IndexMap::new();
+                    headers.insert(
+                        StrBytes::from_static_str(DELETION_HEADER),
+                        Some(bytes::Bytes::from_static(&deletion_val)),
+                    );
+                    headers
+                } else {
+                    Default::default()
+                },
                 key,
                 offset: kafka_offset,
                 partition_leader_epoch: 1,
@@ -322,7 +346,8 @@ impl Read {
                 last_write_head = self.last_write_head,
                 ratio = buf.len() as f64 / (records_bytes + 1) as f64,
                 records_bytes,
-                "returning records"
+                did_timeout,
+                "batch complete"
             );

         metrics::counter!("dekaf_documents_read", "journal_name" => self.journal_name.to_owned())
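On the consuming side, a reader running against a task with deletions set to "header" can recover deletion-ness from the `_is_deleted` header instead of a null value. A sketch with stand-in types, since each Kafka client exposes headers through its own record API:

    // Decide whether a record is a deletion under DeletionMode::Header.
    // Headers are modeled as (name, optional value) pairs for illustration.
    fn is_deletion(headers: &[(String, Option<Vec<u8>>)]) -> bool {
        headers
            .iter()
            .any(|(name, value)| name == "_is_deleted" && value.as_deref() == Some(&[1u8][..]))
    }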