diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 25c1ed61b..d43c19223 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ] repository = "https://github.com/foxglove/mcap" documentation = "https://docs.rs/mcap" readme = "README.md" -version = "0.13.1" +version = "0.13.2" edition = "2021" license = "MIT" diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index cb5f04c09..5d38293f7 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -96,6 +96,14 @@ impl RwBuf { // returns a mutable view of the un-written part of the buffer, resizing as needed to ensure // N bytes are available to write into. fn tail_with_size(&mut self, n: usize) -> &mut [u8] { + let unread_len = self.end - self.start; + // Compact the output buffer if there is sufficient free space and there is more free + // than used. + if self.start > 4096 && self.start > unread_len { + self.data.copy_within(self.start..self.end, 0); + self.start = 0; + self.end = unread_len; + } let desired_end = self.end + n; self.data.resize(desired_end, 0); &mut self.data[self.end..] @@ -318,10 +326,6 @@ impl LinearReader { self.file_data.end += written; } - if self.file_data.len() == 0 { - self.file_data.clear(); - } - /// Macros for loading data into the reader. These return early with NeedMore(n) if /// more data is needed. /// @@ -1029,4 +1033,72 @@ mod tests { ); Ok(()) } + + // Ensures that the internal buffer for the linear reader gets compacted regularly and does not + // expand unbounded. + #[test] + fn test_buffer_compaction() -> McapResult<()> { + let mut buf = Vec::new(); + { + let mut cursor = std::io::Cursor::new(buf); + let data = Vec::from_iter(std::iter::repeat(0x20u8).take(1024 * 1024 * 4)); + let mut writer = crate::WriteOptions::new() + .compression(None) + .chunk_size(None) + .create(&mut cursor)?; + let channel = std::sync::Arc::new(crate::Channel { + topic: "chat".to_owned(), + schema: None, + message_encoding: "json".to_owned(), + metadata: BTreeMap::new(), + }); + writer.add_channel(&channel)?; + for n in 0..3 { + writer.write(&crate::Message { + channel: channel.clone(), + sequence: n, + log_time: n as u64, + publish_time: n as u64, + data: std::borrow::Cow::Borrowed(&data[..]), + })?; + if n == 1 { + writer.flush()?; + } + } + writer.finish()?; + drop(writer); + buf = cursor.into_inner(); + } + let mut reader = LinearReader::new(); + let mut cursor = std::io::Cursor::new(buf); + let mut opcodes: Vec = Vec::new(); + let mut iter_count = 0; + let mut max_needed: usize = 0; + while let Some(action) = reader.next_action() { + match action? { + ReadAction::NeedMore(n) => { + max_needed = std::cmp::max(max_needed, n); + // read slightly more than requested, such that the data in the buffer does not + // hit zero after the next action. + let written = cursor.read(reader.insert(n + 1))?; + reader.set_written(written); + let buffer_size = reader.file_data.data.len(); + assert!( + buffer_size < std::cmp::max(max_needed * 2, 4096), + "max needed: {0}, buffer size: {1}", + max_needed, + buffer_size + ); + } + ReadAction::GetRecord { data, opcode } => { + opcodes.push(opcode); + parse_record(opcode, data)?; + } + } + iter_count += 1; + // guard against infinite loop + assert!(iter_count < 10000); + } + Ok(()) + } }