Fix polling when batch size is greater than messages_required_to_save
The messages_required_to_save config was incorrectly used as a batch size limit,
causing messages to be split into smaller batches than requested. This fixes
polling issues where consumers couldn't retrieve their expected batch sizes.
hubcio committed Feb 4, 2025
1 parent e566b46 · commit 51d0ae8
Showing 4 changed files with 45 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

14 changes: 12 additions & 2 deletions bench/src/actors/consumer.rs
@@ -232,7 +232,7 @@ impl Consumer {
                 if should_warn {
                     warn!(
                         "Consumer #{} → Messages are empty for offset: {}, received {} of {} batches, retrying... ({} warnings skipped)",
-                        self.consumer_id, offset, self.batches_left_to_receive.load(Ordering::Acquire), total_batches_to_receive,
+                        self.consumer_id, offset, total_batches_to_receive - self.batches_left_to_receive.load(Ordering::Acquire), total_batches_to_receive,
                         skipped_warnings_count
                     );
                     last_warning_time = Some(Instant::now());
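
This one-line change fixes an inverted progress figure: batches_left_to_receive counts down from the total, so the number of batches already received is the total minus the remaining count, not the remaining count itself. The arithmetic in isolation (a minimal sketch mirroring the bench counter, not the actual bench code):

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let total_batches_to_receive: u64 = 10;
    // Counts down as batches arrive; 7 left means 3 already received.
    let batches_left_to_receive = AtomicU64::new(7);
    let received = total_batches_to_receive - batches_left_to_receive.load(Ordering::Acquire);
    assert_eq!(received, 3); // the old log line would have reported 7 here
}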
@@ -244,14 +244,24 @@ impl Consumer {
                 }
 
                 if polled_messages.messages.len() != messages_per_batch as usize {
-                    warn!(
+                    let should_warn = last_warning_time
+                        .map(|t| t.elapsed() >= Duration::from_secs(1))
+                        .unwrap_or(true);
+
+                    if should_warn {
+                        warn!(
                         "Consumer #{} → expected {} messages, but got {} messages ({} batches remaining), retrying... ({} warnings skipped)",
                         self.consumer_id,
                         messages_per_batch,
                         polled_messages.messages.len(),
                         self.batches_left_to_receive.load(Ordering::Acquire),
                         skipped_warnings_count
                     );
+                        last_warning_time = Some(Instant::now());
+                        skipped_warnings_count = 0;
+                    } else {
+                        skipped_warnings_count += 1;
+                    }
 
                     continue;
                 }
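
The larger hunk applies the same once-per-second throttling to the incomplete-batch warning that the empty-batch path already uses: emit at most one warning per second and fold suppressed occurrences into a counter reported with the next warning. The idiom as a self-contained sketch (struct and method names here are illustrative, not part of the bench code):

use std::time::{Duration, Instant};

// Tracks when a warning was last emitted and how many were suppressed since.
struct WarnThrottle {
    last_warning_time: Option<Instant>,
    skipped: u64,
}

impl WarnThrottle {
    fn new() -> Self {
        Self { last_warning_time: None, skipped: 0 }
    }

    // Returns Some(suppressed count) when a warning is due, or None when
    // this occurrence should be counted and suppressed instead.
    fn check(&mut self) -> Option<u64> {
        let due = self
            .last_warning_time
            .map(|t| t.elapsed() >= Duration::from_secs(1))
            .unwrap_or(true);
        if due {
            let skipped = self.skipped;
            self.last_warning_time = Some(Instant::now());
            self.skipped = 0;
            Some(skipped)
        } else {
            self.skipped += 1;
            None
        }
    }
}

fn main() {
    let mut throttle = WarnThrottle::new();
    for _ in 0..5 {
        if let Some(skipped) = throttle.check() {
            eprintln!("retrying... ({skipped} warnings skipped)");
        }
    }
}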
2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "server"
-version = "0.4.150"
+version = "0.4.151"
 edition = "2021"
 build = "src/build.rs"
 license = "Apache-2.0"
44 changes: 31 additions & 13 deletions server/src/streaming/segments/messages.rs
@@ -38,6 +38,7 @@ impl Segment {
         }
 
         let end_offset = offset + (count - 1) as u64;
+
         // In case that the partition messages buffer is disabled, we need to check the unsaved messages buffer
         if self.unsaved_messages.is_none() {
             return self.load_messages_from_disk(offset, end_offset).await;
@@ -48,23 +49,37 @@ impl Segment {
             return self.load_messages_from_disk(offset, end_offset).await;
         }
 
-        let first_offset = batch_accumulator.batch_base_offset();
-        if end_offset < first_offset {
-            return self.load_messages_from_disk(offset, end_offset).await;
-        }
+        let first_buffer_offset = batch_accumulator.batch_base_offset();
+        let last_buffer_offset = batch_accumulator.batch_max_offset();
 
-        let last_offset = batch_accumulator.batch_max_offset();
-        if offset >= first_offset && end_offset <= last_offset {
+        // Case 1: All messages are in the messages_required_to_save buffer
+        if offset >= first_buffer_offset && end_offset <= last_buffer_offset {
             return Ok(self.load_messages_from_unsaved_buffer(offset, end_offset));
         }
 
-        // Can this be somehow improved? maybe with chain iterators
-        let mut messages = self.load_messages_from_disk(offset, end_offset).await.with_error_context(|_| format!(
-            "STREAMING_SEGMENT - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {}, end offset :{}",
-            self.stream_id, self.topic_id, self.partition_id, offset, end_offset,
-        ))?;
-        let mut buffered_messages = self.load_messages_from_unsaved_buffer(offset, last_offset);
-        messages.append(&mut buffered_messages);
+        // Case 2: All messages are on disk
+        if end_offset < first_buffer_offset {
+            return self.load_messages_from_disk(offset, end_offset).await;
+        }
+
+        // Case 3: Messages span the disk and messages_required_to_save buffer boundary
+        let mut messages = Vec::new();
+
+        // Load messages from disk up to the messages_required_to_save buffer boundary
+        if offset < first_buffer_offset {
+            let disk_messages = self
+                .load_messages_from_disk(offset, first_buffer_offset - 1)
+                .await
+                .with_error_context(|e| format!(
+                    "STREAMING_SEGMENT - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {}, end offset: {}, error: {}",
+                    self.stream_id, self.topic_id, self.partition_id, offset, first_buffer_offset - 1, e
+                ))?;
+            messages.extend(disk_messages);
+        }
+
+        // Load remaining messages from the messages_required_to_save buffer
+        let buffer_start = std::cmp::max(offset, first_buffer_offset);
+        let buffer_messages = self.load_messages_from_unsaved_buffer(buffer_start, end_offset);
+        messages.extend(buffer_messages);
 
         Ok(messages)
     }
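
The rewritten read path reduces to interval arithmetic over three cases: the requested range is entirely in the unsaved buffer, entirely on disk, or spans the boundary between them. A standalone sketch of that logic, assuming as the code above does that reads never extend past the buffered tail (the function signature and closures are hypothetical stand-ins for the async segment methods):

// Offsets >= first_buffer_offset live in the in-memory unsaved buffer;
// everything older is on disk. Closures stand in for the real loaders.
fn read_range(
    offset: u64,
    end_offset: u64,
    first_buffer_offset: u64,
    load_disk: impl Fn(u64, u64) -> Vec<u64>,
    load_buffer: impl Fn(u64, u64) -> Vec<u64>,
) -> Vec<u64> {
    // Case 1: the whole range is buffered.
    if offset >= first_buffer_offset {
        return load_buffer(offset, end_offset);
    }
    // Case 2: the whole range is on disk.
    if end_offset < first_buffer_offset {
        return load_disk(offset, end_offset);
    }
    // Case 3: the range spans the boundary; concatenate in offset order,
    // disk part first, then the buffered tail.
    let mut messages = load_disk(offset, first_buffer_offset - 1);
    messages.extend(load_buffer(first_buffer_offset, end_offset));
    messages
}

fn main() {
    // Pretend each offset maps to one message id; the buffer starts at 100.
    let disk = |a: u64, b: u64| (a..=b).collect::<Vec<_>>();
    let buffer = |a: u64, b: u64| (a..=b).collect::<Vec<_>>();
    assert_eq!(read_range(95, 105, 100, disk, buffer), (95..=105).collect::<Vec<_>>());
}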
@@ -212,7 +227,10 @@ impl Segment {
                 self.partition_id,
             ));
         }
-        let messages_cap = self.config.partition.messages_required_to_save as usize;
+        let messages_cap = std::cmp::max(
+            self.config.partition.messages_required_to_save as usize,
+            batch.len(),
+        );
         let batch_base_offset = batch.first().unwrap().offset;
         let batch_accumulator = self
             .unsaved_messages
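
This final hunk is the heart of the fix named in the commit title: messages_cap previously equaled messages_required_to_save, so the accumulator could not hold a batch larger than the persistence threshold and requests were effectively split. Taking the maximum of the two guarantees the incoming batch always fits. The sizing rule in isolation (a hypothetical free function; the real code feeds this value to the batch accumulator):

// Capacity for the unsaved-messages accumulator: at least the flush
// threshold, but never smaller than the batch about to be appended.
fn accumulator_capacity(messages_required_to_save: usize, incoming_batch_len: usize) -> usize {
    std::cmp::max(messages_required_to_save, incoming_batch_len)
}

fn main() {
    // Before the fix the capacity here was always 1_000, so a 5_000-message
    // batch was split; with the max it fits in one piece.
    assert_eq!(accumulator_capacity(1_000, 5_000), 5_000);
    assert_eq!(accumulator_capacity(1_000, 10), 1_000);
}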
