From 0352e90c062cb3a46d177395324befe4dc5b885a Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 30 Apr 2024 17:41:07 -0700 Subject: [PATCH] comments --- ikv/src/index/offset_store.rs | 3 +++ ikv/src/kafka/consumer.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/ikv/src/index/offset_store.rs b/ikv/src/index/offset_store.rs index 2fb0d26..2406412 100644 --- a/ikv/src/index/offset_store.rs +++ b/ikv/src/index/offset_store.rs @@ -20,6 +20,9 @@ pub struct OffsetStore { file: File, } +/// NOTE - it is okay to store raw kafka offsets +/// An offset is always valid (w.r.t being b/w low/high watermark) even +/// with time/size based retention in play (auto expiry by kafka). impl OffsetStore { pub fn open_or_create(mount_directory: String) -> io::Result { let filename = format!("{}/kafka_offsets", mount_directory); diff --git a/ikv/src/kafka/consumer.rs b/ikv/src/kafka/consumer.rs index 414418a..86b006b 100644 --- a/ikv/src/kafka/consumer.rs +++ b/ikv/src/kafka/consumer.rs @@ -273,6 +273,9 @@ async fn initialize_stream_consumer( seek_consumer(&consumer, topic, partition, rdkafka::Offset::Beginning)?; // seek - using persisted offsets + // NOTE - it is okay to store raw kafka offsets + // An offset is always valid (w.r.t being b/w low/high watermark) even + // with time/size based retention in play (auto expiry by kafka). let stored_topic_partition_list = offset_store.read_all_offsets()?; for entry in stored_topic_partition_list.iter() { if (&entry.topic == topic) && (entry.partition == partition) {