Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rust: LinearReader: reclaim dead space when possible #1298

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ impl RwBuf {
// returns a mutable view of the un-written part of the buffer, resizing as needed to ensure
// N bytes are available to write into.
fn tail_with_size(&mut self, n: usize) -> &mut [u8] {
let unread_len = self.end - self.start;
// Compact the output buffer if there is sufficient free space and there is more free
// than used.
if self.start > 4096 && self.start > unread_len {
self.data.copy_within(self.start..self.end, 0);
self.start = 0;
self.end = unread_len;
}
let desired_end = self.end + n;
self.data.resize(desired_end, 0);
&mut self.data[self.end..]
Expand Down Expand Up @@ -318,10 +326,6 @@ impl LinearReader {
self.file_data.end += written;
}

if self.file_data.len() == 0 {
self.file_data.clear();
}

/// Macros for loading data into the reader. These return early with NeedMore(n) if
/// more data is needed.
///
Expand Down Expand Up @@ -1029,4 +1033,72 @@ mod tests {
);
Ok(())
}

// Ensures that the internal buffer for the linear reader gets compacted regularly and does not
// expand unbounded.
#[test]
fn test_buffer_compaction() -> McapResult<()> {
let mut buf = Vec::new();
{
let mut cursor = std::io::Cursor::new(buf);
let data = Vec::from_iter(std::iter::repeat(0x20u8).take(1024 * 1024 * 4));
let mut writer = crate::WriteOptions::new()
.compression(None)
.chunk_size(None)
.create(&mut cursor)?;
let channel = std::sync::Arc::new(crate::Channel {
topic: "chat".to_owned(),
schema: None,
message_encoding: "json".to_owned(),
metadata: BTreeMap::new(),
});
writer.add_channel(&channel)?;
for n in 0..3 {
writer.write(&crate::Message {
channel: channel.clone(),
sequence: n,
log_time: n as u64,
publish_time: n as u64,
data: std::borrow::Cow::Borrowed(&data[..]),
})?;
if n == 1 {
writer.flush()?;
}
}
writer.finish()?;
drop(writer);
buf = cursor.into_inner();
}
let mut reader = LinearReader::new();
let mut cursor = std::io::Cursor::new(buf);
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
let mut max_needed: usize = 0;
while let Some(action) = reader.next_action() {
match action? {
ReadAction::NeedMore(n) => {
max_needed = std::cmp::max(max_needed, n);
// read slightly more than requested, such that the data in the buffer does not
// hit zero after the next action.
let written = cursor.read(reader.insert(n + 1))?;
reader.set_written(written);
let buffer_size = reader.file_data.data.len();
assert!(
buffer_size < std::cmp::max(max_needed * 2, 4096),
"max needed: {0}, buffer size: {1}",
max_needed,
buffer_size
);
}
ReadAction::GetRecord { data, opcode } => {
opcodes.push(opcode);
parse_record(opcode, data)?;
}
}
iter_count += 1;
// guard against infinite loop
assert!(iter_count < 10000);
}
Ok(())
}
}
Loading