From cc3b486263c30c8ddda836901df57d860ae4bdff Mon Sep 17 00:00:00 2001
From: Joseph Shearer
Date: Fri, 25 Oct 2024 11:19:25 -0400
Subject: [PATCH 1/4] dekaf: Implement `DeletionMode` to allow representing
 deletions as a Kafka header instead of a tombstone

---
 crates/dekaf/src/connector.rs | 18 ++++++++++++
 crates/dekaf/src/lib.rs       | 12 ++++++--
 crates/dekaf/src/read.rs      | 54 ++++++++++++++++++++++++++---------
 crates/dekaf/src/session.rs   | 20 ++++++++-----
 4 files changed, 80 insertions(+), 24 deletions(-)

diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs
index 253bdbb2f9..4edcf44849 100644
--- a/crates/dekaf/src/connector.rs
+++ b/crates/dekaf/src/connector.rs
@@ -4,6 +4,19 @@ use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
+#[serde(rename_all = "snake_case")]
+pub enum DeletionMode {
+    Default,
+    Header,
+}
+
+impl Default for DeletionMode {
+    fn default() -> Self {
+        Self::Default
+    }
+}
+
 /// Configures the behavior of a whole dekaf task
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
 pub struct DekafConfig {
@@ -15,6 +28,11 @@ pub struct DekafConfig {
     // #[schemars(extend("secret" = true))]
     #[schemars(schema_with = "token_secret")]
     pub token: String,
+    /// How to handle deletion events. "Default" emits them as regular Kafka
+    /// tombstones with null values, and "Header" emits them as a Kafka document
+    /// with an empty string and an `_is_deleted` header set to `1`. Setting this value
+    /// will also cause all non-deletion documents to have an `_is_deleted` header of `0`.
+    pub deletions: DeletionMode,
 }
 
 /// Configures a particular binding in a Dekaf-type materialization
diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs
index 3648532a93..069e7c4bc9 100644
--- a/crates/dekaf/src/lib.rs
+++ b/crates/dekaf/src/lib.rs
@@ -23,7 +23,7 @@ mod api_client;
 pub use api_client::KafkaApiClient;
 
 use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
-use connector::DekafConfig;
+use connector::{DekafConfig, DeletionMode};
 use flow_client::client::{refresh_authorizations, RefreshToken};
 use percent_encoding::{percent_decode_str, utf8_percent_encode};
 use serde::{Deserialize, Serialize};
@@ -42,10 +42,13 @@ pub struct App {
     pub client_base: flow_client::Client,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Copy)]
+#[serde(deny_unknown_fields)]
 pub struct DeprecatedConfigOptions {
     #[serde(default = "bool::")]
     pub strict_topic_names: bool,
+    #[serde(default)]
+    pub deletions: DeletionMode,
 }
 
 pub struct Authenticated {
@@ -103,7 +106,9 @@ impl App {
 
         let claims = flow_client::client::client_claims(&client)?;
 
-        if models::Materialization::regex().is_match(username.as_ref()) {
+        if models::Materialization::regex().is_match(username.as_ref())
+            && !username.starts_with("{")
+        {
             Ok(Authenticated {
                 client,
                 access_token: access,
@@ -119,6 +124,7 @@ impl App {
                 client,
                 task_config: DekafConfig {
                     strict_topic_names: config.strict_topic_names,
+                    deletions: config.deletions,
                     token: "".to_string(),
                 },
                 access_token: access,
diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs
index 4b676bf44d..14f0a67891 100644
--- a/crates/dekaf/src/read.rs
+++ b/crates/dekaf/src/read.rs
@@ -1,11 +1,15 @@
 use super::{Collection, Partition};
+use crate::connector::DeletionMode;
 use anyhow::bail;
 use bytes::{Buf, BufMut, BytesMut};
 use doc::AsNode;
 use futures::StreamExt;
 use 
gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; -use kafka_protocol::records::{Compression, TimestampType}; +use kafka_protocol::{ + protocol::StrBytes, + records::{Compression, TimestampType}, +}; use lz4_flex::frame::BlockMode; pub struct Read { @@ -31,6 +35,8 @@ pub struct Read { // Offset before which no documents should be emitted offset_start: i64, + deletes: DeletionMode, + pub(crate) rewrite_offsets_from: Option, } @@ -50,6 +56,9 @@ pub enum ReadTarget { } const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document max size +const DELETION_HEADER: &str = "_is_deleted"; +const DELETION_VAL_DELETED: &[u8] = &[1u8]; +const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8]; impl Read { pub fn new( @@ -60,6 +69,7 @@ impl Read { key_schema_id: u32, value_schema_id: u32, rewrite_offsets_from: Option, + deletes: DeletionMode, ) -> Self { let (not_before_sec, _) = collection.not_before.to_unix(); @@ -94,6 +104,7 @@ impl Read { journal_name: partition.spec.name.clone(), rewrite_offsets_from, + deletes, offset_start: offset, } } @@ -257,18 +268,19 @@ impl Read { }; // Encode the value. - let value = if is_control || is_deletion { - None - } else { - tmp.push(0); - tmp.extend(self.value_schema_id.to_be_bytes()); - () = avro::encode(&mut tmp, &self.value_schema, root.get())?; - - record_bytes += tmp.len(); - buf.extend_from_slice(&tmp); - tmp.clear(); - Some(buf.split().freeze()) - }; + let value = + if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Default)) { + None + } else { + tmp.push(0); + tmp.extend(self.value_schema_id.to_be_bytes()); + () = avro::encode(&mut tmp, &self.value_schema, root.get())?; + + record_bytes += tmp.len(); + buf.extend_from_slice(&tmp); + tmp.clear(); + Some(buf.split().freeze()) + }; self.offset = next_offset; @@ -293,7 +305,21 @@ impl Read { records.push(Record { control: is_control, - headers: Default::default(), + headers: if matches!(self.deletes, DeletionMode::Header) { + let deletion_val = if is_deletion { + DELETION_VAL_DELETED + } else { + DELETION_VAL_NOT_DELETED + }; + let mut headers = kafka_protocol::indexmap::IndexMap::new(); + headers.insert( + StrBytes::from_static_str(DELETION_HEADER), + Some(bytes::Bytes::from_static(&deletion_val)), + ); + headers + } else { + Default::default() + }, key, offset: kafka_offset, partition_leader_epoch: 1, diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 9d6e816560..e83512726d 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -380,13 +380,17 @@ impl Session { .. } = request; - let mut client = self - .auth - .as_mut() - .ok_or(anyhow::anyhow!("Session not authenticated"))? - .authenticated_client() - .await? 
- .clone(); + let (mut client, config) = { + let auth = self + .auth + .as_mut() + .ok_or(anyhow::anyhow!("Session not authenticated"))?; + + ( + auth.authenticated_client().await?.clone(), + auth.task_config.to_owned(), + ) + }; let timeout = std::time::Duration::from_millis(max_wait_ms as u64); @@ -537,6 +541,7 @@ impl Session { key_schema_id, value_schema_id, Some(partition_request.fetch_offset - 1), + config.deletions, ) .next_batch( // Have to read at least 2 docs, as the very last doc @@ -564,6 +569,7 @@ impl Session { key_schema_id, value_schema_id, None, + config.deletions, ) .next_batch( crate::read::ReadTarget::Bytes( From cb3fd0327d16bd1769c02ad8bc26f235470a9b97 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 25 Oct 2024 14:32:17 -0400 Subject: [PATCH 2/4] dekaf: Also update to disable read-back behavior when rewriting offsets --- crates/dekaf/src/read.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 14f0a67891..21042ce174 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -76,7 +76,11 @@ impl Read { let stream = client.clone().read_json_lines( broker::ReadRequest { // Start reading at least 1 document in the past - offset: std::cmp::max(0, offset - OFFSET_READBACK), + offset: if rewrite_offsets_from.is_some() { + offset + } else { + std::cmp::max(0, offset - OFFSET_READBACK) + }, block: true, journal: partition.spec.name.clone(), begin_mod_time: not_before_sec as i64, From e49688a21d04482ba60e94eaf8dd2d280cf03f4f Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 28 Oct 2024 11:41:38 -0400 Subject: [PATCH 3/4] dekaf: Bump `kafka-protocol` to `0.13.0` and upgrade the Fetch API to support flexible versions. I _believe_ this should fix the warning about `Message at offset xx might be too large to fetch, try increasing receive.message.max.bytes`. --- Cargo.lock | 5 +- Cargo.toml | 6 +- crates/dekaf/src/api_client.rs | 28 +++-- crates/dekaf/src/session.rs | 195 +++++++++++---------------------- 4 files changed, 86 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f39624b317..c42fb59e55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3364,8 +3364,9 @@ dependencies = [ [[package]] name = "kafka-protocol" -version = "0.12.0" -source = "git+https://github.com/tychedelia/kafka-protocol-rs.git?rev=cabe835#cabe835a9cc87fe8ea64649ff599d96a61e1fb66" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index c1b96f0b35..9a37f9d18b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ flate2 = "1.0" futures = "0.3" futures-core = "0.3" futures-util = "0.3" -fxhash = "0.2" # Used in `json` crate. Replace with xxhash. +fxhash = "0.2" # Used in `json` crate. Replace with xxhash. 
hex = "0.4.3" hexdump = "0.1" humantime = "2.1" @@ -72,9 +72,7 @@ jemalloc-ctl = "0.3" json-patch = "0.3" jsonwebtoken = { version = "9", default-features = false } js-sys = "0.3.60" -# TODO(jshearer): Swap back after 0.13.0 is released, which includes -# https://github.com/tychedelia/kafka-protocol-rs/pull/81 -kafka-protocol = { git = "https://github.com/tychedelia/kafka-protocol-rs.git", rev = "cabe835" } +kafka-protocol = "0.13.0" lazy_static = "1.4" libc = "0.2" librocksdb-sys = { version = "0.16.0", default-features = false, features = [ diff --git a/crates/dekaf/src/api_client.rs b/crates/dekaf/src/api_client.rs index 17914fd12d..c8b13c37c5 100644 --- a/crates/dekaf/src/api_client.rs +++ b/crates/dekaf/src/api_client.rs @@ -513,7 +513,8 @@ impl KafkaApiClient { let controller = resp .brokers - .get(&resp.controller_id) + .iter() + .find(|broker| broker.node_id == resp.controller_id) .context("Failed to find controller")?; let controller_url = format!("tcp://{}:{}", controller.host.to_string(), controller.port); @@ -529,7 +530,8 @@ impl KafkaApiClient { let version = self .versions .api_keys - .get(&api_key) + .iter() + .find(|version| version.api_key == api_key) .context(format!("Unknown API key {api_key}"))?; Ok(version.to_owned()) @@ -553,18 +555,20 @@ impl KafkaApiClient { let resp = coord.send_request(req, None).await?; tracing::debug!(metadata=?resp, "Got metadata response"); - if resp - .topics - .iter() - .all(|(name, topic)| topic_names.contains(&name) && topic.error_code == 0) - { + if resp.topics.iter().all(|topic| { + topic + .name + .as_ref() + .map(|topic_name| topic_names.contains(topic_name) && topic.error_code == 0) + .unwrap_or(false) + }) { return Ok(()); } else { - let mut topics_map = kafka_protocol::indexmap::IndexMap::new(); + let mut topics_map = vec![]; for topic_name in topic_names.into_iter() { - topics_map.insert( - topic_name, + topics_map.push( messages::create_topics_request::CreatableTopic::default() + .with_name(topic_name) .with_replication_factor(2) .with_num_partitions(-1), ); @@ -573,11 +577,11 @@ impl KafkaApiClient { let create_resp = coord.send_request(create_req, None).await?; tracing::debug!(create_response=?create_resp, "Got create response"); - for (name, topic) in create_resp.topics { + for topic in create_resp.topics { if topic.error_code > 0 { let err = kafka_protocol::ResponseError::try_from_code(topic.error_code); tracing::warn!( - topic = name.to_string(), + topic = topic.name.to_string(), error = ?err, message = topic.error_message.map(|m|m.to_string()), "Failed to create topic" diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index e83512726d..960fc19ded 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -10,10 +10,11 @@ use anyhow::{bail, Context}; use bytes::{BufMut, Bytes, BytesMut}; use kafka_protocol::{ error::{ParseResponseErrorCode, ResponseError}, - indexmap::IndexMap, messages::{ self, - metadata_response::{MetadataResponsePartition, MetadataResponseTopic}, + metadata_response::{ + MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic, + }, ConsumerProtocolAssignment, ConsumerProtocolSubscription, ListGroupsResponse, RequestHeader, TopicName, }, @@ -128,13 +129,10 @@ impl Session { }?; // We only ever advertise a single logical broker. 
- let mut brokers = kafka_protocol::indexmap::IndexMap::new(); - brokers.insert( - messages::BrokerId(1), - messages::metadata_response::MetadataResponseBroker::default() - .with_host(StrBytes::from_string(self.app.advertise_host.clone())) - .with_port(self.app.advertise_kafka_port as i32), - ); + let brokers = vec![MetadataResponseBroker::default() + .with_node_id(messages::BrokerId(1)) + .with_host(StrBytes::from_string(self.app.advertise_host.clone())) + .with_port(self.app.advertise_kafka_port as i32)]; Ok(messages::MetadataResponse::default() .with_brokers(brokers) @@ -144,9 +142,7 @@ impl Session { } // Lists all read-able collections as Kafka topics. Omits partition metadata. - async fn metadata_all_topics( - &mut self, - ) -> anyhow::Result> { + async fn metadata_all_topics(&mut self) -> anyhow::Result> { let collections = fetch_all_collection_names( &self .auth @@ -163,14 +159,12 @@ impl Session { let topics = collections .into_iter() .map(|name| { - ( - self.encode_topic_name(name), - MetadataResponseTopic::default() - .with_is_internal(false) - .with_partitions(vec![MetadataResponsePartition::default() - .with_partition_index(0) - .with_leader_id(0.into())]), - ) + MetadataResponseTopic::default() + .with_name(Some(self.encode_topic_name(name))) + .with_is_internal(false) + .with_partitions(vec![MetadataResponsePartition::default() + .with_partition_index(0) + .with_leader_id(0.into())]) }) .collect(); @@ -181,7 +175,7 @@ impl Session { async fn metadata_select_topics( &mut self, requests: Vec, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let client = self .auth .as_mut() @@ -206,13 +200,13 @@ impl Session { .await .map_err(|e| anyhow::anyhow!("Timed out loading metadata {e}"))?; - let mut topics = IndexMap::new(); + let mut topics = vec![]; for (name, maybe_collection) in collections? 
{ let Some(collection) = maybe_collection else { - topics.insert( - self.encode_topic_name(name.to_string()), + topics.push( MetadataResponseTopic::default() + .with_name(Some(self.encode_topic_name(name.to_string()))) .with_error_code(ResponseError::UnknownTopicOrPartition.code()), ); continue; @@ -231,9 +225,9 @@ impl Session { }) .collect(); - topics.insert( - name, + topics.push( MetadataResponseTopic::default() + .with_name(Some(name)) .with_is_internal(false) .with_partitions(partitions), ); @@ -738,21 +732,18 @@ impl Session { let responses = req .topic_data .into_iter() - .map(|(k, v)| { - ( - k, - TopicProduceResponse::default().with_partition_responses( - v.partition_data - .into_iter() - .map(|part| { - PartitionProduceResponse::default() - .with_index(part.index) - .with_error_code( - kafka_protocol::error::ResponseError::InvalidRequest.code(), - ) - }) - .collect(), - ), + .map(|v| { + TopicProduceResponse::default().with_partition_responses( + v.partition_data + .into_iter() + .map(|part| { + PartitionProduceResponse::default() + .with_index(part.index) + .with_error_code( + kafka_protocol::error::ResponseError::InvalidRequest.code(), + ) + }) + .collect(), ) }) .collect(); @@ -773,7 +764,7 @@ impl Session { .await?; let mut mutable_req = req.clone(); - for (_, protocol) in mutable_req.protocols.iter_mut() { + for protocol in mutable_req.protocols.iter_mut() { let mut consumer_protocol_subscription_raw = protocol.metadata.clone(); let consumer_protocol_subscription_version = consumer_protocol_subscription_raw @@ -950,7 +941,10 @@ impl Session { consumer_protocol_assignment_msg.assigned_partitions = consumer_protocol_assignment_msg .assigned_partitions .into_iter() - .map(|(name, item)| (self.encrypt_topic_name(name.to_owned().into()).into(), item)) + .map(|part| { + let transformed_topic = self.encrypt_topic_name(part.topic.to_owned()); + part.with_topic(transformed_topic) + }) .collect(); let mut new_protocol_assignment = BytesMut::new(); @@ -986,7 +980,10 @@ impl Session { consumer_protocol_assignment_msg.assigned_partitions = consumer_protocol_assignment_msg .assigned_partitions .into_iter() - .map(|(name, item)| (self.decrypt_topic_name(name.to_owned().into()).into(), item)) + .map(|part| { + let transformed_topic = self.decrypt_topic_name(part.topic.to_owned()); + part.with_topic(transformed_topic) + }) .collect(); let mut new_protocol_assignment = BytesMut::new(); @@ -1144,132 +1141,70 @@ impl Session { ) -> anyhow::Result { use kafka_protocol::messages::{api_versions_response::ApiVersion, *}; - fn version() -> ApiVersion { - let mut v = ApiVersion::default(); - v.max_version = T::VERSIONS.max; - v.min_version = T::VERSIONS.min; - v + fn version(api_key: ApiKey) -> ApiVersion { + ApiVersion::default() + .with_api_key(api_key as i16) + .with_max_version(T::VERSIONS.max) + .with_min_version(T::VERSIONS.min) } - let mut res = ApiVersionsResponse::default(); - - res.api_keys.insert( - ApiKey::ApiVersionsKey as i16, - version::(), - ); - res.api_keys.insert( - ApiKey::SaslHandshakeKey as i16, - version::(), - ); - res.api_keys.insert( - ApiKey::SaslAuthenticateKey as i16, - version::(), - ); - res.api_keys - .insert(ApiKey::MetadataKey as i16, version::()); - res.api_keys.insert( - ApiKey::FindCoordinatorKey as i16, + let res = ApiVersionsResponse::default().with_api_keys(vec![ + version::(ApiKey::ApiVersionsKey), + version::(ApiKey::SaslHandshakeKey), + version::(ApiKey::SaslAuthenticateKey), + version::(ApiKey::MetadataKey), ApiVersion::default() + 
.with_api_key(ApiKey::FindCoordinatorKey as i16) .with_min_version(0) .with_max_version(2), - ); - res.api_keys.insert( - ApiKey::ListOffsetsKey as i16, - version::(), - ); - res.api_keys.insert( - ApiKey::FetchKey as i16, + version::(ApiKey::ListOffsetsKey), ApiVersion::default() + .with_api_key(ApiKey::FetchKey as i16) // This is another non-obvious requirement in librdkafka. If we advertise <4 as a minimum here, some clients' // fetch requests will sit in a tight loop erroring over and over. This feels like a bug... but it's probably // just the consequence of convergent development, where some implicit requirement got encoded both in the client // and server without being explicitly documented anywhere. .with_min_version(4) - // I don't understand why, but some kafka clients don't seem to be able to send flexver fetch requests correctly - // For example, `kcat` sends an empty topic name when >= v12. I'm 99% sure there's more to this however. - .with_max_version(11), - ); - - // Needed by `kaf`. - res.api_keys.insert( - ApiKey::DescribeConfigsKey as i16, - version::(), - ); - - res.api_keys.insert( - ApiKey::ProduceKey as i16, + // Version >= 13 did away with topic names in favor of unique topic UUIDs, so we need to stick below that. + .with_max_version(12), + // Needed by `kaf`. + version::(ApiKey::DescribeConfigsKey), ApiVersion::default() + .with_api_key(ApiKey::ProduceKey as i16) .with_min_version(3) .with_max_version(9), - ); - - res.api_keys.insert( - ApiKey::JoinGroupKey as i16, self.app .kafka_client .supported_versions::()?, - ); - res.api_keys.insert( - ApiKey::LeaveGroupKey as i16, self.app .kafka_client .supported_versions::()?, - ); - res.api_keys.insert( - ApiKey::ListGroupsKey as i16, self.app .kafka_client .supported_versions::()?, - ); - res.api_keys.insert( - ApiKey::SyncGroupKey as i16, self.app .kafka_client .supported_versions::()?, - ); - res.api_keys.insert( - ApiKey::DeleteGroupsKey as i16, self.app .kafka_client .supported_versions::()?, - ); - res.api_keys.insert( - ApiKey::HeartbeatKey as i16, self.app .kafka_client .supported_versions::()?, - ); - - res.api_keys.insert( - ApiKey::OffsetCommitKey as i16, self.app .kafka_client .supported_versions::()?, - ); - res.api_keys.insert( - ApiKey::OffsetFetchKey as i16, ApiVersion::default() + .with_api_key(ApiKey::OffsetFetchKey as i16) .with_min_version(0) .with_max_version(7), - ); + ]); // UNIMPLEMENTED: /* - res.api_keys.insert( - ApiKey::LeaderAndIsrKey as i16, - version::(), - ); - res.api_keys.insert( - ApiKey::StopReplicaKey as i16, - version::(), - ); - res.api_keys.insert( - ApiKey::CreateTopicsKey as i16, - version::(), - ); - res.api_keys.insert( - ApiKey::DeleteTopicsKey as i16, - version::(), - ); + ApiKey::LeaderAndIsrKey, + ApiKey::StopReplicaKey, + ApiKey::CreateTopicsKey, + ApiKey::DeleteTopicsKey, */ Ok(res) From 3ce62f63a5d0eb2a1b2ce7953609275b89dec02d Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 28 Oct 2024 12:58:54 -0400 Subject: [PATCH 4/4] dekaf: Remove readback, as it wasn't the solution to the lag issue, and it seems to be causing issues by allowing consumers to periodically read _just_ the last ack message --- crates/dekaf/src/read.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 21042ce174..3508b0c8d7 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -55,7 +55,6 @@ pub enum ReadTarget { Docs(usize), } -const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document 
max size const DELETION_HEADER: &str = "_is_deleted"; const DELETION_VAL_DELETED: &[u8] = &[1u8]; const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8]; @@ -75,12 +74,7 @@ impl Read { let stream = client.clone().read_json_lines( broker::ReadRequest { - // Start reading at least 1 document in the past - offset: if rewrite_offsets_from.is_some() { - offset - } else { - std::cmp::max(0, offset - OFFSET_READBACK) - }, + offset, block: true, journal: partition.spec.name.clone(), begin_mod_time: not_before_sec as i64, @@ -352,7 +346,8 @@ impl Read { last_write_head = self.last_write_head, ratio = buf.len() as f64 / (records_bytes + 1) as f64, records_bytes, - "returning records" + did_timeout, + "batch complete" ); metrics::counter!("dekaf_documents_read", "journal_name" => self.journal_name.to_owned())
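// ---
// Illustrative addendum (not part of the patches above): a minimal sketch of how a
// consumer might interpret the `_is_deleted` header that PATCH 1/4 emits when a task
// is configured with `deletions: header`. It uses only the `bytes` and `kafka-protocol`
// types the patches themselves use; the helper `is_deletion` and the `main` demo are
// assumptions for illustration, not part of dekaf.

use bytes::Bytes;
use kafka_protocol::{indexmap::IndexMap, protocol::StrBytes};

// Mirrors DELETION_HEADER from crates/dekaf/src/read.rs.
const DELETION_HEADER: &str = "_is_deleted";

/// Returns true when a record's headers carry `_is_deleted` = 0x01, i.e. the source
/// document was a deletion. `headers` has the same shape as
/// `kafka_protocol::records::Record::headers`.
fn is_deletion(headers: &IndexMap<StrBytes, Option<Bytes>>) -> bool {
    headers
        .get(&StrBytes::from_static_str(DELETION_HEADER))
        .and_then(|value| value.as_deref())
        .map(|value| value == &[1u8][..])
        .unwrap_or(false)
}

fn main() {
    // Build headers the same way the patch does for a deletion event.
    let mut headers: IndexMap<StrBytes, Option<Bytes>> = IndexMap::new();
    headers.insert(
        StrBytes::from_static_str(DELETION_HEADER),
        Some(Bytes::from_static(&[1u8])),
    );
    assert!(is_deletion(&headers));

    // Non-deletion documents carry `_is_deleted` = 0x00 in this mode.
    headers.insert(
        StrBytes::from_static_str(DELETION_HEADER),
        Some(Bytes::from_static(&[0u8])),
    );
    assert!(!is_deletion(&headers));
}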