Skip to content

Commit

Permalink
handle eoc
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Dec 23, 2024
1 parent ffa6fd5 commit efdd0b7
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,12 @@ impl LinearReader {
&mut self.decompressed_content,
$remaining,
) {
Ok(None) => {
Ok(DecompressResult::Complete) => Some(
&self.decompressed_content.data
[self.decompressed_content.start..self.decompressed_content.start + $n]
}
Ok(Some(n)) => return Some(Ok(ReadAction::NeedMore(n))),
[self.decompressed_content.start..self.decompressed_content.start + $n],
),
Ok(DecompressResult::NeedMore(n)) => return Some(Ok(ReadAction::NeedMore(n))),
Ok(DecompressResult::EOF) => None,
Err(err) => return Some(Err(err)),
}
}};
Expand Down Expand Up @@ -555,11 +556,16 @@ impl LinearReader {
self.currently_reading = ChunkRecord;
continue;
}
let _ = decompress!(
uncompressed_len,
&mut state.compressed_remaining,
decompressor
);
if matches!(
decompress!(
uncompressed_len,
&mut state.compressed_remaining,
decompressor
),
None
) {
return Some(Err(McapError::UnexpectedEoc));
}
}
}
}
Expand Down Expand Up @@ -597,14 +603,21 @@ impl LinearReader {
self.currently_reading = PaddingAfterChunk;
continue;
}
let opcode_len_buf =
decompress!(9, &mut state.compressed_remaining, decompressor);
let Some(opcode_len_buf) =
decompress!(9, &mut state.compressed_remaining, decompressor)
else {
continue;
};
let opcode = opcode_len_buf[0];
let len = check!(len_as_usize(u64::from_le_bytes(
opcode_len_buf[1..9].try_into().unwrap(),
)));
let _ =
decompress!(9 + len, &mut state.compressed_remaining, decompressor);
if matches!(
decompress!(9 + len, &mut state.compressed_remaining, decompressor),
None
) {
return Some(Err(McapError::UnexpectedEoc));
}
self.decompressed_content.mark_read(9);
let (start, end) = (
self.decompressed_content.start,
Expand Down Expand Up @@ -698,6 +711,12 @@ fn get_decompressor(
}
}

enum DecompressResult {
NeedMore(usize),
Complete,
EOF,
}

// decompresses up to `n` bytes from `from` into `to`. Repeatedly calls `decompress` until
// either the input is exhausted or enough data has been written. Returns None if all required
// data has been decompressed, or Some(need) if more bytes need to be read from the input.
Expand All @@ -707,23 +726,26 @@ fn decompress_inner(
src_buf: &mut RwBuf,
dest_buf: &mut RwBuf,
compressed_remaining: &mut u64,
) -> McapResult<Option<usize>> {
) -> McapResult<DecompressResult> {
if dest_buf.len() >= n {
return Ok(None);
return Ok(DecompressResult::Complete);
}
dest_buf.data.resize(dest_buf.start + n, 0);
loop {
let need = decompressor.next_read_size();
let have = src_buf.len();
if need > have {
return Ok(Some(need - have));
return Ok(DecompressResult::NeedMore(need - have));
}
let dst = &mut dest_buf.data[dest_buf.end..];
if dst.is_empty() {
return Ok(None);
return Ok(DecompressResult::Complete);
}
let src_len = std::cmp::min(have, clamp_to_usize(*compressed_remaining));
let src = &src_buf.data[src_buf.start..src_buf.start + src_len];
if src.is_empty() {
return Ok(DecompressResult::EOF);
}
let res = decompressor.decompress(src, dst)?;
src_buf.mark_read(res.consumed);
dest_buf.end += res.wrote;
Expand Down Expand Up @@ -1104,7 +1126,7 @@ mod tests {

#[test]
fn test_decompression_does_not_fail() {
let mut f = std::fs::File::open("tests/data/break_zstd_decompression.mcap")
let mut f = std::fs::File::open("tests/data/zstd_chunk_with_padding.mcap")
.expect("failed to open file");
let blocksize: usize = 1024;
let mut reader = LinearReader::new();
Expand Down

0 comments on commit efdd0b7

Please sign in to comment.