dekaf: Detect special-case of data preview UIs, and serve documents where normally no documents would get served

jshearer committed Oct 16, 2024
1 parent e6a78da commit 0077e34
Showing 5 changed files with 123 additions and 39 deletions.
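
In short: data-preview UIs (StarTree's and Tinybird's topic previews, per the comment in session.rs below) issue a ListOffsets request for a partition's latest offset and then immediately fetch at that offset, or a dozen documents behind it. Dekaf's offsets are journal byte offsets rather than dense Kafka record offsets, so such a fetch would normally block waiting for new data and serve nothing. This commit caches the (offset, fragment start) pairs reported via ListOffsets and, when a later fetch lands within 12 documents of the cached head, reads whole documents from the fragment start and relabels their offsets so the preview sees data. A condensed sketch of the detection rule (illustrative names, not code from this commit; the commit itself checks only `diff <= 12`):

    /// Returns Some((read_from, doc_count)) when a fetch looks like a data
    /// preview: `latest` is the (offset, fragment_start) pair cached when the
    /// client issued ListOffsets, and `fetch_offset` is what it now reads at.
    fn preview_read_window(latest: Option<(i64, i64)>, fetch_offset: i64) -> Option<(i64, usize)> {
        let (offset, fragment_start) = latest?;
        let diff = offset - fetch_offset;
        // Observed distances: StarTree fetches at the head (diff == 0),
        // Tinybird fetches twelve documents back (diff == 12).
        if (0..=12).contains(&diff) {
            // Read diff + 1 whole documents starting at the fragment boundary.
            Some((fragment_start, diff as usize + 1))
        } else {
            None
        }
    }
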
1 change: 1 addition & 0 deletions crates/dekaf/src/lib.rs
@@ -194,6 +194,7 @@ async fn handle_api(
             // https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823
             let (header, request) = dec_request(frame, version)?;
             tracing::debug!(client_id=?header.client_id, "Got client ID!");
+            session.client_id = header.client_id.clone().map(|id| id.to_string());
             Ok(enc_resp(out, &header, session.api_versions(request).await?))
         }
         ApiKey::SaslHandshakeKey => {
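
Note: the ApiVersions handler now also records the client's self-reported ID on the session. Nothing else in this diff reads `client_id` yet; it appears to be kept so that data-preview clients, which announce recognizable client IDs, can be identified or logged in follow-up work (an inference, not stated in the commit).
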
12 changes: 9 additions & 3 deletions crates/dekaf/src/main.rs
@@ -12,12 +12,16 @@ use futures::{FutureExt, TryStreamExt};
 use rsasl::config::SASLConfig;
 use rustls::pki_types::CertificateDer;
 use std::{
+    collections::HashMap,
     fs::File,
     io,
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use tokio::{
+    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
+    sync::RwLock,
+};
 use tracing_subscriber::{filter::LevelFilter, EnvFilter};
 use url::Url;
 
@@ -113,6 +117,8 @@ async fn main() -> anyhow::Result<()> {
     let cli = Cli::parse();
     tracing::info!("Starting dekaf");
 
+    let offset_map = Arc::new(RwLock::new(HashMap::new()));
+
     let (api_endpoint, api_key) = if cli.local {
         (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string())
     } else {
@@ -219,7 +225,7 @@
                     continue
                 };
 
-                tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
+                tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone()));
             }
             _ = &mut stop => break,
         }
@@ -240,7 +246,7 @@
             };
             socket.set_nodelay(true)?;
 
-            tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
+            tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone()));
         }
         _ = &mut stop => break,
     }
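
The offset map is created once at startup and a clone of the same `Arc` is handed to every `Session`, so offsets cached by one connection's ListOffsets handling are visible to fetches on other connections. Written out as a type alias for readability (an illustrative name; the commit inlines this type):

    use std::{collections::HashMap, sync::Arc};

    use kafka_protocol::messages::TopicName;
    use tokio::sync::RwLock;

    /// Keyed by (topic, partition); the value is the latest offset reported
    /// via ListOffsets plus the starting byte offset of its fragment.
    type SharedPartitionOffsets = Arc<RwLock<HashMap<(TopicName, i32), (i64, i64)>>>;
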
39 changes: 29 additions & 10 deletions crates/dekaf/src/read.rs
@@ -5,7 +5,7 @@ use doc::AsNode;
 use futures::StreamExt;
 use gazette::journal::{ReadJsonLine, ReadJsonLines};
 use gazette::{broker, journal, uuid};
-use kafka_protocol::records::Compression;
+use kafka_protocol::records::{Compression, TimestampType};
 use lz4_flex::frame::BlockMode;
 use std::time::{Duration, Instant};
 
@@ -39,6 +39,12 @@ pub enum BatchResult {
     TimeoutNoData,
 }
 
+#[derive(Copy, Clone)]
+pub enum ReadTarget {
+    Bytes(usize),
+    Docs(usize),
+}
+
 impl Read {
     pub fn new(
         client: journal::Client,
@@ -85,11 +91,12 @@ impl Read {
     #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))]
     pub async fn next_batch(
         mut self,
-        target_bytes: usize,
+        target: ReadTarget,
         timeout: Instant,
+        rewrite_offsets_from: Option<i64>,
     ) -> anyhow::Result<(Self, BatchResult)> {
         use kafka_protocol::records::{
-            Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
+            Compression, Record, RecordBatchEncoder, RecordEncodeOptions,
         };
 
         let mut records: Vec<Record> = Vec::new();
@@ -109,7 +116,10 @@
 
         let mut did_timeout = false;
 
-        while records_bytes < target_bytes {
+        while match target {
+            ReadTarget::Bytes(target_bytes) => records_bytes < target_bytes,
+            ReadTarget::Docs(target_docs) => records.len() < target_docs,
+        } {
             let read = match tokio::select! {
                 biased; // Attempt to read before yielding.
 
@@ -169,6 +179,8 @@
                 ReadJsonLine::Doc { root, next_offset } => (root, next_offset),
             };
 
+            let mut record_bytes: usize = 0;
+
             let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else {
                 let serialized_doc = root.get().to_debug_json_value();
                 anyhow::bail!(
@@ -215,14 +227,14 @@
                 buf.put_i16(9999);
                 // ControlMessageType: unused: i16
                 buf.put_i16(9999);
-                records_bytes += 4;
+                record_bytes += 4;
                 Some(buf.split().freeze())
             } else {
                 tmp.push(0);
                 tmp.extend(self.key_schema_id.to_be_bytes());
                 () = avro::encode_key(&mut tmp, &self.key_schema, root.get(), &self.key_ptr)?;
 
-                records_bytes += tmp.len();
+                record_bytes += tmp.len();
                 buf.extend_from_slice(&tmp);
                 tmp.clear();
                 Some(buf.split().freeze())
@@ -236,7 +248,7 @@
                 tmp.extend(self.value_schema_id.to_be_bytes());
                 () = avro::encode(&mut tmp, &self.value_schema, root.get())?;
 
-                records_bytes += tmp.len();
+                record_bytes += tmp.len();
                 buf.extend_from_slice(&tmp);
                 tmp.clear();
                 Some(buf.split().freeze())
@@ -257,7 +269,11 @@
             //
             // Note that sequence must increment at the same rate
             // as offset for efficient record batch packing.
-            let kafka_offset = next_offset - 1;
+            let kafka_offset = if let Some(rewrite_from) = rewrite_offsets_from {
+                rewrite_from + records.len() as i64
+            } else {
+                next_offset - 1
+            };
 
             records.push(Record {
                 control: is_control,
@@ -273,6 +289,7 @@
                 transactional: false,
                 value,
             });
+            records_bytes += record_bytes;
         }
 
         let opts = RecordEncodeOptions {
@@ -297,12 +314,14 @@
         metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned())
             .increment(records_bytes as u64);
 
+        let frozen = buf.freeze();
+
         Ok((
             self,
             match (records.len() > 0, did_timeout) {
                 (false, true) => BatchResult::TimeoutNoData,
-                (true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()),
-                (true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()),
+                (true, true) => BatchResult::TimeoutExceededBeforeTarget(frozen),
+                (true, false) => BatchResult::TargetExceededBeforeTimeout(frozen),
                 (false, false) => {
                     unreachable!("shouldn't be able see no documents, and also not timeout")
                 }
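
For context, a condensed sketch of how session.rs (below) drives the two modes of `next_batch` (the function and its parameters are illustrative, not part of the commit):

    use std::time::{Duration, Instant};

    use crate::read::{BatchResult, Read, ReadTarget};

    /// `diff` is Some only when a cached ListOffsets entry existed for this
    /// partition; names mirror the session.rs change below.
    async fn run_read(
        read: Read,
        diff: Option<i64>,
        fetch_offset: i64,
        partition_max_bytes: i32,
        timeout: Duration,
    ) -> anyhow::Result<(Read, BatchResult)> {
        match diff {
            // Preview path: read exactly diff + 1 documents and relabel their
            // offsets consecutively, starting at fetch_offset - 1.
            Some(diff) if diff <= 12 => {
                read.next_batch(
                    ReadTarget::Docs(diff as usize + 1),
                    Instant::now() + timeout,
                    Some(fetch_offset - 1),
                )
                .await
            }
            // Normal path: accumulate documents until the byte budget is met
            // or the timeout fires, keeping real journal offsets.
            _ => {
                read.next_batch(
                    ReadTarget::Bytes(partition_max_bytes as usize),
                    Instant::now() + timeout,
                    None,
                )
                .await
            }
        }
    }
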
98 changes: 78 additions & 20 deletions crates/dekaf/src/session.rs
@@ -22,6 +22,7 @@ use std::{
     time::{SystemTime, UNIX_EPOCH},
 };
 use std::{sync::Arc, time::Duration};
+use tokio::sync::RwLock;
 use tracing::instrument;
 
 struct PendingRead {
@@ -35,15 +36,23 @@ pub struct Session {
     reads: HashMap<(TopicName, i32), PendingRead>,
     secret: String,
     auth: Option<Authenticated>,
+    partition_offsets: Arc<RwLock<HashMap<(TopicName, i32), (i64, i64)>>>,
+    pub client_id: Option<String>,
 }
 
 impl Session {
-    pub fn new(app: Arc<App>, secret: String) -> Self {
+    pub fn new(
+        app: Arc<App>,
+        secret: String,
+        offsets: Arc<RwLock<HashMap<(TopicName, i32), (i64, i64)>>>,
+    ) -> Self {
         Self {
            app,
            reads: HashMap::new(),
            auth: None,
            secret,
+           partition_offsets: offsets,
+           client_id: None,
        }
    }
 
@@ -265,7 +274,7 @@ impl Session {
 
         // Concurrently fetch Collection instances and offsets for all requested topics and partitions.
         // Map each "topic" into Vec<(Partition Index, Option<(Journal Offset, Timestamp))>.
-        let collections: anyhow::Result<Vec<(TopicName, Vec<(i32, Option<(i64, i64)>)>)>> =
+        let collections: anyhow::Result<Vec<(TopicName, Vec<(i32, Option<(i64, i64, i64)>)>)>> =
             futures::future::try_join_all(request.topics.into_iter().map(|topic| async move {
                 let maybe_collection = Collection::new(
                     client,
@@ -311,19 +320,26 @@
             ListOffsetsPartitionResponse, ListOffsetsTopicResponse,
         };
 
+        let mut new_offsets = HashMap::new();
+
         // Map topics, partition indices, and fetched offsets into a comprehensive response.
         let response = collections
             .into_iter()
             .map(|(topic_name, offsets)| {
                 let partitions = offsets
                     .into_iter()
                     .map(|(partition_index, maybe_offset)| {
-                        let Some((offset, timestamp)) = maybe_offset else {
+                        let Some((offset, fragment_start, timestamp)) = maybe_offset else {
                             return ListOffsetsPartitionResponse::default()
                                 .with_partition_index(partition_index)
                                 .with_error_code(ResponseError::UnknownTopicOrPartition.code());
                         };
 
+                        new_offsets.insert(
+                            (topic_name.to_owned(), partition_index),
+                            (offset, fragment_start),
+                        );
+
                         ListOffsetsPartitionResponse::default()
                             .with_partition_index(partition_index)
                             .with_offset(offset)
@@ -337,6 +353,8 @@
             })
             .collect();
 
+        self.partition_offsets.write().await.extend(new_offsets);
+
         Ok(messages::ListOffsetsResponse::default().with_topics(response))
     }
 
@@ -363,8 +381,7 @@
             .authenticated_client()
             .await?;
 
-        let timeout_at =
-            std::time::Instant::now() + std::time::Duration::from_millis(max_wait_ms as u64);
+        let timeout = std::time::Duration::from_millis(max_wait_ms as u64);
 
         let mut hit_timeout = false;
 
@@ -373,6 +390,19 @@
             let mut key = (from_downstream_topic_name(topic_request.topic.clone()), 0);
 
             for partition_request in &topic_request.partitions {
+                let read_guard = self.partition_offsets.read().await;
+                let fetched_offset = read_guard
+                    .get(&(key.0.to_owned(), partition_request.partition))
+                    .copied();
+
+                drop(read_guard);
+
+                let diff = if let Some((offset, fragment_start)) = fetched_offset {
+                    Some((offset - partition_request.fetch_offset, fragment_start))
+                } else {
+                    None
+                };
+
                 key.1 = partition_request.partition;
 
                 let fetch_offset = partition_request.fetch_offset;
@@ -394,21 +424,44 @@
                 let (key_schema_id, value_schema_id) = collection
                     .registered_schema_ids(&client.pg_client())
                     .await?;
-
-                let read: Read = Read::new(
-                    collection.journal_client.clone(),
-                    &collection,
-                    partition,
-                    fetch_offset,
-                    key_schema_id,
-                    value_schema_id,
-                );
                 let pending = PendingRead {
                     offset: fetch_offset,
                     last_write_head: fetch_offset,
-                    handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
-                        read.next_batch(partition_request.partition_max_bytes as usize, timeout_at),
-                    )),
+                    handle: tokio_util::task::AbortOnDropHandle::new(match diff {
+                        // Startree: 0, Tinybird: 12
+                        Some((diff, fragment_start)) if diff <= 12 => tokio::spawn(
+                            Read::new(
+                                collection.journal_client.clone(),
+                                &collection,
+                                partition,
+                                fragment_start,
+                                key_schema_id,
+                                value_schema_id,
+                            )
+                            .next_batch(
+                                crate::read::ReadTarget::Docs(diff as usize + 1),
+                                std::time::Instant::now() + timeout,
+                                Some(partition_request.fetch_offset - 1),
+                            ),
+                        ),
+                        _ => tokio::spawn(
+                            Read::new(
+                                collection.journal_client.clone(),
+                                &collection,
+                                partition,
+                                fetch_offset,
+                                key_schema_id,
+                                value_schema_id,
+                            )
+                            .next_batch(
+                                crate::read::ReadTarget::Bytes(
+                                    partition_request.partition_max_bytes as usize,
+                                ),
+                                std::time::Instant::now() + timeout,
+                                None,
+                            ),
+                        ),
+                    }),
                 };
 
                 tracing::info!(
@@ -453,9 +506,14 @@
                 let (read, batch) = (&mut pending.handle).await??;
                 pending.offset = read.offset;
                 pending.last_write_head = read.last_write_head;
-                pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
-                    read.next_batch(partition_request.partition_max_bytes as usize, timeout_at),
-                ));
+                pending.handle =
+                    tokio_util::task::AbortOnDropHandle::new(tokio::spawn(read.next_batch(
+                        crate::read::ReadTarget::Bytes(
+                            partition_request.partition_max_bytes as usize,
+                        ),
+                        std::time::Instant::now() + timeout,
+                        None,
+                    )));
 
                 let (timeout, batch) = match batch {
                     BatchResult::TargetExceededBeforeTimeout(b) => (false, Some(b)),
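
A worked example of the heuristic with illustrative numbers: suppose ListOffsets reported latest offset 1,000 for a partition whose newest fragment begins at byte 800, so the session cached (1000, 800). Tinybird then fetches at offset 988, giving diff = 1000 - 988 = 12, which passes the `diff <= 12` guard. Instead of tailing from offset 988, Dekaf reads `ReadTarget::Docs(13)` starting at fragment offset 800 and relabels the records' offsets to run from 987 through 999, so the preview receives thirteen documents ending just below the reported head. A fetch with diff > 12, or with no cached ListOffsets entry, takes the unchanged `ReadTarget::Bytes` path.
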
12 changes: 6 additions & 6 deletions crates/dekaf/src/topology.rs
@@ -185,7 +185,7 @@ impl Collection {
         &self,
         partition_index: usize,
         timestamp_millis: i64,
-    ) -> anyhow::Result<Option<(i64, i64)>> {
+    ) -> anyhow::Result<Option<(i64, i64, i64)>> {
         let Some(partition) = self.partitions.get(partition_index) else {
             return Ok(None);
         };
@@ -212,18 +212,18 @@
         };
         let response = self.journal_client.list_fragments(request).await?;
 
-        let (offset, mod_time) = match response.fragments.get(0) {
+        let (offset, fragment_start, mod_time) = match response.fragments.get(0) {
             Some(broker::fragments_response::Fragment {
                 spec: Some(spec), ..
             }) => {
                 if timestamp_millis == -1 {
                     // Subtract one to reflect the largest fetch-able offset of the fragment.
-                    (spec.end - 1, spec.mod_time)
+                    (spec.end - 1, spec.begin, spec.mod_time)
                 } else {
-                    (spec.begin, spec.mod_time)
+                    (spec.begin, spec.begin, spec.mod_time)
                 }
             }
-            _ => (0, 0),
+            _ => (0, 0, 0),
         };
 
         tracing::debug!(
@@ -235,7 +235,7 @@
             "fetched offset"
         );
 
-        Ok(Some((offset, mod_time)))
+        Ok(Some((offset, fragment_start, mod_time)))
     }
 
     /// Build a journal client by resolving the collections data-plane gateway and an access token.
/// Build a journal client by resolving the collections data-plane gateway and an access token.
Expand Down
