Skip to content

Commit

Permalink
Add with_skip_validation flag to IPC StreamReader, FileReader a…
Browse files Browse the repository at this point in the history
…nd `FileDecoder`
  • Loading branch information
alamb committed Feb 12, 2025
1 parent 0c206c6 commit 77d3de5
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 87 deletions.
161 changes: 123 additions & 38 deletions arrow-ipc/benches/ipc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,34 @@ use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::{root_as_footer, Block, CompressionType};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use std::io::Cursor;
use std::io::{Cursor, Write};
use std::sync::Arc;
use tempfile::tempdir;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("arrow_ipc_reader");

group.bench_function("StreamReader/read_10", |b| {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = StreamWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let buffer = ipc_stream();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("StreamReader/no_validation/read_10", |b| {
let buffer = ipc_stream();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
unsafe {
// SAFETY: the bytes come from `ipc_stream()` above, i.e. a valid IPC stream we wrote ourselves
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
Expand All @@ -51,69 +60,100 @@ fn criterion_benchmark(c: &mut Criterion) {
});

group.bench_function("StreamReader/read_10/zstd", |b| {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options = IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.unwrap();
let mut writer =
StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options)
.unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let buffer = ipc_stream_zstd();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("StreamReader/no_validation/read_10/zstd", |b| {
let buffer = ipc_stream_zstd();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
unsafe {
// SAFETY: the bytes come from `ipc_stream_zstd()` above, i.e. a valid IPC stream we wrote ourselves
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

// --- Create IPC File ---
group.bench_function("FileReader/read_10", |b| {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let buffer = ipc_file();
b.iter(move || {
let projection = None;
let cursor = Cursor::new(buffer.as_slice());
let mut reader = FileReader::try_new(cursor, projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("FileReader/no_validation/read_10", |b| {
let buffer = ipc_file();
b.iter(move || {
let projection = None;
let cursor = Cursor::new(buffer.as_slice());
let mut reader = FileReader::try_new(cursor, projection).unwrap();
unsafe {
// safety: we created a valid IPC file
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

// write to an actual file
let dir = tempdir().unwrap();
let path = dir.path().join("test.arrow");
let mut file = std::fs::File::create(&path).unwrap();
file.write_all(&ipc_file()).unwrap();
drop(file);

group.bench_function("FileReader/read_10/mmap", |b| {
let batch = create_batch(8192, true);
// write to an actual file
let dir = tempdir().unwrap();
let path = dir.path().join("test.arrow");
let file = std::fs::File::create(&path).unwrap();
let mut writer = FileWriter::try_new(file, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let path = &path;
b.iter(move || {
let ipc_file = std::fs::File::open(path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
decoder.get_batch(i);
}
})
});

group.bench_function("FileReader/no_validation/read_10/mmap", |b| {
let path = &path;
b.iter(move || {
let ipc_file = std::fs::File::open(&path).expect("failed to open file");
let ipc_file = std::fs::File::open(path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
let decoder = unsafe { decoder.with_skip_validation(true) };
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
Expand All @@ -123,6 +163,46 @@ fn criterion_benchmark(c: &mut Criterion) {
});
}

/// Build an in-memory Arrow IPC stream containing 10 record batches.
///
/// The batch returned by `create_batch(8192, true)` is written ten times
/// through a [`StreamWriter`], and the encoded bytes are returned.
fn ipc_stream() -> Vec<u8> {
    let batch = create_batch(8192, true);
    let schema = batch.schema();

    // Reserve 2 MiB up front so the output vector starts with ample capacity.
    let mut bytes = Vec::with_capacity(2 * 1024 * 1024);
    let mut stream_writer = StreamWriter::try_new(&mut bytes, schema.as_ref()).unwrap();
    (0..10).for_each(|_| stream_writer.write(&batch).unwrap());
    stream_writer.finish().unwrap();

    bytes
}

/// Build an in-memory, ZSTD-compressed Arrow IPC stream containing 10 record batches.
///
/// The batch returned by `create_batch(8192, true)` is written ten times
/// through a [`StreamWriter`] configured with [`CompressionType::ZSTD`],
/// and the encoded bytes are returned.
fn ipc_stream_zstd() -> Vec<u8> {
    let batch = create_batch(8192, true);
    let schema = batch.schema();

    // Panics if ZSTD compression cannot be enabled on the write options.
    let zstd_options = IpcWriteOptions::default()
        .try_with_compression(Some(CompressionType::ZSTD))
        .unwrap();

    // Reserve 2 MiB up front so the output vector starts with ample capacity.
    let mut bytes = Vec::with_capacity(2 * 1024 * 1024);
    let mut stream_writer =
        StreamWriter::try_new_with_options(&mut bytes, schema.as_ref(), zstd_options).unwrap();
    (0..10).for_each(|_| stream_writer.write(&batch).unwrap());
    stream_writer.finish().unwrap();

    bytes
}

/// Build an in-memory Arrow IPC file containing 10 record batches.
///
/// The batch returned by `create_batch(8192, true)` is written ten times
/// through a [`FileWriter`], and the encoded bytes are returned.
fn ipc_file() -> Vec<u8> {
    let batch = create_batch(8192, true);
    let schema = batch.schema();

    // Reserve 2 MiB up front so the output vector starts with ample capacity.
    let mut bytes = Vec::with_capacity(2 * 1024 * 1024);
    let mut file_writer = FileWriter::try_new(&mut bytes, schema.as_ref()).unwrap();
    (0..10).for_each(|_| file_writer.write(&batch).unwrap());
    file_writer.finish().unwrap();

    bytes
}

// copied from the zero_copy_ipc example.
// should we move this to an actual API?
/// Wrapper around the example in the `FileDecoder` which handles the
Expand Down Expand Up @@ -166,6 +246,11 @@ impl IPCBufferDecoder {
}
}

/// Forward the `skip_validation` flag to the wrapped decoder and return the
/// updated wrapper.
///
/// # Safety
///
/// This simply delegates to the inner decoder's `with_skip_validation`.
/// NOTE(review): presumably the caller must guarantee the underlying buffer
/// holds valid IPC data when passing `true` — confirm against the inner
/// decoder's documented contract.
unsafe fn with_skip_validation(self, skip_validation: bool) -> Self {
    let mut updated = self;
    updated.decoder = updated.decoder.with_skip_validation(skip_validation);
    updated
}

fn num_batches(&self) -> usize {
self.batches.len()
}
Expand Down
Loading

0 comments on commit 77d3de5

Please sign in to comment.