Skip to content

Commit

Permalink
Merge pull request #179 from pushkarmoi/pugupta/comments
Browse files Browse the repository at this point in the history
raw kafka offset storage comments
  • Loading branch information
pushkarmoi authored May 1, 2024
2 parents e9ec408 + 0352e90 commit ab17a31
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ikv/src/index/offset_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub struct OffsetStore {
file: File,
}

/// NOTE - it is okay to store raw kafka offsets
/// An offset is always valid (w.r.t being b/w low/high watermark) even
/// with time/size based retention in play (auto expiry by kafka).
impl OffsetStore {
pub fn open_or_create(mount_directory: String) -> io::Result<Self> {
let filename = format!("{}/kafka_offsets", mount_directory);
Expand Down
3 changes: 3 additions & 0 deletions ikv/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ async fn initialize_stream_consumer(
seek_consumer(&consumer, topic, partition, rdkafka::Offset::Beginning)?;

// seek - using persisted offsets
// NOTE - it is okay to store raw kafka offsets
// An offset is always valid (w.r.t being b/w low/high watermark) even
// with time/size based retention in play (auto expiry by kafka).
let stored_topic_partition_list = offset_store.read_all_offsets()?;
for entry in stored_topic_partition_list.iter() {
if (&entry.topic == topic) && (entry.partition == partition) {
Expand Down

0 comments on commit ab17a31

Please sign in to comment.