Add with_skip_validation flag to IPC StreamReader, FileReader and FileDecoder #7120

Merged
merged 10 commits into from
Feb 27, 2025
73 changes: 50 additions & 23 deletions arrow-data/src/data.rs
@@ -28,7 +28,6 @@ use std::mem;
use std::ops::Range;
use std::sync::Arc;

use crate::data::private::UnsafeFlag;
use crate::{equal, validate_binary_view, validate_string_view};

#[inline]
@@ -1781,33 +1780,61 @@ impl PartialEq for ArrayData {
}
}

mod private {
/// A boolean flag that cannot be mutated outside of unsafe code.
Contributor Author

I propose making `UnsafeFlag` public (with added examples and more docs) so that I can use it across the two crates. However, I can instead make a private copy of it in arrow-ipc if reviewers would prefer to avoid a new API.

///
/// Defaults to a value of false.
///
/// This structure is used to enforce safety in the [`ArrayDataBuilder`]
///
/// [`ArrayDataBuilder`]: super::ArrayDataBuilder
///
/// # Example
/// ```rust
/// use arrow_data::UnsafeFlag;
/// assert!(!UnsafeFlag::default().get()); // default is false
/// let mut flag = UnsafeFlag::new();
/// assert!(!flag.get()); // defaults to false
/// // can only set it to true in unsafe code
/// unsafe { flag.set(true) };
/// assert!(flag.get()); // now true
/// ```
#[derive(Debug, Clone)]
pub struct UnsafeFlag(bool);

impl UnsafeFlag {
/// Creates a new `UnsafeFlag` with the value set to `false`.
///
/// See the example on [`UnsafeFlag`]
#[inline]
pub const fn new() -> Self {
Self(false)
}

/// Sets the value of the flag to the given value
///
/// Note this can purposely only be done in `unsafe` code
///
/// # Safety
///
/// If set, the flag will be set to the given value. There is nothing
/// immediately unsafe about doing so, however, the flag can be used to
/// subsequently bypass safety checks in the [`ArrayDataBuilder`].
#[inline]
pub unsafe fn set(&mut self, val: bool) {
self.0 = val;
}

/// Returns the value of the flag
#[inline]
pub fn get(&self) -> bool {
self.0
}
}

// Manual impl to make it clear you cannot construct an `UnsafeFlag` with the value `true`
impl Default for UnsafeFlag {
fn default() -> Self {
Self::new()
}
}
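
To illustrate how a flag like this can gate validation in another crate, here is a minimal sketch. It is not the arrow-ipc implementation: `ToyDecoder` and its UTF-8 check are hypothetical, and `UnsafeFlag` is re-declared locally as a stand-in so the snippet compiles without `arrow-data`.

```rust
// Stand-in copy of the `UnsafeFlag` from the diff above, re-declared here
// so the sketch compiles without depending on `arrow-data`.
#[derive(Debug, Clone)]
pub struct UnsafeFlag(bool);

impl UnsafeFlag {
    pub const fn new() -> Self {
        Self(false)
    }

    /// # Safety
    /// Setting the flag lets later code skip safety checks.
    pub unsafe fn set(&mut self, val: bool) {
        self.0 = val;
    }

    pub fn get(&self) -> bool {
        self.0
    }
}

/// Hypothetical decoder (not part of arrow-ipc) that turns bytes into a `String`.
pub struct ToyDecoder {
    skip_validation: UnsafeFlag,
}

impl ToyDecoder {
    pub fn new() -> Self {
        Self {
            skip_validation: UnsafeFlag::new(),
        }
    }

    /// # Safety
    /// When `skip` is true, the caller must guarantee every input is valid UTF-8.
    pub unsafe fn with_skip_validation(mut self, skip: bool) -> Self {
        // SAFETY: the caller upholds the contract documented above.
        unsafe { self.skip_validation.set(skip) };
        self
    }

    pub fn decode(&self, bytes: Vec<u8>) -> Result<String, std::string::FromUtf8Error> {
        if self.skip_validation.get() {
            // SAFETY: `with_skip_validation(true)` required valid UTF-8 input.
            Ok(unsafe { String::from_utf8_unchecked(bytes) })
        } else {
            String::from_utf8(bytes)
        }
    }
}

fn main() {
    // Default path: invalid UTF-8 is rejected by validation.
    let checked = ToyDecoder::new();
    assert!(checked.decode(vec![0xff]).is_err());

    // SAFETY: the input passed below is valid UTF-8.
    let unchecked = unsafe { ToyDecoder::new().with_skip_validation(true) };
    assert_eq!(unchecked.decode(b"arrow".to_vec()).unwrap(), "arrow");
}
```

Because `set` is `unsafe`, the unchecked branch in this sketch can only be reached through a call site that is itself marked `unsafe`, which appears to be the property the flag is meant to enforce.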

161 changes: 123 additions & 38 deletions arrow-ipc/benches/ipc_reader.rs
@@ -24,25 +24,34 @@ use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::{root_as_footer, Block, CompressionType};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use std::io::Cursor;
use std::io::{Cursor, Write};
use std::sync::Arc;
use tempfile::tempdir;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("arrow_ipc_reader");

group.bench_function("StreamReader/read_10", |b| {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = StreamWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let buffer = ipc_stream();
Contributor Author

I added a new version of each benchmark that runs with validation disabled.

b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("StreamReader/no_validation/read_10", |b| {
let buffer = ipc_stream();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
unsafe {
// SAFETY: we created a valid IPC stream
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
@@ -51,69 +60,100 @@ fn criterion_benchmark(c: &mut Criterion) {
});

group.bench_function("StreamReader/read_10/zstd", |b| {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options = IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.unwrap();
let mut writer =
StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options)
.unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let buffer = ipc_stream_zstd();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("StreamReader/no_validation/read_10/zstd", |b| {
let buffer = ipc_stream_zstd();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
unsafe {
// SAFETY: we created a valid IPC stream
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

// --- Create IPC File ---
group.bench_function("FileReader/read_10", |b| {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let buffer = ipc_file();
b.iter(move || {
let projection = None;
let cursor = Cursor::new(buffer.as_slice());
let mut reader = FileReader::try_new(cursor, projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("FileReader/no_validation/read_10", |b| {
let buffer = ipc_file();
b.iter(move || {
let projection = None;
let cursor = Cursor::new(buffer.as_slice());
let mut reader = FileReader::try_new(cursor, projection).unwrap();
unsafe {
// safety: we created a valid IPC file
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

// write to an actual file
let dir = tempdir().unwrap();
let path = dir.path().join("test.arrow");
let mut file = std::fs::File::create(&path).unwrap();
file.write_all(&ipc_file()).unwrap();
drop(file);

group.bench_function("FileReader/read_10/mmap", |b| {
let batch = create_batch(8192, true);
// write to an actual file
let dir = tempdir().unwrap();
let path = dir.path().join("test.arrow");
let file = std::fs::File::create(&path).unwrap();
let mut writer = FileWriter::try_new(file, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
let path = &path;
b.iter(move || {
let ipc_file = std::fs::File::open(path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
decoder.get_batch(i);
}
})
});

group.bench_function("FileReader/no_validation/read_10/mmap", |b| {
let path = &path;
b.iter(move || {
let ipc_file = std::fs::File::open(&path).expect("failed to open file");
let ipc_file = std::fs::File::open(path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
let decoder = unsafe { decoder.with_skip_validation(true) };
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
@@ -123,6 +163,46 @@ fn criterion_benchmark(c: &mut Criterion) {
});
}
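
The `no_validation` benchmarks above compare the same read with and without validation. A dependency-free sketch of that kind of comparison is shown here, using UTF-8 validation as a stand-in for Arrow's array validation. The helper names (`decode_checked`, `decode_unchecked`, `time_it`) are hypothetical, and this loop is far cruder than what Criterion measures.

```rust
use std::hint::black_box;
use std::time::{Duration, Instant};

/// Decode with full UTF-8 validation.
pub fn decode_checked(bytes: &[u8]) -> &str {
    std::str::from_utf8(bytes).expect("input must be valid UTF-8")
}

/// Decode without validation.
///
/// # Safety
/// `bytes` must be valid UTF-8.
pub unsafe fn decode_unchecked(bytes: &[u8]) -> &str {
    // SAFETY: the caller guarantees `bytes` is valid UTF-8.
    unsafe { std::str::from_utf8_unchecked(bytes) }
}

/// Run `f` `iters` times and return the total elapsed time.
pub fn time_it<F: FnMut()>(iters: u32, mut f: F) -> Duration {
    let start = Instant::now();
    for _ in 0..iters {
        f();
    }
    start.elapsed()
}

fn main() {
    // Build the input once, outside the timed closures, as the benchmarks do.
    let data = "arrow ".repeat(100_000).into_bytes();

    let checked = time_it(100, || {
        black_box(decode_checked(black_box(data.as_slice())));
    });

    let unchecked = time_it(100, || {
        // SAFETY: `data` was built from a `String`, so it is valid UTF-8.
        black_box(unsafe { decode_unchecked(black_box(data.as_slice())) });
    });

    println!("checked: {checked:?}, unchecked: {unchecked:?}");
}
```

The absolute numbers from a loop like this are noisy; the point is only that both paths decode identical input, so any difference comes from the validation step.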

/// Return an IPC stream with 10 record batches
fn ipc_stream() -> Vec<u8> {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = StreamWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
buffer
}

/// Return an IPC stream with ZSTD compression with 10 record batches
fn ipc_stream_zstd() -> Vec<u8> {
Contributor

Minor nit, but I wonder if we could have an `ipc_stream_options` that takes an `IpcWriteOptions`, and then have `ipc_stream` just call through to it.

Contributor Author

This was a good suggestion. I did it in 7d3f020 🧹

let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options = IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.unwrap();
let mut writer =
StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
buffer
}
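
The call-through shape suggested in the review comment above might look roughly like the following. This is a sketch only, not the code from the follow-up commit: `ToyWriteOptions` and the `toy_*` functions are hypothetical stand-ins for `IpcWriteOptions` and the benchmark helpers, and they use plain bytes instead of Arrow record batches so the snippet has no dependencies.

```rust
/// Hypothetical stand-in for `IpcWriteOptions`; it only tracks whether
/// compression is requested.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ToyWriteOptions {
    pub compress: bool,
}

/// Single helper that does the work: writes 10 toy "batches" using `options`.
pub fn toy_stream_options(options: ToyWriteOptions) -> Vec<u8> {
    let mut buffer = Vec::new();
    // One header byte records the option so the output differs per setting.
    buffer.push(u8::from(options.compress));
    for i in 0..10u8 {
        buffer.push(i);
    }
    buffer
}

/// Default variant: calls through with default options.
pub fn toy_stream() -> Vec<u8> {
    toy_stream_options(ToyWriteOptions::default())
}

/// "Compressed" variant: calls through with compression enabled.
pub fn toy_stream_zstd() -> Vec<u8> {
    toy_stream_options(ToyWriteOptions { compress: true })
}

fn main() {
    // Both wrappers share one code path and differ only in the options.
    assert_ne!(toy_stream(), toy_stream_zstd());
}
```

With this shape, the loop that writes the batches lives in one function, and each variant is a one-line wrapper that picks its options.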

/// Return an IPC file with 10 record batches
fn ipc_file() -> Vec<u8> {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
buffer
}

// copied from the zero_copy_ipc example.
// should we move this to an actual API?
/// Wrapper around the example in the `FileDecoder` which handles the
@@ -166,6 +246,11 @@ impl IPCBufferDecoder {
}
}

unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
self.decoder = self.decoder.with_skip_validation(skip_validation);
self
}

fn num_batches(&self) -> usize {
self.batches.len()
}