diff --git a/Cargo.lock b/Cargo.lock index d9a363ec5..4862c0a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4481,7 +4481,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.150" +version = "0.4.151" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs index 3fa5747dd..acd02a47d 100644 --- a/bench/src/actors/consumer.rs +++ b/bench/src/actors/consumer.rs @@ -232,7 +232,7 @@ impl Consumer { if should_warn { warn!( "Consumer #{} → Messages are empty for offset: {}, received {} of {} batches, retrying... ({} warnings skipped)", - self.consumer_id, offset, self.batches_left_to_receive.load(Ordering::Acquire), total_batches_to_receive, + self.consumer_id, offset, total_batches_to_receive - self.batches_left_to_receive.load(Ordering::Acquire), total_batches_to_receive, skipped_warnings_count ); last_warning_time = Some(Instant::now()); @@ -244,7 +244,12 @@ impl Consumer { } if polled_messages.messages.len() != messages_per_batch as usize { - warn!( + let should_warn = last_warning_time + .map(|t| t.elapsed() >= Duration::from_secs(1)) + .unwrap_or(true); + + if should_warn { + warn!( "Consumer #{} → expected {} messages, but got {} messages ({} batches remaining), retrying... ({} warnings skipped)", self.consumer_id, messages_per_batch, @@ -252,6 +257,11 @@ impl Consumer { self.batches_left_to_receive.load(Ordering::Acquire), skipped_warnings_count ); + last_warning_time = Some(Instant::now()); + skipped_warnings_count = 0; + } else { + skipped_warnings_count += 1; + } continue; } diff --git a/server/Cargo.toml b/server/Cargo.toml index 6a0120b94..36283a484 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.150" +version = "0.4.151" edition = "2021" build = "src/build.rs" license = "Apache-2.0" diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index d564efb15..299489f69 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -38,6 +38,7 @@ impl Segment { } let end_offset = offset + (count - 1) as u64; + // In case that the partition messages buffer is disabled, we need to check the unsaved messages buffer if self.unsaved_messages.is_none() { return self.load_messages_from_disk(offset, end_offset).await; @@ -48,23 +49,37 @@ impl Segment { return self.load_messages_from_disk(offset, end_offset).await; } - let first_offset = batch_accumulator.batch_base_offset(); - if end_offset < first_offset { - return self.load_messages_from_disk(offset, end_offset).await; - } + let first_buffer_offset = batch_accumulator.batch_base_offset(); + let last_buffer_offset = batch_accumulator.batch_max_offset(); - let last_offset = batch_accumulator.batch_max_offset(); - if offset >= first_offset && end_offset <= last_offset { + // Case 1: All messages are in messages_require_to_save buffer + if offset >= first_buffer_offset && end_offset <= last_buffer_offset { return Ok(self.load_messages_from_unsaved_buffer(offset, end_offset)); } - // Can this be somehow improved? maybe with chain iterators - let mut messages = self.load_messages_from_disk(offset, end_offset).await.with_error_context(|_| format!( - "STREAMING_SEGMENT - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {}, end offset :{}", - self.stream_id, self.topic_id, self.partition_id, offset, end_offset, + // Case 2: All messages are on disk + if end_offset < first_buffer_offset { + return self.load_messages_from_disk(offset, end_offset).await; + } + + // Case 3: Messages span disk and messages_require_to_save buffer boundary + let mut messages = Vec::new(); + + // Load messages from disk up to the messages_require_to_save buffer boundary + if offset < first_buffer_offset { + let disk_messages = self + .load_messages_from_disk(offset, first_buffer_offset - 1) + .await.with_error_context(|e| format!( + "STREAMING_SEGMENT - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {}, end offset :{}, error: {}", + self.stream_id, self.topic_id, self.partition_id, offset, first_buffer_offset - 1, e ))?; - let mut buffered_messages = self.load_messages_from_unsaved_buffer(offset, last_offset); - messages.append(&mut buffered_messages); + messages.extend(disk_messages); + } + + // Load remaining messages from messages_require_to_save buffer + let buffer_start = std::cmp::max(offset, first_buffer_offset); + let buffer_messages = self.load_messages_from_unsaved_buffer(buffer_start, end_offset); + messages.extend(buffer_messages); Ok(messages) } @@ -212,7 +227,10 @@ impl Segment { self.partition_id, )); } - let messages_cap = self.config.partition.messages_required_to_save as usize; + let messages_cap = std::cmp::max( + self.config.partition.messages_required_to_save as usize, + batch.len(), + ); let batch_base_offset = batch.first().unwrap().offset; let batch_accumulator = self .unsaved_messages