Skip to content

Commit

Permalink
fix up
Browse files Browse the repository at this point in the history
gatesn committed Jan 15, 2025
2 parents 27f8b13 + b20b2a7 commit 253ccf6
Showing 47 changed files with 1,077 additions and 648 deletions.
25 changes: 16 additions & 9 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
@@ -11,12 +11,11 @@ use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use tokio::runtime::Runtime;
use vortex::buffer::buffer;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

const INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];

/// Benchmarks against object stores require setting
/// * AWS_ACCESS_KEY_ID
/// * AWS_SECRET_ACCESS_KEY
@@ -25,20 +24,27 @@ const INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];
///
/// environment variables and assume files to read are already present
fn random_access_vortex(c: &mut Criterion) {
let indices = buffer![10, 11, 12, 13, 100_000, 3_000_000];

let mut group = c.benchmark_group("random-access");

let taxi_vortex = taxi_data_vortex();
group.bench_function("vortex-tokio-local-disk", |b| {
b.to_async(Runtime::new().unwrap())
.iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_vortex_tokio(&taxi_vortex, indices.clone())
.await
.unwrap(),
)
})
});

group.bench_function("vortex-local-fs", |b| {
let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), &INDICES)
take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), indices.clone())
.await
.unwrap(),
)
@@ -50,8 +56,9 @@ fn random_access_vortex(c: &mut Criterion) {

let taxi_parquet = taxi_data_parquet();
group.bench_function("parquet-tokio-local-disk", |b| {
b.to_async(Runtime::new().unwrap())
.iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(take_parquet(&taxi_parquet, indices.clone()).await.unwrap())
})
});

if env::var("AWS_ACCESS_KEY_ID").is_ok() {
@@ -65,7 +72,7 @@ fn random_access_vortex(c: &mut Criterion) {

b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_vortex_object_store(r2_fs.clone(), r2_path.clone(), &INDICES)
take_vortex_object_store(r2_fs.clone(), r2_path.clone(), indices.clone())
.await
.unwrap(),
)
@@ -81,7 +88,7 @@ fn random_access_vortex(c: &mut Criterion) {

b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, indices.clone())
.await
.unwrap(),
)
14 changes: 6 additions & 8 deletions bench-vortex/src/clickbench.rs
Original file line number Diff line number Diff line change
@@ -7,9 +7,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::executor::block_on;
use itertools::Itertools;
use rayon::prelude::*;
use futures::{stream, StreamExt, TryStreamExt};
use tokio::fs::{create_dir_all, OpenOptions};
use vortex::aliases::hash_map::HashMap;
use vortex::array::{ChunkedArray, StructArray};
@@ -152,9 +150,7 @@ pub async fn register_vortex_files(
let vortex_dir = input_path.join("vortex");
create_dir_all(&vortex_dir).await?;

(0..100)
.collect_vec()
.par_iter()
stream::iter(0..100)
.map(|idx| {
let parquet_file_path = input_path
.join("parquet")
@@ -163,7 +159,7 @@ pub async fn register_vortex_files(
let session = session.clone();
let schema = schema.clone();

block_on(async move {
tokio::spawn(async move {
let output_path = output_path.clone();
idempotent_async(&output_path, move |vtx_file| async move {
eprintln!("Processing file {idx}");
@@ -234,7 +230,9 @@ pub async fn register_vortex_files(
.expect("Failed to write Vortex file")
})
})
.collect::<Vec<_>>();
.buffered(16)
.try_collect::<Vec<_>>()
.await?;

let format = Arc::new(VortexFormat::new(CTX.clone()));
let table_path = vortex_dir
24 changes: 11 additions & 13 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
@@ -24,13 +24,13 @@ use stream::StreamExt;
use vortex::aliases::hash_map::HashMap;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::buffer::Buffer;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions};
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::scan::Scan;
use vortex::stream::ArrayStreamExt;
use vortex::{ArrayData, IntoArrayData, IntoCanonical};

@@ -106,14 +106,12 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu

async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
reader: T,
_indices: &[u64],
indices: Buffer<u64>,
) -> VortexResult<ArrayData> {
VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
.open(reader)
.await?
// FIXME(ngates): support row indices
// .scan_rows(Scan::all(), indices.iter().copied())?
.scan(Scan::all())?
.take(indices, Scan::all())?
.into_array_data()
.await?
// For equivalence.... we decompress to make sure we're not cheating too much.
@@ -124,33 +122,33 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
pub async fn take_vortex_object_store(
fs: Arc<dyn ObjectStore>,
path: object_store::path::Path,
indices: &[u64],
indices: Buffer<u64>,
) -> VortexResult<ArrayData> {
take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await
}

pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<ArrayData> {
pub async fn take_vortex_tokio(path: &Path, indices: Buffer<u64>) -> VortexResult<ArrayData> {
take_vortex(TokioFile::open(path)?, indices).await
}

pub async fn take_parquet_object_store(
fs: Arc<dyn ObjectStore>,
path: &object_store::path::Path,
indices: &[u64],
indices: Buffer<u64>,
) -> VortexResult<RecordBatch> {
let meta = fs.head(path).await?;
let reader = ParquetObjectReader::new(fs, meta);
parquet_take_from_stream(reader, indices).await
}

pub async fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
pub async fn take_parquet(path: &Path, indices: Buffer<u64>) -> VortexResult<RecordBatch> {
let file = tokio::fs::File::open(path).await?;
parquet_take_from_stream(file, indices).await
}

async fn parquet_take_from_stream<T: AsyncFileReader + Unpin + Send + 'static>(
async_reader: T,
indices: &[u64],
indices: Buffer<u64>,
) -> VortexResult<RecordBatch> {
let builder = ParquetRecordBatchStreamBuilder::new_with_options(
async_reader,
@@ -175,12 +173,12 @@ async fn parquet_take_from_stream<T: AsyncFileReader + Unpin + Send + 'static>(

for idx in indices {
let row_group_idx = row_group_offsets
.binary_search(&(*idx as i64))
.binary_search(&(idx as i64))
.unwrap_or_else(|e| e - 1);
row_groups
.entry(row_group_idx)
.or_insert_with(Vec::new)
.push((*idx as i64) - row_group_offsets[row_group_idx]);
.push((idx as i64) - row_group_offsets[row_group_idx]);
}
let row_group_indices = row_groups
.keys()
4 changes: 3 additions & 1 deletion bench-vortex/src/tpch/dbgen.rs
Original file line number Diff line number Diff line change
@@ -13,6 +13,8 @@ use itertools::Itertools;
use tar::Archive;
use xshell::Shell;

use crate::IdempotentPath;

pub struct DBGen {
options: DBGenOptions,
}
@@ -37,7 +39,7 @@ impl Default for DBGenOptions {
fn default() -> Self {
Self {
scale_factor: 1,
base_dir: std::env::current_dir().unwrap().join("data").join("tpch"),
base_dir: "tpch".to_data_path(),
cache_dir: homedir::my_home()
.unwrap()
.unwrap()
4 changes: 2 additions & 2 deletions encodings/alp/src/alp/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -79,10 +79,10 @@ impl SliceFn<ALPArray> for ALPEncoding {
}

impl FilterFn<ALPArray> for ALPEncoding {
fn filter(&self, array: &ALPArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &ALPArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let patches = array
.patches()
.map(|p| p.filter(mask.clone()))
.map(|p| p.filter(mask))
.transpose()?
.flatten();

17 changes: 10 additions & 7 deletions encodings/alp/src/alp_rd/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -5,16 +5,16 @@ use vortex_error::VortexResult;
use crate::{ALPRDArray, ALPRDEncoding};

impl FilterFn<ALPRDArray> for ALPRDEncoding {
fn filter(&self, array: &ALPRDArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &ALPRDArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let left_parts_exceptions = array
.left_parts_patches()
.map(|patches| patches.filter(mask.clone()))
.map(|patches| patches.filter(mask))
.transpose()?
.flatten();

Ok(ALPRDArray::try_new(
array.dtype().clone(),
filter(&array.left_parts(), mask.clone())?,
filter(&array.left_parts(), mask)?,
array.left_parts_dict(),
filter(&array.right_parts(), mask)?,
array.right_bit_width(),
@@ -46,10 +46,13 @@ mod test {
assert!(encoded.left_parts_patches().is_some());

// The first two values need no patching
let filtered = filter(encoded.as_ref(), FilterMask::from_iter([true, false, true]))
.unwrap()
.into_primitive()
.unwrap();
let filtered = filter(
encoded.as_ref(),
&FilterMask::from_iter([true, false, true]),
)
.unwrap()
.into_primitive()
.unwrap();
assert_eq!(filtered.as_slice::<T>(), &[a, outlier]);
}
}
6 changes: 3 additions & 3 deletions encodings/datetime-parts/src/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -5,11 +5,11 @@ use vortex_error::VortexResult;
use crate::{DateTimePartsArray, DateTimePartsEncoding};

impl FilterFn<DateTimePartsArray> for DateTimePartsEncoding {
fn filter(&self, array: &DateTimePartsArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &DateTimePartsArray, mask: &FilterMask) -> VortexResult<ArrayData> {
Ok(DateTimePartsArray::try_new(
array.dtype().clone(),
filter(array.days().as_ref(), mask.clone())?,
filter(array.seconds().as_ref(), mask.clone())?,
filter(array.days().as_ref(), mask)?,
filter(array.seconds().as_ref(), mask)?,
filter(array.subsecond().as_ref(), mask)?,
)?
.into_array())
2 changes: 1 addition & 1 deletion encodings/dict/src/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ impl TakeFn<DictArray> for DictEncoding {
}

impl FilterFn<DictArray> for DictEncoding {
fn filter(&self, array: &DictArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &DictArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let codes = filter(&array.codes(), mask)?;
DictArray::try_new(codes, array.values()).map(|a| a.into_array())
}
28 changes: 15 additions & 13 deletions encodings/fastlanes/src/bitpacking/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ use crate::bitpacking::compute::take::UNPACK_CHUNK_THRESHOLD;
use crate::{BitPackedArray, BitPackedEncoding};

impl FilterFn<BitPackedArray> for BitPackedEncoding {
fn filter(&self, array: &BitPackedArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &BitPackedArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let primitive = match_each_unsigned_integer_ptype!(array.ptype().to_unsigned(), |$I| {
filter_primitive::<$I>(array, mask)
});
@@ -31,13 +31,13 @@ impl FilterFn<BitPackedArray> for BitPackedEncoding {
/// dictates the final `PType` of the result.
fn filter_primitive<T: NativePType + BitPacking + ArrowNativeType>(
array: &BitPackedArray,
mask: FilterMask,
mask: &FilterMask,
) -> VortexResult<PrimitiveArray> {
let validity = array.validity().filter(&mask)?;
let validity = array.validity().filter(mask)?;

let patches = array
.patches()
.map(|patches| patches.filter(mask.clone()))
.map(|patches| patches.filter(mask))
.transpose()?
.flatten();

@@ -47,15 +47,13 @@ fn filter_primitive<T: NativePType + BitPacking + ArrowNativeType>(
.and_then(|a| a.into_primitive());
}

let values: Buffer<T> = match mask.iter()? {
let values: Buffer<T> = match mask.iter() {
FilterIter::Indices(indices) => {
filter_indices(array, mask.true_count(), indices.iter().copied())
}
FilterIter::IndicesIter(iter) => filter_indices(array, mask.true_count(), iter),
FilterIter::Slices(slices) => {
filter_slices(array, mask.true_count(), slices.iter().copied())
}
FilterIter::SlicesIter(iter) => filter_slices(array, mask.true_count(), iter),
};

let mut values = PrimitiveArray::new(values, validity).reinterpret_cast(array.ptype());
@@ -143,9 +141,9 @@ mod test {
let unpacked = PrimitiveArray::from_iter((0..4096).map(|i| (i % 63) as u8));
let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 6).unwrap();

let mask = FilterMask::from_indices(bitpacked.len(), [0, 125, 2047, 2049, 2151, 2790]);
let mask = FilterMask::from_indices(bitpacked.len(), vec![0, 125, 2047, 2049, 2151, 2790]);

let primitive_result = filter(bitpacked.as_ref(), mask)
let primitive_result = filter(bitpacked.as_ref(), &mask)
.unwrap()
.into_primitive()
.unwrap();
@@ -160,9 +158,9 @@ mod test {
let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 6).unwrap();
let sliced = slice(bitpacked.as_ref(), 128, 2050).unwrap();

let mask = FilterMask::from_indices(sliced.len(), [1919, 1921]);
let mask = FilterMask::from_indices(sliced.len(), vec![1919, 1921]);

let primitive_result = filter(&sliced, mask).unwrap().into_primitive().unwrap();
let primitive_result = filter(&sliced, &mask).unwrap().into_primitive().unwrap();
let res_bytes = primitive_result.as_slice::<u8>();
assert_eq!(res_bytes, &[31, 33]);
}
@@ -171,7 +169,11 @@ mod test {
fn filter_bitpacked() {
let unpacked = PrimitiveArray::from_iter((0..4096).map(|i| (i % 63) as u8));
let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 6).unwrap();
let filtered = filter(bitpacked.as_ref(), FilterMask::from_indices(4096, 0..1024)).unwrap();
let filtered = filter(
bitpacked.as_ref(),
&FilterMask::from_indices(4096, (0..1024).collect()),
)
.unwrap();
assert_eq!(
filtered.into_primitive().unwrap().as_slice::<u8>(),
(0..1024).map(|i| (i % 63) as u8).collect::<Vec<_>>()
@@ -185,7 +187,7 @@ mod test {
let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 9).unwrap();
let filtered = filter(
bitpacked.as_ref(),
FilterMask::from_indices(values.len(), 0..250),
&FilterMask::from_indices(values.len(), (0..250).collect()),
)
.unwrap()
.into_primitive()
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/for/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ impl TakeFn<FoRArray> for FoREncoding {
}

impl FilterFn<FoRArray> for FoREncoding {
fn filter(&self, array: &FoRArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &FoRArray, mask: &FilterMask) -> VortexResult<ArrayData> {
FoRArray::try_new(
filter(&array.encoded(), mask)?,
array.reference_scalar(),
4 changes: 2 additions & 2 deletions encodings/fsst/src/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -80,12 +80,12 @@ impl ScalarAtFn<FSSTArray> for FSSTEncoding {

impl FilterFn<FSSTArray> for FSSTEncoding {
// Filtering an FSSTArray filters the codes array, leaving the symbols array untouched
fn filter(&self, array: &FSSTArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &FSSTArray, mask: &FilterMask) -> VortexResult<ArrayData> {
Ok(FSSTArray::try_new(
array.dtype().clone(),
array.symbols(),
array.symbol_lengths(),
filter(&array.codes(), mask.clone())?,
filter(&array.codes(), mask)?,
filter(&array.uncompressed_lengths(), mask)?,
)?
.into_array())
2 changes: 1 addition & 1 deletion encodings/fsst/tests/fsst_tests.rs
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ fn test_fsst_array_ops() {
// test filter
let mask = FilterMask::from_iter([false, true, false]);

let fsst_filtered = filter(&fsst_array, mask).unwrap();
let fsst_filtered = filter(&fsst_array, &mask).unwrap();
assert_eq!(fsst_filtered.encoding().id(), FSSTEncoding::ID);
assert_eq!(fsst_filtered.len(), 1);
assert_nth_scalar!(
12 changes: 6 additions & 6 deletions encodings/runend/src/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -92,12 +92,12 @@ impl SliceFn<RunEndArray> for RunEndEncoding {
}

impl FilterFn<RunEndArray> for RunEndEncoding {
fn filter(&self, array: &RunEndArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &RunEndArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let primitive_run_ends = array.ends().into_primitive()?;
let (run_ends, values_mask) = match_each_unsigned_integer_ptype!(primitive_run_ends.ptype(), |$P| {
filter_run_ends(primitive_run_ends.as_slice::<$P>(), array.offset() as u64, array.len() as u64, mask)?
});
let values = filter(&array.values(), values_mask)?;
let values = filter(&array.values(), &values_mask)?;

RunEndArray::try_new(run_ends.into_array(), values).map(|a| a.into_array())
}
@@ -108,14 +108,14 @@ fn filter_run_ends<R: NativePType + AddAssign + From<bool> + AsPrimitive<u64>>(
run_ends: &[R],
offset: u64,
length: u64,
mask: FilterMask,
mask: &FilterMask,
) -> VortexResult<(PrimitiveArray, FilterMask)> {
let mut new_run_ends = buffer_mut![R::zero(); run_ends.len()];

let mut start = 0u64;
let mut j = 0;
let mut count = R::zero();
let filter_values = mask.to_boolean_buffer()?;
let filter_values = mask.boolean_buffer();

let new_mask: FilterMask = BooleanBuffer::collect_bool(run_ends.len(), |i| {
let mut keep = false;
@@ -278,7 +278,7 @@ mod test {
let arr = ree_array();
let filtered = filter(
arr.as_ref(),
FilterMask::from_iter([
&FilterMask::from_iter([
true, true, false, false, false, false, false, false, false, false, true, true,
]),
)
@@ -308,7 +308,7 @@ mod test {
let arr = slice(ree_array(), 2, 7).unwrap();
let filtered = filter(
&arr,
FilterMask::from_iter([true, false, false, true, true]),
&FilterMask::from_iter([true, false, false, true, true]),
)
.unwrap();
let filtered_run_end = RunEndArray::try_from(filtered).unwrap();
4 changes: 2 additions & 2 deletions encodings/zigzag/src/compute.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ impl ComputeVTable for ZigZagEncoding {
}

impl FilterFn<ZigZagArray> for ZigZagEncoding {
fn filter(&self, array: &ZigZagArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &ZigZagArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let encoded = filter(&array.encoded(), mask)?;
Ok(ZigZagArray::try_new(encoded)?.into_array())
}
@@ -145,7 +145,7 @@ mod tests {
fn filter_zigzag() {
let zigzag = ZigZagArray::encode(&buffer![-189, -160, 1].into_array()).unwrap();
let filter_mask = BooleanBuffer::from(vec![true, false, true]).into();
let actual = filter(&zigzag.into_array(), filter_mask)
let actual = filter(&zigzag.into_array(), &filter_mask)
.unwrap()
.into_primitive()
.unwrap();
2 changes: 1 addition & 1 deletion fuzz/fuzz_targets/array_ops.rs
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ fuzz_target!(|fuzz_action: FuzzArrayAction| -> Corpus {
assert_search_sorted(sorted, s, side, expected.search(), i)
}
Action::Filter(mask) => {
current_array = filter(&current_array, mask).unwrap();
current_array = filter(&current_array, &mask).unwrap();
assert_array_eq(&expected.array(), &current_array, i);
}
}
2 changes: 1 addition & 1 deletion pyvortex/src/array.rs
Original file line number Diff line number Diff line change
@@ -263,7 +263,7 @@ impl PyArray {
fn filter(&self, filter: &Bound<PyArray>) -> PyResult<PyArray> {
let filter = filter.borrow();
let inner =
vortex::compute::filter(&self.inner, FilterMask::try_from(filter.inner.clone())?)?;
vortex::compute::filter(&self.inner, &FilterMask::try_from(filter.inner.clone())?)?;
Ok(PyArray { inner })
}

14 changes: 4 additions & 10 deletions vortex-array/src/array/bool/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -6,22 +6,16 @@ use crate::compute::{FilterFn, FilterIter, FilterMask};
use crate::{ArrayData, IntoArrayData};

impl FilterFn<BoolArray> for BoolEncoding {
fn filter(&self, array: &BoolArray, mask: FilterMask) -> VortexResult<ArrayData> {
let validity = array.validity().filter(&mask)?;
fn filter(&self, array: &BoolArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let validity = array.validity().filter(mask)?;

let buffer = match mask.iter()? {
let buffer = match mask.iter() {
FilterIter::Indices(indices) => filter_indices_slice(&array.boolean_buffer(), indices),
FilterIter::IndicesIter(iter) => {
filter_indices(&array.boolean_buffer(), mask.true_count(), iter)
}
FilterIter::Slices(slices) => filter_slices(
&array.boolean_buffer(),
mask.true_count(),
slices.iter().copied(),
),
FilterIter::SlicesIter(iter) => {
filter_slices(&array.boolean_buffer(), mask.true_count(), iter)
}
};

Ok(BoolArray::try_new(buffer, validity)?.into_array())
@@ -84,7 +78,7 @@ mod test {
let arr = BoolArray::from_iter([true, true, false]);
let mask = FilterMask::from_iter([true, false, true]);

let filtered = filter(&arr.into_array(), mask)
let filtered = filter(&arr.into_array(), &mask)
.unwrap()
.into_bool()
.unwrap();
62 changes: 13 additions & 49 deletions vortex-array/src/array/chunked/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use arrow_buffer::BooleanBufferBuilder;
use vortex_buffer::BufferMut;
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};

@@ -11,7 +10,7 @@ use crate::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoCanonical};
const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;

impl FilterFn<ChunkedArray> for ChunkedEncoding {
fn filter(&self, array: &ChunkedArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &ChunkedArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let selected = mask.true_count();

// Based on filter selectivity, we take the values between a range of slices, or
@@ -38,31 +37,8 @@ enum ChunkFilter {
Slices(Vec<(usize, usize)>),
}

/// Given a sequence of slices that indicate ranges of set values, returns a boolean array
/// representing the same thing.
fn slices_to_mask(slices: &[(usize, usize)], len: usize) -> FilterMask {
let mut buffer = BooleanBufferBuilder::new(len);

let mut pos = 0;
for (slice_start, slice_end) in slices.iter().copied() {
// write however many trailing `false` between the end of the previous slice and the
// start of this one.
let n_leading_false = slice_start - pos;
buffer.append_n(n_leading_false, false);
buffer.append_n(slice_end - slice_start, true);
pos = slice_end;
}

// Pad the end of the buffer with false, if necessary.
let n_trailing_false = len - pos;
buffer.append_n(n_trailing_false, false);

FilterMask::from(buffer.finish())
}

/// Filter the chunks using slice ranges.
#[allow(deprecated)]
fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult<Vec<ArrayData>> {
fn filter_slices(array: &ChunkedArray, mask: &FilterMask) -> VortexResult<Vec<ArrayData>> {
let mut result = Vec::with_capacity(array.nchunks());

// Pre-materialize the chunk ends for performance.
@@ -72,8 +48,8 @@ fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult<Vec<Arr

let mut chunk_filters = vec![ChunkFilter::None; array.nchunks()];

for (slice_start, slice_end) in mask.iter_slices()? {
let (start_chunk, start_idx) = find_chunk_idx(slice_start, chunk_ends);
for (slice_start, slice_end) in mask.slices() {
let (start_chunk, start_idx) = find_chunk_idx(*slice_start, chunk_ends);
// NOTE: we adjust slice end back by one, in case it ends on a chunk boundary, we do not
// want to index into the unused chunk.
let (end_chunk, end_idx) = find_chunk_idx(slice_end - 1, chunk_ends);
@@ -120,15 +96,18 @@ fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult<Vec<Arr
}

// Now, apply the chunk filter to every slice.
for (chunk, chunk_filter) in array.chunks().zip(chunk_filters.iter()) {
for (chunk, chunk_filter) in array.chunks().zip(chunk_filters.into_iter()) {
match chunk_filter {
// All => preserve the entire chunk unfiltered.
ChunkFilter::All => result.push(chunk),
// None => whole chunk is filtered out, skip
ChunkFilter::None => {}
// Slices => turn the slices into a boolean buffer.
ChunkFilter::Slices(slices) => {
result.push(filter(&chunk, slices_to_mask(slices, chunk.len()))?);
result.push(filter(
&chunk,
&FilterMask::from_slices(chunk.len(), slices),
)?);
}
}
}
@@ -138,7 +117,7 @@ fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult<Vec<Arr

/// Filter the chunks using indices.
#[allow(deprecated)]
fn filter_indices(array: &ChunkedArray, mask: FilterMask) -> VortexResult<Vec<ArrayData>> {
fn filter_indices(array: &ChunkedArray, mask: &FilterMask) -> VortexResult<Vec<ArrayData>> {
let mut result = Vec::with_capacity(array.nchunks());
let mut current_chunk_id = 0;
let mut chunk_indices = BufferMut::with_capacity(array.nchunks());
@@ -148,8 +127,8 @@ fn filter_indices(array: &ChunkedArray, mask: FilterMask) -> VortexResult<Vec<Ar
let chunk_ends = array.chunk_offsets().into_canonical()?.into_primitive()?;
let chunk_ends = chunk_ends.as_slice::<u64>();

for set_index in mask.iter_indices()? {
let (chunk_id, index) = find_chunk_idx(set_index, chunk_ends);
for set_index in mask.indices() {
let (chunk_id, index) = find_chunk_idx(*set_index, chunk_ends);
if chunk_id != current_chunk_id {
// Push the chunk we've accumulated.
if !chunk_indices.is_empty() {
@@ -201,28 +180,13 @@ fn find_chunk_idx(idx: usize, chunk_ends: &[u64]) -> (usize, usize) {

#[cfg(test)]
mod test {
use itertools::Itertools;
use vortex_dtype::half::f16;
use vortex_dtype::{DType, Nullability, PType};

use crate::array::chunked::compute::filter::slices_to_mask;
use crate::array::{ChunkedArray, PrimitiveArray};
use crate::compute::{filter, FilterMask};
use crate::IntoArrayData;

#[test]
fn test_slices_to_predicate() {
let slices = [(2, 4), (6, 8), (9, 10)];
let predicate = slices_to_mask(&slices, 11);

let bools = predicate.to_boolean_buffer().unwrap().iter().collect_vec();

assert_eq!(
bools,
vec![false, false, true, true, false, false, true, true, false, true, false],
)
}

#[test]
fn filter_chunked_floats() {
let chunked = ChunkedArray::try_new(
@@ -252,7 +216,7 @@ mod test {
let mask = FilterMask::from_iter([
true, false, false, true, true, true, true, true, true, true, true,
]);
let filtered = filter(&chunked, mask).unwrap();
let filtered = filter(&chunked, &mask).unwrap();
assert_eq!(filtered.len(), 9);
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/array/constant/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ impl SliceFn<ConstantArray> for ConstantEncoding {
}

impl FilterFn<ConstantArray> for ConstantEncoding {
fn filter(&self, array: &ConstantArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &ConstantArray, mask: &FilterMask) -> VortexResult<ArrayData> {
Ok(ConstantArray::new(array.scalar(), mask.true_count()).into_array())
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/array/list/mod.rs
Original file line number Diff line number Diff line change
@@ -311,7 +311,7 @@ mod test {

let filtered = filter(
&list,
FilterMask::from(BooleanBuffer::from(vec![false, true, true])),
&FilterMask::from(BooleanBuffer::from(vec![false, true, true])),
);

assert!(filtered.is_ok())
10 changes: 4 additions & 6 deletions vortex-array/src/array/primitive/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -9,14 +9,12 @@ use crate::variants::PrimitiveArrayTrait;
use crate::{ArrayData, IntoArrayData};

impl FilterFn<PrimitiveArray> for PrimitiveEncoding {
fn filter(&self, array: &PrimitiveArray, mask: FilterMask) -> VortexResult<ArrayData> {
let validity = array.validity().filter(&mask)?;
fn filter(&self, array: &PrimitiveArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let validity = array.validity().filter(mask)?;
match_each_native_ptype!(array.ptype(), |$T| {
let values = match mask.iter()? {
let values = match mask.iter() {
FilterIter::Indices(indices) => filter_primitive_indices(array.as_slice::<$T>(), indices.iter().copied()),
FilterIter::IndicesIter(iter) => filter_primitive_indices(array.as_slice::<$T>(), iter),
FilterIter::Slices(slices) => filter_primitive_slices(array.as_slice::<$T>(), mask.true_count(), slices.iter().copied()),
FilterIter::SlicesIter(iter) => filter_primitive_slices(array.as_slice::<$T>(), mask.true_count(), iter),
};
Ok(PrimitiveArray::new(values, validity).into_array())
})
@@ -57,7 +55,7 @@ mod test {
let mask = [true, true, false, true, true, true, false, true];
let arr = PrimitiveArray::from_iter([1u32, 24, 54, 2, 3, 2, 3, 2]);

let filtered = filter(&arr.to_array(), FilterMask::from_iter(mask))
let filtered = filter(&arr.to_array(), &FilterMask::from_iter(mask))
.unwrap()
.into_primitive()
.unwrap();
8 changes: 4 additions & 4 deletions vortex-array/src/array/sparse/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -89,8 +89,8 @@ impl SearchSortedUsizeFn<SparseArray> for SparseEncoding {
}

impl FilterFn<SparseArray> for SparseEncoding {
fn filter(&self, array: &SparseArray, mask: FilterMask) -> VortexResult<ArrayData> {
let new_length = mask.to_boolean_buffer()?.count_set_bits();
fn filter(&self, array: &SparseArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let new_length = mask.true_count();

let Some(new_patches) = array.resolved_patches()?.filter(mask)? else {
return Ok(ConstantArray::new(array.fill_scalar(), new_length).into_array());
@@ -188,7 +188,7 @@ mod test {
predicate.extend_from_slice(&[false; 17]);
let mask = FilterMask::from_iter(predicate);

let filtered_array = filter(&array, mask).unwrap();
let filtered_array = filter(&array, &mask).unwrap();
let filtered_array = SparseArray::try_from(filtered_array).unwrap();

assert_eq!(filtered_array.len(), 1);
@@ -208,7 +208,7 @@ mod test {
.unwrap()
.into_array();

let filtered_array = filter(&array, mask).unwrap();
let filtered_array = filter(&array, &mask).unwrap();
let filtered_array = SparseArray::try_from(filtered_array).unwrap();

assert_eq!(filtered_array.len(), 4);
11 changes: 6 additions & 5 deletions vortex-array/src/array/struct_/compute.rs
Original file line number Diff line number Diff line change
@@ -73,12 +73,12 @@ impl SliceFn<StructArray> for StructEncoding {
}

impl FilterFn<StructArray> for StructEncoding {
fn filter(&self, array: &StructArray, mask: FilterMask) -> VortexResult<ArrayData> {
let validity = array.validity().filter(&mask)?;
fn filter(&self, array: &StructArray, mask: &FilterMask) -> VortexResult<ArrayData> {
let validity = array.validity().filter(mask)?;

let fields: Vec<ArrayData> = array
.children()
.map(|field| filter(&field, mask.clone()))
.map(|field| filter(&field, mask))
.try_collect()?;
let length = fields
.first()
@@ -103,15 +103,16 @@ mod tests {
let mask = vec![
false, true, false, true, false, true, false, true, false, true,
];
let filtered = filter(struct_arr.as_ref(), FilterMask::from_iter(mask)).unwrap();
let filtered = filter(struct_arr.as_ref(), &FilterMask::from_iter(mask)).unwrap();
assert_eq!(filtered.len(), 5);
}

#[test]
fn filter_empty_struct_with_empty_filter() {
let struct_arr =
StructArray::try_new(vec![].into(), vec![], 0, Validity::NonNullable).unwrap();
let filtered = filter(struct_arr.as_ref(), FilterMask::from_iter::<[bool; 0]>([])).unwrap();
let filtered =
filter(struct_arr.as_ref(), &FilterMask::from_iter::<[bool; 0]>([])).unwrap();
assert_eq!(filtered.len(), 0);
}
}
5 changes: 3 additions & 2 deletions vortex-array/src/array/varbin/accessor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools;
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;

@@ -26,14 +27,14 @@ impl ArrayAccessor<[u8]> for VarBinArray {
None => {
let mut iter = offsets
.iter()
.zip(offsets.iter().skip(1))
.tuple_windows()
.map(|(start, end)| Some(&bytes[*start as usize..*end as usize]));
Ok(f(&mut iter))
}
Some(validity) => {
let mut iter = offsets
.iter()
.zip(offsets.iter().skip(1))
.tuple_windows()
.zip(validity.iter())
.map(|((start, end), valid)| {
if valid {
26 changes: 13 additions & 13 deletions vortex-array/src/array/varbin/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -12,12 +12,12 @@ use crate::variants::PrimitiveArrayTrait;
use crate::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};

impl FilterFn<VarBinArray> for VarBinEncoding {
fn filter(&self, array: &VarBinArray, mask: FilterMask) -> VortexResult<ArrayData> {
fn filter(&self, array: &VarBinArray, mask: &FilterMask) -> VortexResult<ArrayData> {
filter_select_var_bin(array, mask).map(|a| a.into_array())
}
}

fn filter_select_var_bin(arr: &VarBinArray, mask: FilterMask) -> VortexResult<VarBinArray> {
fn filter_select_var_bin(arr: &VarBinArray, mask: &FilterMask) -> VortexResult<VarBinArray> {
let selection_count = mask.true_count();
if selection_count * 2 > mask.len() {
filter_select_var_bin_by_slice(arr, mask, selection_count)
@@ -28,7 +28,7 @@ fn filter_select_var_bin(arr: &VarBinArray, mask: FilterMask) -> VortexResult<Va

fn filter_select_var_bin_by_slice(
values: &VarBinArray,
mask: FilterMask,
mask: &FilterMask,
selection_count: usize,
) -> VortexResult<VarBinArray> {
let offsets = values.offsets().into_primitive()?;
@@ -49,7 +49,7 @@ fn filter_select_var_bin_by_slice_primitive_offset<O>(
dtype: DType,
offsets: &[O],
data: &[u8],
mask: FilterMask,
mask: &FilterMask,
validity: Validity,
selection_count: usize,
) -> VortexResult<VarBinArray>
@@ -61,7 +61,7 @@ where
if let Some(val) = logical_validity.to_null_buffer()? {
let mut builder = VarBinBuilder::<O>::with_capacity(selection_count);

for (start, end) in mask.iter_slices()? {
for (start, end) in mask.slices().iter().copied() {
let null_sl = val.slice(start, end - start);
if null_sl.null_count() == 0 {
update_non_nullable_slice(data, offsets, &mut builder, start, end)
@@ -93,8 +93,8 @@ where

let mut builder = VarBinBuilder::<O>::with_capacity(selection_count);

mask.iter_slices()?.for_each(|(start, end)| {
update_non_nullable_slice(data, offsets, &mut builder, start, end)
mask.slices().iter().for_each(|(start, end)| {
update_non_nullable_slice(data, offsets, &mut builder, *start, *end)
});

Ok(builder.finish(dtype))
@@ -128,7 +128,7 @@ fn update_non_nullable_slice<O>(

fn filter_select_var_bin_by_index(
values: &VarBinArray,
mask: FilterMask,
mask: &FilterMask,
selection_count: usize,
) -> VortexResult<VarBinArray> {
let offsets = values.offsets().into_primitive()?;
@@ -149,12 +149,12 @@ fn filter_select_var_bin_by_index_primitive_offset<O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
mask: FilterMask,
mask: &FilterMask,
validity: Validity,
selection_count: usize,
) -> VortexResult<VarBinArray> {
let mut builder = VarBinBuilder::<O>::with_capacity(selection_count);
for idx in mask.iter_indices()? {
for idx in mask.indices().iter().copied() {
if validity.is_valid(idx) {
let (start, end) = (
offsets[idx].to_usize().ok_or_else(|| {
@@ -205,7 +205,7 @@ mod test {
);
let filter = FilterMask::from_iter([true, false, true]);

let buf = filter_select_var_bin_by_index(&arr, filter, 2)
let buf = filter_select_var_bin_by_index(&arr, &filter, 2)
.unwrap()
.to_array();

@@ -228,7 +228,7 @@ mod test {
);
let filter = FilterMask::from_iter([true, false, true, false, true]);

let buf = filter_select_var_bin_by_slice(&arr, filter, 3)
let buf = filter_select_var_bin_by_slice(&arr, &filter, 3)
.unwrap()
.to_array();

@@ -258,7 +258,7 @@ mod test {
let arr = VarBinArray::try_new(offsets, bytes, DType::Utf8(Nullable), validity).unwrap();
let filter = FilterMask::from_iter([true, true, true, false, true, true]);

let buf = filter_select_var_bin_by_slice(&arr, filter, 5)
let buf = filter_select_var_bin_by_slice(&arr, &filter, 5)
.unwrap()
.to_array();

32 changes: 0 additions & 32 deletions vortex-array/src/array/varbin/iter.rs

This file was deleted.

613 changes: 448 additions & 165 deletions vortex-array/src/compute/filter.rs

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions vortex-array/src/patches.rs
Original file line number Diff line number Diff line change
@@ -194,12 +194,13 @@ impl Patches {
}

/// Filter the patches by a mask, resulting in new patches for the filtered array.
pub fn filter(&self, mask: FilterMask) -> VortexResult<Option<Self>> {
pub fn filter(&self, mask: &FilterMask) -> VortexResult<Option<Self>> {
if mask.is_empty() {
return Ok(None);
}

let buffer = mask.to_boolean_buffer()?;
// TODO(ngates): add functions to operate with FilterMask directly
let buffer = mask.boolean_buffer();
let mut coordinate_indices = BufferMut::<u64>::empty();
let mut value_indices = BufferMut::<u64>::empty();
let mut last_inserted_index: usize = 0;
@@ -402,7 +403,7 @@ mod test {
);

let filtered = patches
.filter(FilterMask::from_indices(100, [10u32, 20, 30]))
.filter(&FilterMask::from_indices(100, vec![10, 20, 30]))
.unwrap()
.unwrap();

2 changes: 1 addition & 1 deletion vortex-array/src/validity.rs
Original file line number Diff line number Diff line change
@@ -229,7 +229,7 @@ impl Validity {
v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => {
Ok(v.clone())
}
Validity::Array(arr) => Ok(Validity::Array(filter(arr, mask.clone())?)),
Validity::Array(arr) => Ok(Validity::Array(filter(arr, mask)?)),
}
}

7 changes: 2 additions & 5 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
@@ -15,9 +15,8 @@ use vortex_error::VortexResult;
use vortex_expr::datafusion::convert_expr_to_vortex;
use vortex_expr::transform::simplify_typed::simplify_typed;
use vortex_expr::{and, get_item, ident, lit, pack, ExprRef, Identity};
use vortex_file::v2::{ExecutionMode, VortexOpenOptions};
use vortex_file::v2::{ExecutionMode, Scan, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;
use vortex_scan::Scan;

use super::cache::FileLayoutCache;

@@ -87,8 +86,6 @@ impl FileOpener for VortexFileOpener {
// Construct the projection expression based on the DataFusion projection mask.
// Each index in the mask corresponds to the field position of the root DType.

let scan = Scan::new(self.projection.clone(), self.filter.clone()).into_arc();

let read_at =
ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone());

@@ -104,7 +101,7 @@ impl FileOpener for VortexFileOpener {
.await?;

Ok(vxf
.scan(scan)?
.scan(Scan::new(this.projection.clone(), this.filter.clone()))?
.map_ok(RecordBatch::try_from)
.map(|r| r.and_then(|inner| inner))
.map_err(|e| e.into())
17 changes: 13 additions & 4 deletions vortex-expr/src/transform/partition.rs
Original file line number Diff line number Diff line change
@@ -127,11 +127,9 @@ impl<'a> Folder<'a> for ImmediateIdentityAccessesAnalysis<'a> {
.collect_vec();

accesses.into_iter().for_each(|c| {
let accesses = self.sub_expressions.entry(node).or_default();
if let Some(fields) = c {
self.sub_expressions
.entry(node)
.or_default()
.extend(fields.iter().cloned());
accesses.extend(fields.iter().cloned());
}
});

@@ -159,6 +157,11 @@ impl<'a> StructFieldExpressionSplitter<'a> {
let mut expr_top_level_ref = ImmediateIdentityAccessesAnalysis::new(scope_dtype);
expr.accept_with_context(&mut expr_top_level_ref, ())?;

let expression_accesses = expr_top_level_ref
.sub_expressions
.get(&expr)
.map(|ac| ac.len());

let mut splitter =
StructFieldExpressionSplitter::new(expr_top_level_ref.sub_expressions, scope_dtype);

@@ -178,6 +181,12 @@ impl<'a> StructFieldExpressionSplitter<'a> {
})
.collect();

// Ensure that there are not more accesses than partitions, we missed something
assert!(expression_accesses.unwrap_or(0) <= partitions.len());
// Ensure that there are as many partitions as there are accesses/fields in the scope,
// this will affect performance, not correctness.
debug_assert_eq!(expression_accesses.unwrap_or(0), partitions.len());

Ok(PartitionedExpr {
root: split.result(),
partitions: partitions.into_boxed_slice(),
5 changes: 3 additions & 2 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
@@ -422,6 +422,7 @@ mod tests {
use flatbuffers::{root, FlatBufferBuilder};
use futures_util::io::Cursor;
use futures_util::TryStreamExt;
use itertools::Itertools;
use vortex_array::array::ChunkedArray;
use vortex_array::compute::FilterMask;
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant};
@@ -462,11 +463,11 @@ mod tests {
}
let flat_layouts = byte_offsets
.iter()
.zip(byte_offsets.iter().skip(1))
.tuple_windows()
.zip(
row_offsets
.iter()
.zip(row_offsets.iter().skip(1))
.tuple_windows()
.map(|(begin, end)| end - begin),
)
.map(|((begin, end), len)| write::LayoutSpec::flat(ByteRange::new(*begin, *end), len))
55 changes: 30 additions & 25 deletions vortex-file/src/read/mask.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ use std::cmp::{max, min};
use std::fmt::{Display, Formatter};

use arrow_buffer::BooleanBuffer;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{BoolArray, PrimitiveArray, SparseArray};
use vortex_array::compute::{and, filter, slice, try_cast, FilterMask};
use vortex_array::validity::{ArrayValidity, LogicalValidity, Validity};
@@ -27,7 +26,7 @@ impl PartialEq for RowMask {
fn eq(&self, other: &Self) -> bool {
self.begin == other.begin
&& self.end == other.end
&& self.mask.to_boolean_buffer().unwrap() == other.mask.to_boolean_buffer().unwrap()
&& self.mask.boolean_buffer() == other.mask.boolean_buffer()
}
}

@@ -110,7 +109,11 @@ impl RowMask {
// TODO(ngates): should from_indices take u64?
let mask = FilterMask::from_indices(
end - begin,
indices.as_slice::<u64>().iter().map(|i| *i as usize),
indices
.as_slice::<u64>()
.iter()
.map(|i| *i as usize)
.collect(),
);

RowMask::try_new(mask, begin, end)
@@ -145,10 +148,10 @@ impl RowMask {

// If both masks align perfectly
if self.begin == other.begin && self.end == other.end {
let this_buffer = self.mask.to_boolean_buffer()?;
let other_buffer = other.mask.to_boolean_buffer()?;
let this_buffer = self.mask.boolean_buffer();
let other_buffer = other.mask.boolean_buffer();

let unified = &this_buffer & (&other_buffer);
let unified = this_buffer & other_buffer;
return RowMask::from_mask_array(
BoolArray::from(unified).as_ref(),
self.begin,
@@ -164,25 +167,26 @@ impl RowMask {
));
}

let output_begin = min(self.begin, other.begin);
let output_end = max(self.end, other.end);
let output_len = output_end - output_begin;

let this_buffer = self.mask.to_boolean_buffer()?;
let other_buffer = other.mask.to_boolean_buffer()?;
let self_indices = this_buffer
.set_indices()
.map(|v| v + self.begin)
.collect::<HashSet<_>>();
let other_indices = other_buffer
.set_indices()
.map(|v| v + other.begin)
.collect::<HashSet<_>>();

let output_mask = FilterMask::from_indices(
output_end,
self_indices.intersection(&other_indices).copied(),
let output_mask = FilterMask::from_intersection_indices(
output_len,
self.mask
.indices()
.iter()
.copied()
.map(|v| v + self.begin - output_begin),
other
.mask
.indices()
.iter()
.copied()
.map(|v| v + other.begin - output_begin),
);

Self::try_new(output_mask, 0, output_end)
Self::try_new(output_mask, output_begin, output_end)
}

pub fn is_all_false(&self) -> bool {
@@ -215,7 +219,7 @@ impl RowMask {
} else {
FilterMask::from(
self.mask
.to_boolean_buffer()?
.boolean_buffer()
.slice(range_begin - self.begin, range_end - range_begin),
)
},
@@ -248,15 +252,16 @@ impl RowMask {
return Ok(Some(sliced.clone()));
}

filter(sliced, self.mask.clone()).map(Some)
filter(sliced, &self.mask).map(Some)
}

#[allow(deprecated)]
fn to_indices_array(&self) -> VortexResult<ArrayData> {
Ok(PrimitiveArray::new(
self.mask
.iter_indices()?
.map(|i| i as u64)
.indices()
.iter()
.map(|i| *i as u64)
.collect::<Buffer<u64>>(),
Validity::NonNullable,
)
137 changes: 116 additions & 21 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
@@ -7,13 +7,16 @@ use std::task::{Context, Poll};
use futures::Stream;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use pin_project_lite::pin_project;
use vortex_array::compute::FilterMask;
use vortex_array::stats::{Stat, StatsSet};
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::ContextRef;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, FieldPath};
use vortex_error::{vortex_err, VortexResult};
use vortex_layout::{ExprEvaluator, LayoutReader, StatsEvaluator};
use vortex_scan::Scan;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_expr::{ExprRef, Identity};
use vortex_layout::{ExprEvaluator, LayoutReader};
use vortex_scan::{RowMask, Scanner};

use crate::v2::exec::ExecDriver;
use crate::v2::io::IoDriver;
@@ -33,6 +36,28 @@ pub struct VortexFile<I> {
pub(crate) splits: Arc<[Range<u64>]>,
}

pub struct Scan {
projection: ExprRef,
filter: Option<ExprRef>,
}

impl Scan {
pub fn all() -> Self {
Self {
projection: Identity::new_expr(),
filter: None,
}
}

pub fn new(projection: ExprRef, filter: Option<ExprRef>) -> Self {
Self { projection, filter }
}

pub fn build(self, dtype: DType) -> VortexResult<Arc<Scanner>> {
Ok(Arc::new(Scanner::new(dtype, self.projection, self.filter)?))
}
}

/// Async implementation of Vortex File.
impl<I: IoDriver> VortexFile<I> {
/// Returns the number of rows in the file.
@@ -51,34 +76,104 @@ impl<I: IoDriver> VortexFile<I> {
}

/// Performs a scan operation over the file.
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
let result_dtype = scan.result_dtype(self.dtype())?;
pub fn scan(&self, scan: Scan) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
self.scan_with_masks(
ArcIter::new(self.splits.clone())
.map(|row_range| RowMask::new_valid_between(row_range.start, row_range.end)),
scan,
)
}

/// Takes the given rows while also applying the filter and projection functions from a scan.
/// The row indices must be sorted.
pub fn take(
&self,
row_indices: Buffer<u64>,
scan: Scan,
) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
if !row_indices.windows(2).all(|w| w[0] <= w[1]) {
vortex_bail!("row indices must be sorted")
}

let row_masks = ArcIter::new(self.splits.clone()).filter_map(move |row_range| {
// Quickly short-circuit if the row range is outside the bounds of the row indices.
if row_range.end <= row_indices[0]
|| row_indices
.last()
.is_some_and(|&last| row_range.start >= last)
{
return None;
}

// For the given row range, find the indices that are within the row_indices.
let start_idx = row_indices
.binary_search(&row_range.start)
.unwrap_or_else(|x| x);
let end_idx = row_indices
.binary_search(&row_range.end)
.unwrap_or_else(|x| x);

if start_idx == end_idx {
// No rows in range
return None;
}

// Construct a row mask for the range.
let filter_mask = FilterMask::from_indices(
usize::try_from(row_range.end - row_range.start)
.vortex_expect("Split ranges are within usize"),
row_indices[start_idx..end_idx]
.iter()
.map(|&idx| {
usize::try_from(idx - row_range.start).vortex_expect("index within range")
})
.collect(),
);
Some(RowMask::new(filter_mask, row_range.start))
});

self.scan_with_masks(row_masks, scan)
}

fn scan_with_masks<R>(
&self,
row_masks: R,
scan: Scan,
) -> VortexResult<impl ArrayStream + 'static + use<'_, I, R>>
where
R: Iterator<Item = RowMask> + Send + 'static,
{
let scanner = scan.build(self.dtype().clone())?;

let result_dtype = scanner.result_dtype().clone();

// Set up a segment channel to collect segment requests from the execution stream.
let segment_channel = SegmentChannel::new();

// Create a single LayoutReader that is reused for the entire scan.
let reader: Arc<dyn LayoutReader> = self
.file_layout
.root_layout
.root_layout()
.reader(segment_channel.reader(), self.ctx.clone())?;

// Now we give one end of the channel to the layout reader...
log::debug!("Starting scan with {} splits", self.splits.len());
let exec_stream = stream::iter(ArcIter::new(self.splits.clone()))
.map(move |row_range| scan.clone().range_scan(row_range))
.map(move |range_scan| match range_scan {
Ok(range_scan) => {
let reader = reader.clone();
async move {
range_scan
.evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr))
.await
let exec_stream = stream::iter(row_masks)
.map(
move |row_mask| match scanner.clone().range_scanner(row_mask) {
Ok(range_scan) => {
let reader = reader.clone();
async move {
range_scan
.evaluate_async(|row_mask, expr| {
reader.evaluate_expr(row_mask, expr)
})
.await
}
.boxed()
}
.boxed()
}
Err(e) => futures::future::ready(Err(e)).boxed(),
})
Err(e) => futures::future::ready(Err(e)).boxed(),
},
)
.boxed();
let exec_stream = self.exec_driver.drive(exec_stream);

@@ -107,7 +202,7 @@ impl<I: IoDriver> VortexFile<I> {
// Create a single LayoutReader that is reused for the entire scan.
let reader: Arc<dyn LayoutReader> = self
.file_layout
.root_layout
.root_layout()
.reader(segment_channel.reader(), self.ctx.clone())?;

let exec_future = async move { reader.evaluate_stats(field_paths, stats).await }.boxed();
32 changes: 30 additions & 2 deletions vortex-file/src/v2/footer/file_layout.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use itertools::Itertools;
use vortex_dtype::DType;
use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer};
use vortex_layout::LayoutData;
@@ -9,11 +10,38 @@ use crate::v2::footer::segment::Segment;
/// Captures the layout information of a Vortex file.
#[derive(Debug, Clone)]
pub struct FileLayout {
pub(crate) root_layout: LayoutData,
pub(crate) segments: Arc<[Segment]>,
root_layout: LayoutData,
segments: Arc<[Segment]>,
}

impl FileLayout {
/// Create a new `FileLayout` from the root layout and segments.
///
/// ## Panics
///
/// Panics if the segments are not ordered by byte offset.
pub fn new(root_layout: LayoutData, segments: Arc<[Segment]>) -> Self {
// Note this assertion is `<=` since we allow zero-length segments
assert!(segments
.iter()
.tuple_windows()
.all(|(a, b)| a.offset <= b.offset));
Self {
root_layout,
segments,
}
}

/// Returns the root [`LayoutData`] of the file.
pub fn root_layout(&self) -> &LayoutData {
&self.root_layout
}

/// Returns the segment map of the file.
pub fn segment_map(&self) -> &Arc<[Segment]> {
&self.segments
}

/// Returns the [`DType`] of the file.
pub fn dtype(&self) -> &DType {
self.root_layout.dtype()
2 changes: 1 addition & 1 deletion vortex-file/src/v2/footer/segment.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use vortex_flatbuffers::footer2 as fb;

/// The location of a segment within a Vortex file.
#[derive(Clone, Debug)]
pub(crate) struct Segment {
pub struct Segment {
pub(crate) offset: u64,
pub(crate) length: u32,
pub(crate) alignment: Alignment,
214 changes: 131 additions & 83 deletions vortex-file/src/v2/io/file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::future::Future;
use std::ops::Range;
use std::sync::Arc;
@@ -7,7 +8,7 @@ use futures::Stream;
use futures_util::future::try_join_all;
use futures_util::{stream, StreamExt};
use vortex_buffer::{ByteBuffer, ByteBufferMut};
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
use vortex_io::VortexReadAt;
use vortex_layout::segments::SegmentId;

@@ -63,7 +64,7 @@ impl FileSegmentRequest {
struct CoalescedSegmentRequest {
/// The range of the file to read.
pub(crate) byte_range: Range<u64>,
/// The original segment requests.
/// The original segment requests, ordered by segment ID.
pub(crate) requests: Vec<FileSegmentRequest>,
}

@@ -72,112 +73,154 @@ impl<R: VortexReadAt> IoDriver for FileIoDriver<R> {
&self,
stream: impl Stream<Item = SegmentRequest> + 'static,
) -> impl Stream<Item = VortexResult<()>> + 'static {
let segment_map = self.file_layout.segments.clone();
let read = self.read.clone();
let segment_cache1 = self.segment_cache.clone();
let segment_cache2 = self.segment_cache.clone();

stream
// We map the segment requests to their respective locations within the file.
.filter_map(move |request| {
let segment_map = segment_map.clone();
async move {
let Some(location) = segment_map.get(*request.id as usize) else {
request
.callback
.send(Err(vortex_err!("segment not found")))
.map_err(|_| vortex_err!("send failed"))
.vortex_expect("send failed");
return None;
};
Some(FileSegmentRequest {
id: request.id,
location: location.clone(),
callback: request.callback,
})
}
})
// We support zero-length segments (so layouts don't have to store this information)
// If we encounter a zero-length segment, we can just resolve it now.
.filter_map(move |request| async move {
if request.location.length == 0 {
let alignment = request.location.alignment;
request.resolve(Ok(ByteBuffer::empty_aligned(alignment)));
None
} else {
Some(request)
}
})
// Check if the segment exists in the cache
.filter_map(move |request| {
let segment_cache = segment_cache1.clone();
async move {
match segment_cache
.get(request.id, request.location.alignment)
.await
{
Ok(None) => Some(request),
Ok(Some(buffer)) => {
request.resolve(Ok(buffer));
None
}
Err(e) => {
request.resolve(Err(e));
None
}
// We map the segment requests to their respective locations within the file.
let segment_map = self.file_layout.segment_map().clone();
let stream = stream.filter_map(move |request| {
let segment_map = segment_map.clone();
async move {
let Some(location) = segment_map.get(*request.id as usize) else {
request
.callback
.send(Err(vortex_err!("segment not found")))
.map_err(|_| vortex_err!("send failed"))
.vortex_expect("send failed");
return None;
};
Some(FileSegmentRequest {
id: request.id,
location: location.clone(),
callback: request.callback,
})
}
});

// We support zero-length segments (so layouts don't have to store this information)
// If we encounter a zero-length segment, we can just resolve it now.
let stream = stream.filter_map(|request| async move {
if request.location.length == 0 {
let alignment = request.location.alignment;
request.resolve(Ok(ByteBuffer::empty_aligned(alignment)));
None
} else {
Some(request)
}
});

// Check if the segment exists in the cache
let segment_cache = self.segment_cache.clone();
let stream = stream.filter_map(move |request| {
let segment_cache = segment_cache.clone();
async move {
match segment_cache
.get(request.id, request.location.alignment)
.await
{
Ok(None) => Some(request),
Ok(Some(buffer)) => {
request.resolve(Ok(buffer));
None
}
Err(e) => {
request.resolve(Err(e));
None
}
}
})
// Grab all available segment requests from the I/O queue so we get maximal visibility into
// the requests for coalescing.
// Note that we can provide a somewhat arbitrarily high capacity here since we're going to
// deduplicate and coalesce. Meaning the resulting stream will at-most cover the entire
// file and therefore be reasonably bounded.
}
});

// Grab all available segment requests from the I/O queue so we get maximal visibility into
// the requests for coalescing.
// Note that we can provide a somewhat arbitrarily high capacity here since we're going to
// deduplicate and coalesce. Meaning the resulting stream will at-most cover the entire
// file and therefore be reasonably bounded.
let stream = stream
.ready_chunks(1024)
// Coalesce the segment requests to minimize the number of I/O operations.
.map(coalesce)
.flat_map(stream::iter)
// Submit the coalesced requests to the I/O.
.map(move |request| evaluate(read.clone(), request, segment_cache2.clone()))
// Buffer some number of concurrent I/O operations.
.buffer_unordered(self.concurrency)
.inspect(|requests| log::debug!("Processing {} segment requests", requests.len()));

// Coalesce the segment requests to minimize the number of I/O operations.
let stream = stream.map(coalesce).flat_map(stream::iter);

// Submit the coalesced requests to the I/O.
let read = self.read.clone();
let segment_map = self.file_layout.segment_map().clone();
let segment_cache = self.segment_cache.clone();
let stream = stream.map(move |request| {
let read = read.clone();
let segment_map = segment_map.clone();
let segment_cache = segment_cache.clone();
async move {
evaluate(
read.clone(),
request,
segment_map.clone(),
segment_cache.clone(),
)
.await
}
});

// Buffer some number of concurrent I/O operations.
stream.buffer_unordered(self.concurrency)
}
}

async fn evaluate<R: VortexReadAt>(
read: R,
request: CoalescedSegmentRequest,
segment_map: Arc<[Segment]>,
segment_cache: Arc<dyn SegmentCache>,
) -> VortexResult<()> {
log::debug!(
"Reading byte range: {:?} {}",
request.byte_range,
request.byte_range.end - request.byte_range.start
request.byte_range.end - request.byte_range.start,
);
let buffer: ByteBuffer = read
.read_byte_range(
request.byte_range.start,
request.byte_range.end - request.byte_range.start,
)
.await
.map_err(|e| vortex_err!("Failed to read coalesced segment: {:?} {:?}", request, e))?
.await?
.into();

// TODO(ngates): traverse the segment map to find un-requested segments that happen to
// fall within the range of the request. Then we can populate those in the cache.
let mut cache_futures = Vec::with_capacity(request.requests.len());
for req in request.requests {
let offset = usize::try_from(req.location.offset - request.byte_range.start)?;
// Figure out the segments covered by the read.
let start = segment_map.partition_point(|s| s.offset < request.byte_range.start);
let end = segment_map.partition_point(|s| s.offset < request.byte_range.end);

// Note that we may have multiple requests for the same segment.
let mut requests_iter = request.requests.into_iter().peekable();

let mut cache_futures = Vec::with_capacity(end - start);
for (i, segment) in segment_map[start..end].iter().enumerate() {
let segment_id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id"));
let offset = usize::try_from(segment.offset - request.byte_range.start)?;
let buf = buffer
.slice(offset..offset + req.location.length as usize)
.aligned(req.location.alignment);
.slice(offset..offset + segment.length as usize)
.aligned(segment.alignment);

// Send the callback
req.callback
.send(Ok(buf.clone()))
.map_err(|_| vortex_err!("send failed"))?;
// Find any request callbacks and send the buffer
while let Some(req) = requests_iter.peek() {
// If the request is before the current segment, we should have already resolved it.
match req.id.cmp(&segment_id) {
Ordering::Less => {
// This should never happen, it means we missed a segment request.
vortex_panic!("Skipped segment request");
}
Ordering::Equal => {
// Resolve the request
requests_iter
.next()
.vortex_expect("next request")
.resolve(Ok(buf.clone()));
}
Ordering::Greater => {
// No request for this segment, so we continue
break;
}
}
}

cache_futures.push(segment_cache.put(req.id, buf));
cache_futures.push(segment_cache.put(segment_id, buf));
}

// Populate the cache
@@ -208,6 +251,11 @@ fn coalesce(requests: Vec<FileSegmentRequest>) -> Vec<CoalescedSegmentRequest> {
coalesced.as_mut_slice()[idx].requests.push(req);
}

// Ensure we sort the requests by segment ID within the coalesced request.
for req in coalesced.iter_mut() {
req.requests.sort_unstable_by_key(|r| r.id);
}

coalesced
}

9 changes: 3 additions & 6 deletions vortex-file/src/v2/open/mod.rs
Original file line number Diff line number Diff line change
@@ -148,7 +148,7 @@ impl VortexOpenOptions {
.into_driver();

// Compute the splits of the file.
let splits = self.split_by.splits(&file_layout.root_layout)?.into();
let splits = self.split_by.splits(file_layout.root_layout())?.into();

// Finally, create the VortexFile.
Ok(VortexFile {
@@ -304,10 +304,7 @@ impl VortexOpenOptions {
.ok_or_else(|| vortex_err!("FileLayout missing segments"))?;
let segments = fb_segments.iter().map(Segment::try_from).try_collect()?;

Ok(FileLayout {
root_layout,
segments,
})
Ok(FileLayout::new(root_layout, segments))
}

/// Populate segments in the cache that were covered by the initial read.
@@ -318,7 +315,7 @@ impl VortexOpenOptions {
file_layout: &FileLayout,
segments: &dyn SegmentCache,
) -> VortexResult<()> {
for (idx, segment) in file_layout.segments.iter().enumerate() {
for (idx, segment) in file_layout.segment_map().iter().enumerate() {
if segment.offset < initial_offset {
// Skip segments that aren't in the initial read.
continue;
55 changes: 33 additions & 22 deletions vortex-file/src/v2/tests.rs
Original file line number Diff line number Diff line change
@@ -4,37 +4,48 @@ use vortex_array::array::ChunkedArray;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant};
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_scan::Scan;
use vortex_error::{VortexExpect, VortexResult};

use crate::v2::io::IoDriver;
use crate::v2::*;

#[test]
fn basic_file_roundtrip() -> VortexResult<()> {
block_on(async {
let array = ChunkedArray::from_iter([
buffer![0, 1, 2].into_array(),
buffer![3, 4, 5].into_array(),
buffer![6, 7, 8].into_array(),
])
.into_array();
fn chunked_file() -> VortexFile<impl IoDriver> {
let array = ChunkedArray::from_iter([
buffer![0, 1, 2].into_array(),
buffer![3, 4, 5].into_array(),
buffer![6, 7, 8].into_array(),
])
.into_array();

block_on(async {
let buffer: Bytes = VortexWriteOptions::default()
.write(vec![], array.into_array_stream())
.await?
.into();

let vxf = VortexOpenOptions::new(ContextRef::default())
VortexOpenOptions::new(ContextRef::default())
.open(buffer)
.await?;
let result = vxf
.scan(Scan::all())?
.into_array_data()
.await?
.into_primitive()?;
.await
})
.vortex_expect("Failed to create test file")
}

assert_eq!(result.as_slice::<i32>(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]);
#[test]
fn basic_file_roundtrip() -> VortexResult<()> {
let vxf = chunked_file();
let result = block_on(vxf.scan(Scan::all())?.into_array_data())?.into_primitive()?;

Ok(())
})
assert_eq!(result.as_slice::<i32>(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]);

Ok(())
}

#[test]
fn file_take() -> VortexResult<()> {
let vxf = chunked_file();
let result =
block_on(vxf.take(buffer![0, 1, 8], Scan::all())?.into_array_data())?.into_primitive()?;

assert_eq!(result.as_slice::<i32>(), &[0, 1, 8]);

Ok(())
}
8 changes: 1 addition & 7 deletions vortex-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
@@ -67,13 +67,7 @@ impl VortexWriteOptions {
// Write the DType + FileLayout segments
let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?;
let file_layout_segment = self
.write_flatbuffer(
&mut write,
&FileLayout {
root_layout,
segments: segments.into(),
},
)
.write_flatbuffer(&mut write, &FileLayout::new(root_layout, segments.into()))
.await?;

// Assemble the postscript, and write it manually to avoid any framing.
4 changes: 2 additions & 2 deletions vortex-file/src/write/writer.rs
Original file line number Diff line number Diff line change
@@ -255,12 +255,12 @@ impl ColumnWriter {
.flat_map(|(byte_offsets, row_offsets)| {
byte_offsets
.into_iter()
.tuple_windows::<(_, _)>()
.tuple_windows()
.map(|(begin, end)| ByteRange::new(begin, end))
.zip(
row_offsets
.into_iter()
.tuple_windows::<(_, _)>()
.tuple_windows()
.map(|(begin, end)| end - begin),
)
.map(|(range, len)| LayoutSpec::flat(range, len))
45 changes: 40 additions & 5 deletions vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use flatbuffers::root;
use futures::future::try_join_all;
use vortex_array::compute::filter;
use vortex_array::compute::{filter, slice};
use vortex_array::parts::ArrayParts;
use vortex_array::ArrayData;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
@@ -47,11 +47,24 @@ impl ExprEvaluator for FlatReader {

// Decode into an ArrayData.
let array = array_parts.decode(self.ctx(), self.dtype().clone())?;
assert_eq!(
array.len() as u64,
self.row_count(),
"FlatLayout array length mismatch {} != {}",
array.len(),
self.row_count()
);

// And finally apply the expression
// TODO(ngates): what's the best order to apply the filter mask / expression?
let array = expr.evaluate(&array)?;
filter(&array, row_mask.into_filter_mask()?)

// Filter the array based on the row mask.
let begin = usize::try_from(row_mask.begin())
.vortex_expect("RowMask begin must fit within FlatLayout size");
let array = slice(array, begin, begin + row_mask.len())?;
let array = filter(&array, row_mask.filter_mask())?;

// Then apply the expression
expr.evaluate(&array)
}
}

@@ -65,7 +78,7 @@ mod test {
use vortex_array::validity::Validity;
use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};
use vortex_buffer::buffer;
use vortex_expr::{gt, lit, Identity};
use vortex_expr::{gt, ident, lit, Identity};
use vortex_scan::RowMask;

use crate::layouts::flat::writer::FlatLayoutWriter;
@@ -122,4 +135,26 @@ mod test {
);
})
}

#[test]
fn flat_unaligned_row_mask() {
block_on(async {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let result = layout
.reader(Arc::new(segments), Default::default())
.unwrap()
.evaluate_expr(RowMask::new_valid_between(2, 4), ident())
.await
.unwrap()
.into_primitive()
.unwrap();

assert_eq!(result.as_slice::<i32>(), &[3, 4],);
})
}
}
7 changes: 5 additions & 2 deletions vortex-layout/src/layouts/struct_/eval_expr.rs
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ mod tests {
use vortex_buffer::buffer;
use vortex_dtype::PType::I32;
use vortex_dtype::{DType, Field, Nullability, StructDType};
use vortex_expr::{get_item, gt, ident, select};
use vortex_expr::{get_item, gt, ident, pack};
use vortex_scan::RowMask;

use crate::layouts::flat::writer::FlatLayoutWriter;
@@ -156,7 +156,10 @@ mod tests {
let (segments, layout) = struct_layout();

let reader = layout.reader(segments, Default::default()).unwrap();
let expr = select(vec!["a".into(), "b".into()], ident());
let expr = pack(
["a".into(), "b".into()],
vec![get_item("a", ident()), get_item("b", ident())],
);
let result = block_on(reader.evaluate_expr(
// Take rows 0 and 1, skip row 2, and anything after that
RowMask::new(FilterMask::from_iter([true, true, false]), 0),
2 changes: 1 addition & 1 deletion vortex-layout/src/strategies/mod.rs
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ pub trait LayoutWriterExt: LayoutWriter {
impl<L: LayoutWriter> LayoutWriterExt for L {}

/// A trait for creating new layout writers given a DType.
pub trait LayoutStrategy: Send {
pub trait LayoutStrategy: Send + Sync {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>>;
}

62 changes: 26 additions & 36 deletions vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,55 @@
mod range_scan;
mod row_mask;

use std::ops::Range;
use std::sync::Arc;

pub use range_scan::*;
pub use row_mask::*;
use vortex_array::compute::FilterMask;
use vortex_array::{ArrayDType, Canonical, IntoArrayData};
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_expr::{ExprRef, Identity};
use vortex_error::VortexResult;
use vortex_expr::ExprRef;

/// Represents a scan operation to read data from a set of rows, with an optional filter expression,
/// and a projection expression.
///
/// A scan operation can be broken into many [`RangeScan`] operations, each of which leverages
/// shared statistics from the parent [`Scan`] to optimize the order in which filter and projection
/// A scan operation can be broken into many [`RangeScanner`] operations, each of which leverages
/// shared statistics from the parent [`Scanner`] to optimize the order in which filter and projection
/// operations are applied.
///
/// For example, if a filter expression has a top-level `AND` clause, it may be the case that one
/// clause is significantly more selective than the other. In this case, we may want to compute the
/// most selective filter first, then prune rows using result of the filter, before evaluating
/// the second filter over the reduced set of rows.
#[derive(Debug, Clone)]
pub struct Scan {
pub struct Scanner {
#[allow(dead_code)]
dtype: DType,
projection: ExprRef,
filter: Option<ExprRef>,
projection_dtype: DType,
// A sorted list of row indices to include in the scan. We store row indices since they may
// produce a very sparse RowMask.
// take_indices: Vec<u64>,
// statistics: RwLock<Statistics>
}

impl Scan {
impl Scanner {
/// Create a new scan with the given projection and optional filter.
pub fn new(projection: ExprRef, filter: Option<ExprRef>) -> Self {
pub fn new(dtype: DType, projection: ExprRef, filter: Option<ExprRef>) -> VortexResult<Self> {
// TODO(ngates): compute and cache a FieldMask based on the referenced fields.
// Where FieldMask ~= Vec<FieldPath>
Self { projection, filter }
}

/// Convert this scan into an Arc.
pub fn into_arc(self) -> Arc<Self> {
Arc::new(self)
}
let result_dtype = projection
.evaluate(&Canonical::empty(&dtype)?.into_array())?
.dtype()
.clone();

/// Scan all rows with the identity projection.
pub fn all() -> Arc<Self> {
Self {
projection: Identity::new_expr(),
filter: None,
}
.into_arc()
Ok(Self {
dtype,
projection,
filter,
projection_dtype: result_dtype,
})
}

/// Returns the filter expression, if any.
@@ -66,24 +63,17 @@ impl Scan {
}

/// Compute the result dtype of the scan given the input dtype.
pub fn result_dtype(&self, dtype: &DType) -> VortexResult<DType> {
Ok(self
.projection
.evaluate(&Canonical::empty(dtype)?.into_array())?
.dtype()
.clone())
pub fn result_dtype(&self) -> &DType {
&self.projection_dtype
}

/// Instantiate a new scan for a specific range. The range scan will share statistics with this
/// parent scan in order to optimize future range scans.
pub fn range_scan(self: Arc<Self>, range: Range<u64>) -> VortexResult<RangeScan> {
// TODO(ngates): binary search take_indices to compute initial mask.
let length = usize::try_from(range.end - range.start)
.map_err(|_| vortex_err!("Range length must fit within usize"))?;
Ok(RangeScan::new(
pub fn range_scanner(self: Arc<Self>, row_mask: RowMask) -> VortexResult<RangeScanner> {
Ok(RangeScanner::new(
self,
range.start,
FilterMask::new_true(length),
row_mask.begin(),
row_mask.filter_mask().clone(),
))
}
}
29 changes: 17 additions & 12 deletions vortex-scan/src/range_scan.rs
Original file line number Diff line number Diff line change
@@ -3,14 +3,14 @@ use std::ops::{BitAnd, Range};
use std::sync::Arc;

use vortex_array::compute::FilterMask;
use vortex_array::{ArrayData, IntoArrayVariant};
use vortex_array::{ArrayData, Canonical, IntoArrayData, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::ExprRef;

use crate::{RowMask, Scan};
use crate::{RowMask, Scanner};

pub struct RangeScan {
scan: Arc<Scan>,
pub struct RangeScanner {
scan: Arc<Scanner>,
row_range: Range<u64>,
mask: FilterMask,
state: State,
@@ -56,8 +56,8 @@ pub enum NextOp {
// If instead we make the projection API `project(row_mask, expr)`, then the project API is
// identical to the filter API and there's no point having both. Hence, a single
// `evaluate(row_mask, expr)` API.
impl RangeScan {
pub(crate) fn new(scan: Arc<Scan>, row_offset: u64, mask: FilterMask) -> Self {
impl RangeScanner {
pub(crate) fn new(scan: Arc<Scanner>, row_offset: u64, mask: FilterMask) -> Self {
let state = scan
.filter()
.map(|filter| {
@@ -103,11 +103,16 @@ impl RangeScan {
match &self.state {
State::FilterEval(_) => {
// Intersect the result of the filter expression with our initial row mask.
let mask = result.into_bool()?.boolean_buffer();
let mask = self.mask.to_boolean_buffer()?.bitand(&mask);
let mask = FilterMask::from_buffer(result.into_bool()?.boolean_buffer());
let mask = self.mask.bitand(&mask);
// Then move onto the projection
self.state =
State::Project((FilterMask::from(mask), self.scan.projection().clone()))
if mask.is_empty() {
// If the mask is empty, then we're done.
self.state =
State::Ready(Canonical::empty(self.scan.result_dtype())?.into_array());
} else {
self.state = State::Project((mask, self.scan.projection().clone()))
}
}
State::Project(_) => {
// We're done.
@@ -118,7 +123,7 @@ impl RangeScan {
Ok(())
}

/// Evaluate the [`RangeScan`] operation using a synchronous expression evaluator.
/// Evaluate the [`RangeScanner`] operation using a synchronous expression evaluator.
pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<ArrayData>
where
E: Fn(RowMask, ExprRef) -> VortexResult<ArrayData>,
@@ -133,7 +138,7 @@ impl RangeScan {
}
}

/// Evaluate the [`RangeScan`] operation using an async expression evaluator.
/// Evaluate the [`RangeScanner`] operation using an async expression evaluator.
pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<ArrayData>
where
E: Fn(RowMask, ExprRef) -> F,
89 changes: 40 additions & 49 deletions vortex-scan/src/row_mask.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::cmp::{max, min};
use std::fmt::{Display, Formatter};
use std::ops::RangeBounds;
use std::ops::{BitAnd, RangeBounds};

use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{BoolArray, BooleanBuffer, PrimitiveArray, SparseArray};
use vortex_array::array::{BooleanBuffer, PrimitiveArray, SparseArray};
use vortex_array::compute::{and, filter, slice, try_cast, FilterMask};
use vortex_array::validity::{ArrayValidity, LogicalValidity, Validity};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -22,12 +21,11 @@ pub struct RowMask {
end: u64,
}

// We don't want to implement full logical equality, this naive equality is sufficient for tests.
#[cfg(test)]
impl PartialEq for RowMask {
fn eq(&self, other: &Self) -> bool {
self.begin == other.begin
&& self.end == other.end
&& self.mask.to_boolean_buffer().unwrap() == other.mask.to_boolean_buffer().unwrap()
self.begin == other.begin && self.end == other.end && self.mask == other.mask
}
}

@@ -106,7 +104,11 @@ impl RowMask {

let mask = FilterMask::from_indices(
length,
indices.as_slice::<u64>().iter().map(|i| *i as usize),
indices
.as_slice::<u64>()
.iter()
.map(|i| *i as usize)
.collect(),
);

Ok(RowMask::new(mask, begin))
@@ -160,18 +162,14 @@ impl RowMask {
}
}

pub fn and_rowmask(&self, other: RowMask) -> VortexResult<Self> {
pub fn and_rowmask(self, other: RowMask) -> VortexResult<Self> {
if other.true_count() == other.len() {
return Ok(self.clone());
return Ok(self);
}

// If both masks align perfectly
if self.begin == other.begin && self.end == other.end {
let this_buffer = self.mask.to_boolean_buffer()?;
let other_buffer = other.mask.to_boolean_buffer()?;

let unified = &this_buffer & (&other_buffer);
return RowMask::from_mask_array(BoolArray::from(unified).as_ref(), self.begin);
return Ok(RowMask::new(self.mask.bitand(&other.mask), self.begin));
}

// Disjoint row ranges
@@ -187,25 +185,21 @@ impl RowMask {
let output_len = usize::try_from(output_end - output_begin)
.map_err(|_| vortex_err!("Range length does not fit into a usize"))?;

let this_buffer = self.mask.to_boolean_buffer()?;
let other_buffer = other.mask.to_boolean_buffer()?;

// TODO(ngates): do not use a set for this. We know both iterators are sorted.
let self_indices = this_buffer
.set_indices()
.map(|v| v as u64 + self.begin)
.collect::<HashSet<_>>();
let other_indices = other_buffer
.set_indices()
.map(|v| v as u64 + other.begin)
.collect::<HashSet<_>>();

let output_mask = FilterMask::from_indices(
let output_mask = FilterMask::from_intersection_indices(
output_len,
self_indices.intersection(&other_indices).copied().map(|i| {
usize::try_from(i - output_begin)
.vortex_expect("we know this must fit within usize")
}),
self.mask
.indices()
.iter()
.copied()
.map(|v| v as u64 + self.begin - output_begin)
.map(|v| usize::try_from(v).vortex_expect("mask index must fit into usize")),
other
.mask
.indices()
.iter()
.copied()
.map(|v| v as u64 + other.begin - output_begin)
.map(|v| usize::try_from(v).vortex_expect("mask index must fit into usize")),
);

Ok(Self::new(output_mask, output_begin))
@@ -231,6 +225,11 @@ impl RowMask {
self.mask.is_empty()
}

/// Returns the [`FilterMask`] whose true values are relative to the range of this `RowMask`.
pub fn filter_mask(&self) -> &FilterMask {
&self.mask
}

/// Limit mask to `[begin..end)` range
pub fn slice(&self, begin: u64, end: u64) -> VortexResult<Self> {
let range_begin = max(self.begin, begin);
@@ -239,13 +238,11 @@ impl RowMask {
if range_begin == self.begin && range_end == self.end {
self.mask.clone()
} else {
FilterMask::from(
self.mask.to_boolean_buffer()?.slice(
usize::try_from(range_begin - self.begin)
.vortex_expect("we know this must fit into usize"),
usize::try_from(range_end - range_begin)
.vortex_expect("we know this must fit into usize"),
),
self.mask.slice(
usize::try_from(range_begin - self.begin)
.vortex_expect("we know this must fit into usize"),
usize::try_from(range_end - range_begin)
.vortex_expect("we know this must fit into usize"),
)
},
range_begin,
@@ -281,15 +278,15 @@ impl RowMask {
return Ok(Some(sliced.clone()));
}

filter(sliced, self.mask.clone()).map(Some)
filter(sliced, &self.mask).map(Some)
}

#[allow(deprecated)]
fn to_indices_array(&self) -> VortexResult<ArrayData> {
Ok(PrimitiveArray::new(
self.mask
.iter_indices()?
.map(|i| i as u64)
.indices()
.iter()
.map(|i| *i as u64)
.collect::<Buffer<u64>>(),
Validity::NonNullable,
)
@@ -312,12 +309,6 @@ impl RowMask {
pub fn true_count(&self) -> usize {
self.mask.true_count()
}

// Convert the [`RowMask`] into a [`FilterMask`].
pub fn into_filter_mask(self) -> VortexResult<FilterMask> {
let offset = self.begin;
Ok(self.shift(offset)?.mask)
}
}

#[cfg(test)]

0 comments on commit 253ccf6

Please sign in to comment.