From f7529eb10bc72eb243423c57960460c4fbfc4dcc Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 15 Jan 2025 11:20:44 +0000 Subject: [PATCH 1/9] chore: assorted cleanups (#1953) 1. remove dead module 2. `thing.zip(thing.skip(1))` -> `tuple_windows` 3. slightly nicer `Scan::range_scan` (same overall behavior, just move clone to inside of the call) --- vortex-array/src/array/varbin/accessor.rs | 5 ++-- vortex-array/src/array/varbin/iter.rs | 32 ----------------------- vortex-file/src/read/layouts/chunked.rs | 5 ++-- vortex-file/src/v2/file.rs | 2 +- vortex-file/src/write/writer.rs | 4 +-- vortex-scan/src/lib.rs | 4 +-- 6 files changed, 11 insertions(+), 41 deletions(-) delete mode 100644 vortex-array/src/array/varbin/iter.rs diff --git a/vortex-array/src/array/varbin/accessor.rs b/vortex-array/src/array/varbin/accessor.rs index e61853fce4..c5d6663e69 100644 --- a/vortex-array/src/array/varbin/accessor.rs +++ b/vortex-array/src/array/varbin/accessor.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; @@ -26,14 +27,14 @@ impl ArrayAccessor<[u8]> for VarBinArray { None => { let mut iter = offsets .iter() - .zip(offsets.iter().skip(1)) + .tuple_windows() .map(|(start, end)| Some(&bytes[*start as usize..*end as usize])); Ok(f(&mut iter)) } Some(validity) => { let mut iter = offsets .iter() - .zip(offsets.iter().skip(1)) + .tuple_windows() .zip(validity.iter()) .map(|((start, end), valid)| { if valid { diff --git a/vortex-array/src/array/varbin/iter.rs b/vortex-array/src/array/varbin/iter.rs deleted file mode 100644 index b7936224d6..0000000000 --- a/vortex-array/src/array/varbin/iter.rs +++ /dev/null @@ -1,32 +0,0 @@ -use arrow_buffer::NullBuffer; -use vortex_dtype::NativePType; - -use crate::ArrayLen; - -pub struct VarBinIter<'a, I> { - bytes: &'a [u8], - indices: &'a [I], - validity: NullBuffer, - idx: usize, -} - -impl<'a, I: NativePType> Iterator for VarBinIter<'a, I> { - type Item = Option<&'a 
[u8]>; - - fn next(&mut self) -> Option { - if self.idx >= self.indices.len() - 1 { - return None; - } - - if self.validity.is_valid(self.idx) { - let start = self.indices[self.idx]; - let end = self.indices[self.idx + 1]; - let value = Some(&self.bytes[start..end]); - self.idx += 1; - Some(value) - } else { - self.idx += 1; - Some(None) - } - } -} diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index c20d7fe54d..e0205e3727 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -422,6 +422,7 @@ mod tests { use flatbuffers::{root, FlatBufferBuilder}; use futures_util::io::Cursor; use futures_util::TryStreamExt; + use itertools::Itertools; use vortex_array::array::ChunkedArray; use vortex_array::compute::FilterMask; use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant}; @@ -462,11 +463,11 @@ mod tests { } let flat_layouts = byte_offsets .iter() - .zip(byte_offsets.iter().skip(1)) + .tuple_windows() .zip( row_offsets .iter() - .zip(row_offsets.iter().skip(1)) + .tuple_windows() .map(|(begin, end)| end - begin), ) .map(|((begin, end), len)| write::LayoutSpec::flat(ByteRange::new(*begin, *end), len)) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 1e86e78603..7720f9e77e 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -64,7 +64,7 @@ impl VortexFile { // Now we give one end of the channel to the layout reader... 
log::debug!("Starting scan with {} splits", self.splits.len()); let exec_stream = stream::iter(ArcIter::new(self.splits.clone())) - .map(move |row_range| scan.clone().range_scan(row_range)) + .map(move |row_range| scan.range_scan(row_range)) .map(move |range_scan| match range_scan { Ok(range_scan) => { let reader = reader.clone(); diff --git a/vortex-file/src/write/writer.rs b/vortex-file/src/write/writer.rs index b869106e4e..607fea125c 100644 --- a/vortex-file/src/write/writer.rs +++ b/vortex-file/src/write/writer.rs @@ -255,12 +255,12 @@ impl ColumnWriter { .flat_map(|(byte_offsets, row_offsets)| { byte_offsets .into_iter() - .tuple_windows::<(_, _)>() + .tuple_windows() .map(|(begin, end)| ByteRange::new(begin, end)) .zip( row_offsets .into_iter() - .tuple_windows::<(_, _)>() + .tuple_windows() .map(|(begin, end)| end - begin), ) .map(|(range, len)| LayoutSpec::flat(range, len)) diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index b7ae3cc79c..510895a16d 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -76,12 +76,12 @@ impl Scan { /// Instantiate a new scan for a specific range. The range scan will share statistics with this /// parent scan in order to optimize future range scans. - pub fn range_scan(self: Arc, range: Range) -> VortexResult { + pub fn range_scan(self: &Arc, range: Range) -> VortexResult { // TODO(ngates): binary search take_indices to compute initial mask. 
let length = usize::try_from(range.end - range.start) .map_err(|_| vortex_err!("Range length must fit within usize"))?; Ok(RangeScan::new( - self, + self.clone(), range.start, FilterMask::new_true(length), )) From 83a72ddf69dec20d0b439bc8ecef19883a9652f5 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 15 Jan 2025 11:43:44 +0000 Subject: [PATCH 2/9] Cache coalesced segments (#1949) This PR caches all segments that were fetched as part of a coalesced read, even if they weren't explicitly requested --- vortex-file/src/v2/io/file.rs | 193 ++++++++++++++++++++-------------- 1 file changed, 115 insertions(+), 78 deletions(-) diff --git a/vortex-file/src/v2/io/file.rs b/vortex-file/src/v2/io/file.rs index 8cf438dcca..f40818e232 100644 --- a/vortex-file/src/v2/io/file.rs +++ b/vortex-file/src/v2/io/file.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::iter; use std::ops::Range; use std::sync::Arc; @@ -72,112 +73,148 @@ impl IoDriver for FileIoDriver { &self, stream: impl Stream + 'static, ) -> impl Stream> + 'static { + // We map the segment requests to their respective locations within the file. let segment_map = self.file_layout.segments.clone(); - let read = self.read.clone(); - let segment_cache1 = self.segment_cache.clone(); - let segment_cache2 = self.segment_cache.clone(); - - stream - // We map the segment requests to their respective locations within the file. - .filter_map(move |request| { - let segment_map = segment_map.clone(); - async move { - let Some(location) = segment_map.get(*request.id as usize) else { - request - .callback - .send(Err(vortex_err!("segment not found"))) - .map_err(|_| vortex_err!("send failed")) - .vortex_expect("send failed"); - return None; - }; - Some(FileSegmentRequest { - id: request.id, - location: location.clone(), - callback: request.callback, - }) - } - }) - // We support zero-length segments (so layouts don't have to store this information) - // If we encounter a zero-length segment, we can just resolve it now. 
- .filter_map(move |request| async move { - if request.location.length == 0 { - let alignment = request.location.alignment; - request.resolve(Ok(ByteBuffer::empty_aligned(alignment))); - None - } else { - Some(request) - } - }) - // Check if the segment exists in the cache - .filter_map(move |request| { - let segment_cache = segment_cache1.clone(); - async move { - match segment_cache - .get(request.id, request.location.alignment) - .await - { - Ok(None) => Some(request), - Ok(Some(buffer)) => { - request.resolve(Ok(buffer)); - None - } - Err(e) => { - request.resolve(Err(e)); - None - } + let stream = stream.filter_map(move |request| { + let segment_map = segment_map.clone(); + async move { + let Some(location) = segment_map.get(*request.id as usize) else { + request + .callback + .send(Err(vortex_err!("segment not found"))) + .map_err(|_| vortex_err!("send failed")) + .vortex_expect("send failed"); + return None; + }; + Some(FileSegmentRequest { + id: request.id, + location: location.clone(), + callback: request.callback, + }) + } + }); + + // We support zero-length segments (so layouts don't have to store this information) + // If we encounter a zero-length segment, we can just resolve it now. 
+ let stream = stream.filter_map(|request| async move { + if request.location.length == 0 { + let alignment = request.location.alignment; + request.resolve(Ok(ByteBuffer::empty_aligned(alignment))); + None + } else { + Some(request) + } + }); + + // Check if the segment exists in the cache + let segment_cache = self.segment_cache.clone(); + let stream = stream.filter_map(move |request| { + let segment_cache = segment_cache.clone(); + async move { + match segment_cache + .get(request.id, request.location.alignment) + .await + { + Ok(None) => Some(request), + Ok(Some(buffer)) => { + request.resolve(Ok(buffer)); + None + } + Err(e) => { + request.resolve(Err(e)); + None } } - }) - // Grab all available segment requests from the I/O queue so we get maximal visibility into - // the requests for coalescing. - // Note that we can provide a somewhat arbitrarily high capacity here since we're going to - // deduplicate and coalesce. Meaning the resulting stream will at-most cover the entire - // file and therefore be reasonably bounded. + } + }); + + // Grab all available segment requests from the I/O queue so we get maximal visibility into + // the requests for coalescing. + // Note that we can provide a somewhat arbitrarily high capacity here since we're going to + // deduplicate and coalesce. Meaning the resulting stream will at-most cover the entire + // file and therefore be reasonably bounded. + let stream = stream .ready_chunks(1024) - // Coalesce the segment requests to minimize the number of I/O operations. - .map(coalesce) - .flat_map(stream::iter) - // Submit the coalesced requests to the I/O. - .map(move |request| evaluate(read.clone(), request, segment_cache2.clone())) - // Buffer some number of concurrent I/O operations. - .buffer_unordered(self.concurrency) + .inspect(|requests| log::debug!("Processing {} segment requests", requests.len())); + + // Coalesce the segment requests to minimize the number of I/O operations. 
+ let stream = stream.map(coalesce).flat_map(stream::iter); + + // Submit the coalesced requests to the I/O. + let read = self.read.clone(); + let segment_map = self.file_layout.segments.clone(); + let segment_cache = self.segment_cache.clone(); + let stream = stream.map(move |request| { + let read = read.clone(); + let segment_map = segment_map.clone(); + let segment_cache = segment_cache.clone(); + async move { + evaluate( + read.clone(), + request, + segment_map.clone(), + segment_cache.clone(), + ) + .await + } + }); + + // Buffer some number of concurrent I/O operations. + stream.buffer_unordered(self.concurrency) } } async fn evaluate( read: R, request: CoalescedSegmentRequest, + segment_map: Arc<[Segment]>, segment_cache: Arc, ) -> VortexResult<()> { log::debug!( "Reading byte range: {:?} {}", request.byte_range, - request.byte_range.end - request.byte_range.start + request.byte_range.end - request.byte_range.start, ); let buffer: ByteBuffer = read .read_byte_range( request.byte_range.start, request.byte_range.end - request.byte_range.start, ) - .await - .map_err(|e| vortex_err!("Failed to read coalesced segment: {:?} {:?}", request, e))? + .await? .into(); - // TODO(ngates): traverse the segment map to find un-requested segments that happen to - // fall within the range of the request. Then we can populate those in the cache. - let mut cache_futures = Vec::with_capacity(request.requests.len()); + // Figure out the segments covered by the read. 
+ let start = segment_map.partition_point(|s| s.offset < request.byte_range.start); + let end = segment_map.partition_point(|s| s.offset < request.byte_range.end); + + let mut requests = iter::repeat_with(|| None) + .take(end - start) + .collect::>(); for req in request.requests { - let offset = usize::try_from(req.location.offset - request.byte_range.start)?; + let id = *req.id as usize; + requests[id - start] = Some(req); + } + + let mut cache_futures = Vec::with_capacity(requests.len()); + for (i, (segment, maybe_req)) in segment_map[start..end] + .iter() + .zip(requests.into_iter()) + .enumerate() + { + let offset = usize::try_from(segment.offset - request.byte_range.start)?; let buf = buffer - .slice(offset..offset + req.location.length as usize) - .aligned(req.location.alignment); + .slice(offset..offset + segment.length as usize) + .aligned(segment.alignment); // Send the callback - req.callback - .send(Ok(buf.clone())) - .map_err(|_| vortex_err!("send failed"))?; + if let Some(req) = maybe_req { + req.callback + .send(Ok(buf.clone())) + .map_err(|_| vortex_err!("send failed"))?; + } - cache_futures.push(segment_cache.put(req.id, buf)); + let id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id")); + cache_futures.push(segment_cache.put(id, buf)); } // Populate the cache From de6d63b18408d2683f3f407099909c3cc82843f6 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 15 Jan 2025 12:05:45 +0000 Subject: [PATCH 3/9] Check partition count in expression partition logic (#1944) This is a performance check, so only on debug builds --- vortex-expr/src/transform/partition.rs | 17 +++++++++++++---- vortex-layout/src/layouts/struct_/eval_expr.rs | 7 +++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/vortex-expr/src/transform/partition.rs b/vortex-expr/src/transform/partition.rs index 779d9713c8..c6d18338b4 100644 --- a/vortex-expr/src/transform/partition.rs +++ b/vortex-expr/src/transform/partition.rs @@ -127,11 +127,9 @@ 
impl<'a> Folder<'a> for ImmediateIdentityAccessesAnalysis<'a> { .collect_vec(); accesses.into_iter().for_each(|c| { + let accesses = self.sub_expressions.entry(node).or_default(); if let Some(fields) = c { - self.sub_expressions - .entry(node) - .or_default() - .extend(fields.iter().cloned()); + accesses.extend(fields.iter().cloned()); } }); @@ -159,6 +157,11 @@ impl<'a> StructFieldExpressionSplitter<'a> { let mut expr_top_level_ref = ImmediateIdentityAccessesAnalysis::new(scope_dtype); expr.accept_with_context(&mut expr_top_level_ref, ())?; + let expression_accesses = expr_top_level_ref + .sub_expressions + .get(&expr) + .map(|ac| ac.len()); + let mut splitter = StructFieldExpressionSplitter::new(expr_top_level_ref.sub_expressions, scope_dtype); @@ -178,6 +181,12 @@ impl<'a> StructFieldExpressionSplitter<'a> { }) .collect(); + // Ensure that there are not more accesses than partitions, we missed something + assert!(expression_accesses.unwrap_or(0) <= partitions.len()); + // Ensure that there are as many partitions as there are accesses/fields in the scope, + // this will affect performance, not correctness. 
+ debug_assert_eq!(expression_accesses.unwrap_or(0), partitions.len()); + Ok(PartitionedExpr { root: split.result(), partitions: partitions.into_boxed_slice(), diff --git a/vortex-layout/src/layouts/struct_/eval_expr.rs b/vortex-layout/src/layouts/struct_/eval_expr.rs index d383a21217..e9373e7651 100644 --- a/vortex-layout/src/layouts/struct_/eval_expr.rs +++ b/vortex-layout/src/layouts/struct_/eval_expr.rs @@ -63,7 +63,7 @@ mod tests { use vortex_buffer::buffer; use vortex_dtype::PType::I32; use vortex_dtype::{DType, Field, Nullability, StructDType}; - use vortex_expr::{get_item, gt, ident, select}; + use vortex_expr::{get_item, gt, ident, pack}; use vortex_scan::RowMask; use crate::layouts::flat::writer::FlatLayoutWriter; @@ -156,7 +156,10 @@ mod tests { let (segments, layout) = struct_layout(); let reader = layout.reader(segments, Default::default()).unwrap(); - let expr = select(vec!["a".into(), "b".into()], ident()); + let expr = pack( + ["a".into(), "b".into()], + vec![get_item("a", ident()), get_item("b", ident())], + ); let result = block_on(reader.evaluate_expr( // Take rows 0 and 1, skip row 2, and anything after that RowMask::new(FilterMask::from_iter([true, true, false]), 0), From 9cf1eec425cccaf5b80085fb425e30c82d95f800 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 15 Jan 2025 12:58:32 +0000 Subject: [PATCH 4/9] Add take implementation (#1955) Allow scanning with a `Buffer<u64>` of row indices. 
--- bench-vortex/benches/random_access.rs | 25 +++++---- bench-vortex/src/reader.rs | 21 ++++---- vortex-file/src/v2/file.rs | 75 ++++++++++++++++++++++++--- vortex-file/src/v2/tests.rs | 54 +++++++++++-------- vortex-scan/src/lib.rs | 15 ++---- 5 files changed, 133 insertions(+), 57 deletions(-) diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index dc39ab6d56..e6c0cfafe7 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -11,12 +11,11 @@ use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; use object_store::ObjectStore; use tokio::runtime::Runtime; +use vortex::buffer::buffer; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; -const INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000]; - /// Benchmarks against object stores require setting /// * AWS_ACCESS_KEY_ID /// * AWS_SECRET_ACCESS_KEY @@ -25,12 +24,19 @@ const INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000]; /// /// environment variables and assume files to read are already present fn random_access_vortex(c: &mut Criterion) { + let indices = buffer![10, 11, 12, 13, 100_000, 3_000_000]; + let mut group = c.benchmark_group("random-access"); let taxi_vortex = taxi_data_vortex(); group.bench_function("vortex-tokio-local-disk", |b| { - b.to_async(Runtime::new().unwrap()) - .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) }) + b.to_async(Runtime::new().unwrap()).iter(|| async { + black_box( + take_vortex_tokio(&taxi_vortex, indices.clone()) + .await + .unwrap(), + ) + }) }); group.bench_function("vortex-local-fs", |b| { @@ -38,7 +44,7 @@ fn random_access_vortex(c: &mut Criterion) { let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap(); b.to_async(Runtime::new().unwrap()).iter(|| async { black_box( - take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), &INDICES) + 
take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), indices.clone()) .await .unwrap(), ) @@ -50,8 +56,9 @@ fn random_access_vortex(c: &mut Criterion) { let taxi_parquet = taxi_data_parquet(); group.bench_function("parquet-tokio-local-disk", |b| { - b.to_async(Runtime::new().unwrap()) - .iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) }) + b.to_async(Runtime::new().unwrap()).iter(|| async { + black_box(take_parquet(&taxi_parquet, indices.clone()).await.unwrap()) + }) }); if env::var("AWS_ACCESS_KEY_ID").is_ok() { @@ -65,7 +72,7 @@ fn random_access_vortex(c: &mut Criterion) { b.to_async(Runtime::new().unwrap()).iter(|| async { black_box( - take_vortex_object_store(r2_fs.clone(), r2_path.clone(), &INDICES) + take_vortex_object_store(r2_fs.clone(), r2_path.clone(), indices.clone()) .await .unwrap(), ) @@ -81,7 +88,7 @@ fn random_access_vortex(c: &mut Criterion) { b.to_async(Runtime::new().unwrap()).iter(|| async { black_box( - take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES) + take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, indices.clone()) .await .unwrap(), ) diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index f9e701230e..106d4606ce 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -24,6 +24,7 @@ use stream::StreamExt; use vortex::aliases::hash_map::HashMap; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; +use vortex::buffer::Buffer; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; @@ -106,14 +107,12 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu async fn take_vortex( reader: T, - _indices: &[u64], + indices: Buffer, ) -> VortexResult { VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) .open(reader) .await? - // FIXME(ngates): support row indices - // .scan_rows(Scan::all(), indices.iter().copied())? - .scan(Scan::all())? 
+ .take(indices, Scan::all())? .into_array_data() .await? // For equivalence.... we decompress to make sure we're not cheating too much. @@ -124,33 +123,33 @@ async fn take_vortex( pub async fn take_vortex_object_store( fs: Arc, path: object_store::path::Path, - indices: &[u64], + indices: Buffer, ) -> VortexResult { take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await } -pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult { +pub async fn take_vortex_tokio(path: &Path, indices: Buffer) -> VortexResult { take_vortex(TokioFile::open(path)?, indices).await } pub async fn take_parquet_object_store( fs: Arc, path: &object_store::path::Path, - indices: &[u64], + indices: Buffer, ) -> VortexResult { let meta = fs.head(path).await?; let reader = ParquetObjectReader::new(fs, meta); parquet_take_from_stream(reader, indices).await } -pub async fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult { +pub async fn take_parquet(path: &Path, indices: Buffer) -> VortexResult { let file = tokio::fs::File::open(path).await?; parquet_take_from_stream(file, indices).await } async fn parquet_take_from_stream( async_reader: T, - indices: &[u64], + indices: Buffer, ) -> VortexResult { let builder = ParquetRecordBatchStreamBuilder::new_with_options( async_reader, @@ -175,12 +174,12 @@ async fn parquet_take_from_stream( for idx in indices { let row_group_idx = row_group_offsets - .binary_search(&(*idx as i64)) + .binary_search(&(idx as i64)) .unwrap_or_else(|e| e - 1); row_groups .entry(row_group_idx) .or_insert_with(Vec::new) - .push((*idx as i64) - row_group_offsets[row_group_idx]); + .push((idx as i64) - row_group_offsets[row_group_idx]); } let row_group_indices = row_groups .keys() diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 7720f9e77e..49728b08dd 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -6,12 +6,14 @@ use std::task::{Context, Poll}; use futures::Stream; use 
futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; use pin_project_lite::pin_project; +use vortex_array::compute::FilterMask; use vortex_array::stream::{ArrayStream, ArrayStreamAdapter}; use vortex_array::{ArrayData, ContextRef}; +use vortex_buffer::Buffer; use vortex_dtype::DType; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; use vortex_layout::{ExprEvaluator, LayoutReader}; -use vortex_scan::Scan; +use vortex_scan::{RowMask, Scan}; use crate::v2::exec::ExecDriver; use crate::v2::io::IoDriver; @@ -50,6 +52,69 @@ impl VortexFile { /// Performs a scan operation over the file. pub fn scan(&self, scan: Arc) -> VortexResult> { + self.scan_with_masks( + ArcIter::new(self.splits.clone()) + .map(|row_range| RowMask::new_valid_between(row_range.start, row_range.end)), + scan, + ) + } + + /// Takes the given rows while also applying the filter and projection functions from a scan. + /// The row indices must be sorted. + pub fn take( + &self, + row_indices: Buffer, + scan: Arc, + ) -> VortexResult> { + if !row_indices.windows(2).all(|w| w[0] <= w[1]) { + vortex_bail!("row indices must be sorted") + } + + let row_masks = ArcIter::new(self.splits.clone()).filter_map(move |row_range| { + // Quickly short-circuit if the row range is outside the bounds of the row indices. + if row_range.end <= row_indices[0] + || row_indices + .last() + .is_some_and(|&last| row_range.start >= last) + { + return None; + } + + // For the given row range, find the indices that are within the row_indices. + let start_idx = row_indices + .binary_search(&row_range.start) + .unwrap_or_else(|x| x); + let end_idx = row_indices + .binary_search(&row_range.end) + .unwrap_or_else(|x| x); + + if start_idx == end_idx { + // No rows in range + return None; + } + + // Construct a row mask for the range. 
+ let filter_mask = FilterMask::from_indices( + usize::try_from(row_range.end - row_range.start) + .vortex_expect("Split ranges are within usize"), + row_indices[start_idx..end_idx].iter().map(|&idx| { + usize::try_from(idx - row_range.start).vortex_expect("index within range") + }), + ); + Some(RowMask::new(filter_mask, row_range.start)) + }); + + self.scan_with_masks(row_masks, scan) + } + + fn scan_with_masks( + &self, + row_masks: R, + scan: Arc, + ) -> VortexResult> + where + R: Iterator + Send + 'static, + { let result_dtype = scan.result_dtype(self.dtype())?; // Set up a segment channel to collect segment requests from the execution stream. @@ -62,10 +127,8 @@ impl VortexFile { .reader(segment_channel.reader(), self.ctx.clone())?; // Now we give one end of the channel to the layout reader... - log::debug!("Starting scan with {} splits", self.splits.len()); - let exec_stream = stream::iter(ArcIter::new(self.splits.clone())) - .map(move |row_range| scan.range_scan(row_range)) - .map(move |range_scan| match range_scan { + let exec_stream = stream::iter(row_masks) + .map(move |row_mask| match scan.clone().range_scan(row_mask) { Ok(range_scan) => { let reader = reader.clone(); async move { diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index c1cf92abd6..e2c8810b01 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -4,37 +4,49 @@ use vortex_array::array::ChunkedArray; use vortex_array::stream::ArrayStreamExt; use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; use vortex_buffer::buffer; -use vortex_error::VortexResult; +use vortex_error::{VortexExpect, VortexResult}; use vortex_scan::Scan; +use crate::v2::io::IoDriver; use crate::v2::*; -#[test] -fn basic_file_roundtrip() -> VortexResult<()> { - block_on(async { - let array = ChunkedArray::from_iter([ - buffer![0, 1, 2].into_array(), - buffer![3, 4, 5].into_array(), - buffer![6, 7, 8].into_array(), - ]) - .into_array(); +fn chunked_file() -> 
VortexFile { + let array = ChunkedArray::from_iter([ + buffer![0, 1, 2].into_array(), + buffer![3, 4, 5].into_array(), + buffer![6, 7, 8].into_array(), + ]) + .into_array(); + block_on(async { let buffer: Bytes = VortexWriteOptions::default() .write(vec![], array.into_array_stream()) .await? .into(); - - let vxf = VortexOpenOptions::new(ContextRef::default()) + VortexOpenOptions::new(ContextRef::default()) .open(buffer) - .await?; - let result = vxf - .scan(Scan::all())? - .into_array_data() - .await? - .into_primitive()?; + .await + }) + .vortex_expect("Failed to create test file") +} - assert_eq!(result.as_slice::(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]); +#[test] +fn basic_file_roundtrip() -> VortexResult<()> { + let vxf = chunked_file(); + let result = block_on(vxf.scan(Scan::all())?.into_array_data())?.into_primitive()?; - Ok(()) - }) + assert_eq!(result.as_slice::(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]); + + Ok(()) +} + +#[test] +fn file_take() -> VortexResult<()> { + let vxf = chunked_file(); + let result = + block_on(vxf.take(buffer![0, 1, 8], Scan::all())?.into_array_data())?.into_primitive()?; + + assert_eq!(result.as_slice::(), &[0, 1, 8]); + + Ok(()) } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 510895a16d..2974fb8a98 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -1,15 +1,13 @@ mod range_scan; mod row_mask; -use std::ops::Range; use std::sync::Arc; pub use range_scan::*; pub use row_mask::*; -use vortex_array::compute::FilterMask; use vortex_array::{ArrayDType, Canonical, IntoArrayData}; use vortex_dtype::DType; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::VortexResult; use vortex_expr::{ExprRef, Identity}; /// Represents a scan operation to read data from a set of rows, with an optional filter expression, @@ -76,14 +74,11 @@ impl Scan { /// Instantiate a new scan for a specific range. The range scan will share statistics with this /// parent scan in order to optimize future range scans. 
- pub fn range_scan(self: &Arc, range: Range) -> VortexResult { - // TODO(ngates): binary search take_indices to compute initial mask. - let length = usize::try_from(range.end - range.start) - .map_err(|_| vortex_err!("Range length must fit within usize"))?; + pub fn range_scan(self: Arc, row_mask: RowMask) -> VortexResult { Ok(RangeScan::new( - self.clone(), - range.start, - FilterMask::new_true(length), + self, + row_mask.begin(), + row_mask.into_filter_mask()?, )) } } From 3ecb285005609ef7614e8183b670108d92830ee2 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 15 Jan 2025 14:11:39 +0000 Subject: [PATCH 5/9] fix: Always save TPC-H data in bench-vortex/data (#1957) --- bench-vortex/src/tpch/dbgen.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bench-vortex/src/tpch/dbgen.rs b/bench-vortex/src/tpch/dbgen.rs index 12d3dfdf1e..63eb125e2f 100644 --- a/bench-vortex/src/tpch/dbgen.rs +++ b/bench-vortex/src/tpch/dbgen.rs @@ -13,6 +13,8 @@ use itertools::Itertools; use tar::Archive; use xshell::Shell; +use crate::IdempotentPath; + pub struct DBGen { options: DBGenOptions, } @@ -37,7 +39,7 @@ impl Default for DBGenOptions { fn default() -> Self { Self { scale_factor: 1, - base_dir: std::env::current_dir().unwrap().join("data").join("tpch"), + base_dir: "tpch".to_data_path(), cache_dir: homedir::my_home() .unwrap() .unwrap() From f30b8dd6a6beca84ed41d548181331ccf9d9ffd7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 15 Jan 2025 14:12:19 +0000 Subject: [PATCH 6/9] FilterMask Optimizations (#1950) Allow FilterMask to be cheaply created from various formats, and add specialized implementations of `Eq` and `BitAnd`. 
Fixes https://github.com/spiraldb/vortex/issues/1848 --- encodings/alp/src/alp/compute/mod.rs | 4 +- encodings/alp/src/alp_rd/compute/filter.rs | 17 +- .../datetime-parts/src/compute/filter.rs | 6 +- encodings/dict/src/compute/mod.rs | 2 +- .../src/bitpacking/compute/filter.rs | 28 +- encodings/fastlanes/src/for/compute/mod.rs | 2 +- encodings/fsst/src/compute/mod.rs | 4 +- encodings/fsst/tests/fsst_tests.rs | 2 +- encodings/runend/src/compute/mod.rs | 12 +- encodings/zigzag/src/compute.rs | 4 +- fuzz/fuzz_targets/array_ops.rs | 2 +- pyvortex/src/array.rs | 2 +- vortex-array/src/array/bool/compute/filter.rs | 14 +- .../src/array/chunked/compute/filter.rs | 62 +- .../src/array/constant/compute/mod.rs | 2 +- vortex-array/src/array/list/mod.rs | 2 +- .../src/array/primitive/compute/filter.rs | 10 +- vortex-array/src/array/sparse/compute/mod.rs | 8 +- vortex-array/src/array/struct_/compute.rs | 11 +- .../src/array/varbin/compute/filter.rs | 26 +- vortex-array/src/compute/filter.rs | 613 +++++++++++++----- vortex-array/src/patches.rs | 7 +- vortex-array/src/validity.rs | 2 +- vortex-file/src/read/mask.rs | 55 +- vortex-layout/src/layouts/flat/eval_expr.rs | 2 +- vortex-scan/src/range_scan.rs | 7 +- vortex-scan/src/row_mask.rs | 78 +-- 27 files changed, 613 insertions(+), 371 deletions(-) diff --git a/encodings/alp/src/alp/compute/mod.rs b/encodings/alp/src/alp/compute/mod.rs index 74c0546ff6..1a8b147c09 100644 --- a/encodings/alp/src/alp/compute/mod.rs +++ b/encodings/alp/src/alp/compute/mod.rs @@ -79,10 +79,10 @@ impl SliceFn for ALPEncoding { } impl FilterFn for ALPEncoding { - fn filter(&self, array: &ALPArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &ALPArray, mask: &FilterMask) -> VortexResult { let patches = array .patches() - .map(|p| p.filter(mask.clone())) + .map(|p| p.filter(mask)) .transpose()? 
.flatten(); diff --git a/encodings/alp/src/alp_rd/compute/filter.rs b/encodings/alp/src/alp_rd/compute/filter.rs index 5bd9981f32..f0811e1801 100644 --- a/encodings/alp/src/alp_rd/compute/filter.rs +++ b/encodings/alp/src/alp_rd/compute/filter.rs @@ -5,16 +5,16 @@ use vortex_error::VortexResult; use crate::{ALPRDArray, ALPRDEncoding}; impl FilterFn for ALPRDEncoding { - fn filter(&self, array: &ALPRDArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &ALPRDArray, mask: &FilterMask) -> VortexResult { let left_parts_exceptions = array .left_parts_patches() - .map(|patches| patches.filter(mask.clone())) + .map(|patches| patches.filter(mask)) .transpose()? .flatten(); Ok(ALPRDArray::try_new( array.dtype().clone(), - filter(&array.left_parts(), mask.clone())?, + filter(&array.left_parts(), mask)?, array.left_parts_dict(), filter(&array.right_parts(), mask)?, array.right_bit_width(), @@ -46,10 +46,13 @@ mod test { assert!(encoded.left_parts_patches().is_some()); // The first two values need no patching - let filtered = filter(encoded.as_ref(), FilterMask::from_iter([true, false, true])) - .unwrap() - .into_primitive() - .unwrap(); + let filtered = filter( + encoded.as_ref(), + &FilterMask::from_iter([true, false, true]), + ) + .unwrap() + .into_primitive() + .unwrap(); assert_eq!(filtered.as_slice::(), &[a, outlier]); } } diff --git a/encodings/datetime-parts/src/compute/filter.rs b/encodings/datetime-parts/src/compute/filter.rs index c7c337a986..da768fc36a 100644 --- a/encodings/datetime-parts/src/compute/filter.rs +++ b/encodings/datetime-parts/src/compute/filter.rs @@ -5,11 +5,11 @@ use vortex_error::VortexResult; use crate::{DateTimePartsArray, DateTimePartsEncoding}; impl FilterFn for DateTimePartsEncoding { - fn filter(&self, array: &DateTimePartsArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &DateTimePartsArray, mask: &FilterMask) -> VortexResult { Ok(DateTimePartsArray::try_new( array.dtype().clone(), - 
filter(array.days().as_ref(), mask.clone())?, - filter(array.seconds().as_ref(), mask.clone())?, + filter(array.days().as_ref(), mask)?, + filter(array.seconds().as_ref(), mask)?, filter(array.subsecond().as_ref(), mask)?, )? .into_array()) diff --git a/encodings/dict/src/compute/mod.rs b/encodings/dict/src/compute/mod.rs index 79cfb45ab7..be979c7bf4 100644 --- a/encodings/dict/src/compute/mod.rs +++ b/encodings/dict/src/compute/mod.rs @@ -60,7 +60,7 @@ impl TakeFn for DictEncoding { } impl FilterFn for DictEncoding { - fn filter(&self, array: &DictArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &DictArray, mask: &FilterMask) -> VortexResult { let codes = filter(&array.codes(), mask)?; DictArray::try_new(codes, array.values()).map(|a| a.into_array()) } diff --git a/encodings/fastlanes/src/bitpacking/compute/filter.rs b/encodings/fastlanes/src/bitpacking/compute/filter.rs index 86b072bdfa..5b9821d2a0 100644 --- a/encodings/fastlanes/src/bitpacking/compute/filter.rs +++ b/encodings/fastlanes/src/bitpacking/compute/filter.rs @@ -13,7 +13,7 @@ use crate::bitpacking::compute::take::UNPACK_CHUNK_THRESHOLD; use crate::{BitPackedArray, BitPackedEncoding}; impl FilterFn for BitPackedEncoding { - fn filter(&self, array: &BitPackedArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &BitPackedArray, mask: &FilterMask) -> VortexResult { let primitive = match_each_unsigned_integer_ptype!(array.ptype().to_unsigned(), |$I| { filter_primitive::<$I>(array, mask) }); @@ -31,13 +31,13 @@ impl FilterFn for BitPackedEncoding { /// dictates the final `PType` of the result. fn filter_primitive( array: &BitPackedArray, - mask: FilterMask, + mask: &FilterMask, ) -> VortexResult { - let validity = array.validity().filter(&mask)?; + let validity = array.validity().filter(mask)?; let patches = array .patches() - .map(|patches| patches.filter(mask.clone())) + .map(|patches| patches.filter(mask)) .transpose()? 
.flatten(); @@ -47,15 +47,13 @@ fn filter_primitive( .and_then(|a| a.into_primitive()); } - let values: Buffer = match mask.iter()? { + let values: Buffer = match mask.iter() { FilterIter::Indices(indices) => { filter_indices(array, mask.true_count(), indices.iter().copied()) } - FilterIter::IndicesIter(iter) => filter_indices(array, mask.true_count(), iter), FilterIter::Slices(slices) => { filter_slices(array, mask.true_count(), slices.iter().copied()) } - FilterIter::SlicesIter(iter) => filter_slices(array, mask.true_count(), iter), }; let mut values = PrimitiveArray::new(values, validity).reinterpret_cast(array.ptype()); @@ -143,9 +141,9 @@ mod test { let unpacked = PrimitiveArray::from_iter((0..4096).map(|i| (i % 63) as u8)); let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 6).unwrap(); - let mask = FilterMask::from_indices(bitpacked.len(), [0, 125, 2047, 2049, 2151, 2790]); + let mask = FilterMask::from_indices(bitpacked.len(), vec![0, 125, 2047, 2049, 2151, 2790]); - let primitive_result = filter(bitpacked.as_ref(), mask) + let primitive_result = filter(bitpacked.as_ref(), &mask) .unwrap() .into_primitive() .unwrap(); @@ -160,9 +158,9 @@ mod test { let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 6).unwrap(); let sliced = slice(bitpacked.as_ref(), 128, 2050).unwrap(); - let mask = FilterMask::from_indices(sliced.len(), [1919, 1921]); + let mask = FilterMask::from_indices(sliced.len(), vec![1919, 1921]); - let primitive_result = filter(&sliced, mask).unwrap().into_primitive().unwrap(); + let primitive_result = filter(&sliced, &mask).unwrap().into_primitive().unwrap(); let res_bytes = primitive_result.as_slice::(); assert_eq!(res_bytes, &[31, 33]); } @@ -171,7 +169,11 @@ mod test { fn filter_bitpacked() { let unpacked = PrimitiveArray::from_iter((0..4096).map(|i| (i % 63) as u8)); let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 6).unwrap(); - let filtered = filter(bitpacked.as_ref(), FilterMask::from_indices(4096, 0..1024)).unwrap(); 
+ let filtered = filter( + bitpacked.as_ref(), + &FilterMask::from_indices(4096, (0..1024).collect()), + ) + .unwrap(); assert_eq!( filtered.into_primitive().unwrap().as_slice::(), (0..1024).map(|i| (i % 63) as u8).collect::>() @@ -185,7 +187,7 @@ mod test { let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 9).unwrap(); let filtered = filter( bitpacked.as_ref(), - FilterMask::from_indices(values.len(), 0..250), + &FilterMask::from_indices(values.len(), (0..250).collect()), ) .unwrap() .into_primitive() diff --git a/encodings/fastlanes/src/for/compute/mod.rs b/encodings/fastlanes/src/for/compute/mod.rs index 98a919a952..1294069d47 100644 --- a/encodings/fastlanes/src/for/compute/mod.rs +++ b/encodings/fastlanes/src/for/compute/mod.rs @@ -53,7 +53,7 @@ impl TakeFn for FoREncoding { } impl FilterFn for FoREncoding { - fn filter(&self, array: &FoRArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &FoRArray, mask: &FilterMask) -> VortexResult { FoRArray::try_new( filter(&array.encoded(), mask)?, array.reference_scalar(), diff --git a/encodings/fsst/src/compute/mod.rs b/encodings/fsst/src/compute/mod.rs index fd20058eda..0ce03f932e 100644 --- a/encodings/fsst/src/compute/mod.rs +++ b/encodings/fsst/src/compute/mod.rs @@ -80,12 +80,12 @@ impl ScalarAtFn for FSSTEncoding { impl FilterFn for FSSTEncoding { // Filtering an FSSTArray filters the codes array, leaving the symbols array untouched - fn filter(&self, array: &FSSTArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &FSSTArray, mask: &FilterMask) -> VortexResult { Ok(FSSTArray::try_new( array.dtype().clone(), array.symbols(), array.symbol_lengths(), - filter(&array.codes(), mask.clone())?, + filter(&array.codes(), mask)?, filter(&array.uncompressed_lengths(), mask)?, )? 
.into_array()) diff --git a/encodings/fsst/tests/fsst_tests.rs b/encodings/fsst/tests/fsst_tests.rs index a7f73741c9..3f00e6b1cc 100644 --- a/encodings/fsst/tests/fsst_tests.rs +++ b/encodings/fsst/tests/fsst_tests.rs @@ -86,7 +86,7 @@ fn test_fsst_array_ops() { // test filter let mask = FilterMask::from_iter([false, true, false]); - let fsst_filtered = filter(&fsst_array, mask).unwrap(); + let fsst_filtered = filter(&fsst_array, &mask).unwrap(); assert_eq!(fsst_filtered.encoding().id(), FSSTEncoding::ID); assert_eq!(fsst_filtered.len(), 1); assert_nth_scalar!( diff --git a/encodings/runend/src/compute/mod.rs b/encodings/runend/src/compute/mod.rs index 7db4b77d45..72f3fd12ef 100644 --- a/encodings/runend/src/compute/mod.rs +++ b/encodings/runend/src/compute/mod.rs @@ -92,12 +92,12 @@ impl SliceFn for RunEndEncoding { } impl FilterFn for RunEndEncoding { - fn filter(&self, array: &RunEndArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &RunEndArray, mask: &FilterMask) -> VortexResult { let primitive_run_ends = array.ends().into_primitive()?; let (run_ends, values_mask) = match_each_unsigned_integer_ptype!(primitive_run_ends.ptype(), |$P| { filter_run_ends(primitive_run_ends.as_slice::<$P>(), array.offset() as u64, array.len() as u64, mask)? 
}); - let values = filter(&array.values(), values_mask)?; + let values = filter(&array.values(), &values_mask)?; RunEndArray::try_new(run_ends.into_array(), values).map(|a| a.into_array()) } @@ -108,14 +108,14 @@ fn filter_run_ends + AsPrimitive>( run_ends: &[R], offset: u64, length: u64, - mask: FilterMask, + mask: &FilterMask, ) -> VortexResult<(PrimitiveArray, FilterMask)> { let mut new_run_ends = buffer_mut![R::zero(); run_ends.len()]; let mut start = 0u64; let mut j = 0; let mut count = R::zero(); - let filter_values = mask.to_boolean_buffer()?; + let filter_values = mask.boolean_buffer(); let new_mask: FilterMask = BooleanBuffer::collect_bool(run_ends.len(), |i| { let mut keep = false; @@ -278,7 +278,7 @@ mod test { let arr = ree_array(); let filtered = filter( arr.as_ref(), - FilterMask::from_iter([ + &FilterMask::from_iter([ true, true, false, false, false, false, false, false, false, false, true, true, ]), ) @@ -308,7 +308,7 @@ mod test { let arr = slice(ree_array(), 2, 7).unwrap(); let filtered = filter( &arr, - FilterMask::from_iter([true, false, false, true, true]), + &FilterMask::from_iter([true, false, false, true, true]), ) .unwrap(); let filtered_run_end = RunEndArray::try_from(filtered).unwrap(); diff --git a/encodings/zigzag/src/compute.rs b/encodings/zigzag/src/compute.rs index f685c6a55c..9b2e914c7d 100644 --- a/encodings/zigzag/src/compute.rs +++ b/encodings/zigzag/src/compute.rs @@ -30,7 +30,7 @@ impl ComputeVTable for ZigZagEncoding { } impl FilterFn for ZigZagEncoding { - fn filter(&self, array: &ZigZagArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &ZigZagArray, mask: &FilterMask) -> VortexResult { let encoded = filter(&array.encoded(), mask)?; Ok(ZigZagArray::try_new(encoded)?.into_array()) } @@ -145,7 +145,7 @@ mod tests { fn filter_zigzag() { let zigzag = ZigZagArray::encode(&buffer![-189, -160, 1].into_array()).unwrap(); let filter_mask = BooleanBuffer::from(vec![true, false, true]).into(); - let actual = 
filter(&zigzag.into_array(), filter_mask) + let actual = filter(&zigzag.into_array(), &filter_mask) .unwrap() .into_primitive() .unwrap(); diff --git a/fuzz/fuzz_targets/array_ops.rs b/fuzz/fuzz_targets/array_ops.rs index d71fee4be1..fedc61857c 100644 --- a/fuzz/fuzz_targets/array_ops.rs +++ b/fuzz/fuzz_targets/array_ops.rs @@ -59,7 +59,7 @@ fuzz_target!(|fuzz_action: FuzzArrayAction| -> Corpus { assert_search_sorted(sorted, s, side, expected.search(), i) } Action::Filter(mask) => { - current_array = filter(&current_array, mask).unwrap(); + current_array = filter(&current_array, &mask).unwrap(); assert_array_eq(&expected.array(), &current_array, i); } } diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index 7d81a34906..43f88bb610 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -263,7 +263,7 @@ impl PyArray { fn filter(&self, filter: &Bound) -> PyResult { let filter = filter.borrow(); let inner = - vortex::compute::filter(&self.inner, FilterMask::try_from(filter.inner.clone())?)?; + vortex::compute::filter(&self.inner, &FilterMask::try_from(filter.inner.clone())?)?; Ok(PyArray { inner }) } diff --git a/vortex-array/src/array/bool/compute/filter.rs b/vortex-array/src/array/bool/compute/filter.rs index 92c832487e..128fd122f8 100644 --- a/vortex-array/src/array/bool/compute/filter.rs +++ b/vortex-array/src/array/bool/compute/filter.rs @@ -6,22 +6,16 @@ use crate::compute::{FilterFn, FilterIter, FilterMask}; use crate::{ArrayData, IntoArrayData}; impl FilterFn for BoolEncoding { - fn filter(&self, array: &BoolArray, mask: FilterMask) -> VortexResult { - let validity = array.validity().filter(&mask)?; + fn filter(&self, array: &BoolArray, mask: &FilterMask) -> VortexResult { + let validity = array.validity().filter(mask)?; - let buffer = match mask.iter()? 
{ + let buffer = match mask.iter() { FilterIter::Indices(indices) => filter_indices_slice(&array.boolean_buffer(), indices), - FilterIter::IndicesIter(iter) => { - filter_indices(&array.boolean_buffer(), mask.true_count(), iter) - } FilterIter::Slices(slices) => filter_slices( &array.boolean_buffer(), mask.true_count(), slices.iter().copied(), ), - FilterIter::SlicesIter(iter) => { - filter_slices(&array.boolean_buffer(), mask.true_count(), iter) - } }; Ok(BoolArray::try_new(buffer, validity)?.into_array()) @@ -84,7 +78,7 @@ mod test { let arr = BoolArray::from_iter([true, true, false]); let mask = FilterMask::from_iter([true, false, true]); - let filtered = filter(&arr.into_array(), mask) + let filtered = filter(&arr.into_array(), &mask) .unwrap() .into_bool() .unwrap(); diff --git a/vortex-array/src/array/chunked/compute/filter.rs b/vortex-array/src/array/chunked/compute/filter.rs index ea01a01fbf..fc29caf825 100644 --- a/vortex-array/src/array/chunked/compute/filter.rs +++ b/vortex-array/src/array/chunked/compute/filter.rs @@ -1,4 +1,3 @@ -use arrow_buffer::BooleanBufferBuilder; use vortex_buffer::BufferMut; use vortex_error::{VortexExpect, VortexResult, VortexUnwrap}; @@ -11,7 +10,7 @@ use crate::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoCanonical}; const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8; impl FilterFn for ChunkedEncoding { - fn filter(&self, array: &ChunkedArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &ChunkedArray, mask: &FilterMask) -> VortexResult { let selected = mask.true_count(); // Based on filter selectivity, we take the values between a range of slices, or @@ -38,31 +37,8 @@ enum ChunkFilter { Slices(Vec<(usize, usize)>), } -/// Given a sequence of slices that indicate ranges of set values, returns a boolean array -/// representing the same thing. 
-fn slices_to_mask(slices: &[(usize, usize)], len: usize) -> FilterMask { - let mut buffer = BooleanBufferBuilder::new(len); - - let mut pos = 0; - for (slice_start, slice_end) in slices.iter().copied() { - // write however many trailing `false` between the end of the previous slice and the - // start of this one. - let n_leading_false = slice_start - pos; - buffer.append_n(n_leading_false, false); - buffer.append_n(slice_end - slice_start, true); - pos = slice_end; - } - - // Pad the end of the buffer with false, if necessary. - let n_trailing_false = len - pos; - buffer.append_n(n_trailing_false, false); - - FilterMask::from(buffer.finish()) -} - /// Filter the chunks using slice ranges. -#[allow(deprecated)] -fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult> { +fn filter_slices(array: &ChunkedArray, mask: &FilterMask) -> VortexResult> { let mut result = Vec::with_capacity(array.nchunks()); // Pre-materialize the chunk ends for performance. @@ -72,8 +48,8 @@ fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult VortexResult preserve the entire chunk unfiltered. ChunkFilter::All => result.push(chunk), @@ -128,7 +104,10 @@ fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult {} // Slices => turn the slices into a boolean buffer. ChunkFilter::Slices(slices) => { - result.push(filter(&chunk, slices_to_mask(slices, chunk.len()))?); + result.push(filter( + &chunk, + &FilterMask::from_slices(chunk.len(), slices), + )?); } } } @@ -138,7 +117,7 @@ fn filter_slices(array: &ChunkedArray, mask: FilterMask) -> VortexResult VortexResult> { +fn filter_indices(array: &ChunkedArray, mask: &FilterMask) -> VortexResult> { let mut result = Vec::with_capacity(array.nchunks()); let mut current_chunk_id = 0; let mut chunk_indices = BufferMut::with_capacity(array.nchunks()); @@ -148,8 +127,8 @@ fn filter_indices(array: &ChunkedArray, mask: FilterMask) -> VortexResult(); - for set_index in mask.iter_indices()? 
{ - let (chunk_id, index) = find_chunk_idx(set_index, chunk_ends); + for set_index in mask.indices() { + let (chunk_id, index) = find_chunk_idx(*set_index, chunk_ends); if chunk_id != current_chunk_id { // Push the chunk we've accumulated. if !chunk_indices.is_empty() { @@ -201,28 +180,13 @@ fn find_chunk_idx(idx: usize, chunk_ends: &[u64]) -> (usize, usize) { #[cfg(test)] mod test { - use itertools::Itertools; use vortex_dtype::half::f16; use vortex_dtype::{DType, Nullability, PType}; - use crate::array::chunked::compute::filter::slices_to_mask; use crate::array::{ChunkedArray, PrimitiveArray}; use crate::compute::{filter, FilterMask}; use crate::IntoArrayData; - #[test] - fn test_slices_to_predicate() { - let slices = [(2, 4), (6, 8), (9, 10)]; - let predicate = slices_to_mask(&slices, 11); - - let bools = predicate.to_boolean_buffer().unwrap().iter().collect_vec(); - - assert_eq!( - bools, - vec![false, false, true, true, false, false, true, true, false, true, false], - ) - } - #[test] fn filter_chunked_floats() { let chunked = ChunkedArray::try_new( @@ -252,7 +216,7 @@ mod test { let mask = FilterMask::from_iter([ true, false, false, true, true, true, true, true, true, true, true, ]); - let filtered = filter(&chunked, mask).unwrap(); + let filtered = filter(&chunked, &mask).unwrap(); assert_eq!(filtered.len(), 9); } } diff --git a/vortex-array/src/array/constant/compute/mod.rs b/vortex-array/src/array/constant/compute/mod.rs index dbb7dfbc7b..a5ba0b8485 100644 --- a/vortex-array/src/array/constant/compute/mod.rs +++ b/vortex-array/src/array/constant/compute/mod.rs @@ -72,7 +72,7 @@ impl SliceFn for ConstantEncoding { } impl FilterFn for ConstantEncoding { - fn filter(&self, array: &ConstantArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &ConstantArray, mask: &FilterMask) -> VortexResult { Ok(ConstantArray::new(array.scalar(), mask.true_count()).into_array()) } } diff --git a/vortex-array/src/array/list/mod.rs 
b/vortex-array/src/array/list/mod.rs index 1eee1b3452..eb9bdab32d 100644 --- a/vortex-array/src/array/list/mod.rs +++ b/vortex-array/src/array/list/mod.rs @@ -311,7 +311,7 @@ mod test { let filtered = filter( &list, - FilterMask::from(BooleanBuffer::from(vec![false, true, true])), + &FilterMask::from(BooleanBuffer::from(vec![false, true, true])), ); assert!(filtered.is_ok()) diff --git a/vortex-array/src/array/primitive/compute/filter.rs b/vortex-array/src/array/primitive/compute/filter.rs index 89a712e396..0ed45f85c8 100644 --- a/vortex-array/src/array/primitive/compute/filter.rs +++ b/vortex-array/src/array/primitive/compute/filter.rs @@ -9,14 +9,12 @@ use crate::variants::PrimitiveArrayTrait; use crate::{ArrayData, IntoArrayData}; impl FilterFn for PrimitiveEncoding { - fn filter(&self, array: &PrimitiveArray, mask: FilterMask) -> VortexResult { - let validity = array.validity().filter(&mask)?; + fn filter(&self, array: &PrimitiveArray, mask: &FilterMask) -> VortexResult { + let validity = array.validity().filter(mask)?; match_each_native_ptype!(array.ptype(), |$T| { - let values = match mask.iter()? 
{ + let values = match mask.iter() { FilterIter::Indices(indices) => filter_primitive_indices(array.as_slice::<$T>(), indices.iter().copied()), - FilterIter::IndicesIter(iter) => filter_primitive_indices(array.as_slice::<$T>(), iter), FilterIter::Slices(slices) => filter_primitive_slices(array.as_slice::<$T>(), mask.true_count(), slices.iter().copied()), - FilterIter::SlicesIter(iter) => filter_primitive_slices(array.as_slice::<$T>(), mask.true_count(), iter), }; Ok(PrimitiveArray::new(values, validity).into_array()) }) @@ -57,7 +55,7 @@ mod test { let mask = [true, true, false, true, true, true, false, true]; let arr = PrimitiveArray::from_iter([1u32, 24, 54, 2, 3, 2, 3, 2]); - let filtered = filter(&arr.to_array(), FilterMask::from_iter(mask)) + let filtered = filter(&arr.to_array(), &FilterMask::from_iter(mask)) .unwrap() .into_primitive() .unwrap(); diff --git a/vortex-array/src/array/sparse/compute/mod.rs b/vortex-array/src/array/sparse/compute/mod.rs index 1d323085b3..433cdd6062 100644 --- a/vortex-array/src/array/sparse/compute/mod.rs +++ b/vortex-array/src/array/sparse/compute/mod.rs @@ -89,8 +89,8 @@ impl SearchSortedUsizeFn for SparseEncoding { } impl FilterFn for SparseEncoding { - fn filter(&self, array: &SparseArray, mask: FilterMask) -> VortexResult { - let new_length = mask.to_boolean_buffer()?.count_set_bits(); + fn filter(&self, array: &SparseArray, mask: &FilterMask) -> VortexResult { + let new_length = mask.true_count(); let Some(new_patches) = array.resolved_patches()?.filter(mask)? 
else { return Ok(ConstantArray::new(array.fill_scalar(), new_length).into_array()); @@ -188,7 +188,7 @@ mod test { predicate.extend_from_slice(&[false; 17]); let mask = FilterMask::from_iter(predicate); - let filtered_array = filter(&array, mask).unwrap(); + let filtered_array = filter(&array, &mask).unwrap(); let filtered_array = SparseArray::try_from(filtered_array).unwrap(); assert_eq!(filtered_array.len(), 1); @@ -208,7 +208,7 @@ mod test { .unwrap() .into_array(); - let filtered_array = filter(&array, mask).unwrap(); + let filtered_array = filter(&array, &mask).unwrap(); let filtered_array = SparseArray::try_from(filtered_array).unwrap(); assert_eq!(filtered_array.len(), 4); diff --git a/vortex-array/src/array/struct_/compute.rs b/vortex-array/src/array/struct_/compute.rs index ba3b44dce7..d95788d60e 100644 --- a/vortex-array/src/array/struct_/compute.rs +++ b/vortex-array/src/array/struct_/compute.rs @@ -73,12 +73,12 @@ impl SliceFn for StructEncoding { } impl FilterFn for StructEncoding { - fn filter(&self, array: &StructArray, mask: FilterMask) -> VortexResult { - let validity = array.validity().filter(&mask)?; + fn filter(&self, array: &StructArray, mask: &FilterMask) -> VortexResult { + let validity = array.validity().filter(mask)?; let fields: Vec = array .children() - .map(|field| filter(&field, mask.clone())) + .map(|field| filter(&field, mask)) .try_collect()?; let length = fields .first() @@ -103,7 +103,7 @@ mod tests { let mask = vec![ false, true, false, true, false, true, false, true, false, true, ]; - let filtered = filter(struct_arr.as_ref(), FilterMask::from_iter(mask)).unwrap(); + let filtered = filter(struct_arr.as_ref(), &FilterMask::from_iter(mask)).unwrap(); assert_eq!(filtered.len(), 5); } @@ -111,7 +111,8 @@ mod tests { fn filter_empty_struct_with_empty_filter() { let struct_arr = StructArray::try_new(vec![].into(), vec![], 0, Validity::NonNullable).unwrap(); - let filtered = filter(struct_arr.as_ref(), FilterMask::from_iter::<[bool; 
0]>([])).unwrap(); + let filtered = + filter(struct_arr.as_ref(), &FilterMask::from_iter::<[bool; 0]>([])).unwrap(); assert_eq!(filtered.len(), 0); } } diff --git a/vortex-array/src/array/varbin/compute/filter.rs b/vortex-array/src/array/varbin/compute/filter.rs index da8cb06bd2..671aa075c8 100644 --- a/vortex-array/src/array/varbin/compute/filter.rs +++ b/vortex-array/src/array/varbin/compute/filter.rs @@ -12,12 +12,12 @@ use crate::variants::PrimitiveArrayTrait; use crate::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; impl FilterFn for VarBinEncoding { - fn filter(&self, array: &VarBinArray, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &VarBinArray, mask: &FilterMask) -> VortexResult { filter_select_var_bin(array, mask).map(|a| a.into_array()) } } -fn filter_select_var_bin(arr: &VarBinArray, mask: FilterMask) -> VortexResult { +fn filter_select_var_bin(arr: &VarBinArray, mask: &FilterMask) -> VortexResult { let selection_count = mask.true_count(); if selection_count * 2 > mask.len() { filter_select_var_bin_by_slice(arr, mask, selection_count) @@ -28,7 +28,7 @@ fn filter_select_var_bin(arr: &VarBinArray, mask: FilterMask) -> VortexResult VortexResult { let offsets = values.offsets().into_primitive()?; @@ -49,7 +49,7 @@ fn filter_select_var_bin_by_slice_primitive_offset( dtype: DType, offsets: &[O], data: &[u8], - mask: FilterMask, + mask: &FilterMask, validity: Validity, selection_count: usize, ) -> VortexResult @@ -61,7 +61,7 @@ where if let Some(val) = logical_validity.to_null_buffer()? { let mut builder = VarBinBuilder::::with_capacity(selection_count); - for (start, end) in mask.iter_slices()? 
{ + for (start, end) in mask.slices().iter().copied() { let null_sl = val.slice(start, end - start); if null_sl.null_count() == 0 { update_non_nullable_slice(data, offsets, &mut builder, start, end) @@ -93,8 +93,8 @@ where let mut builder = VarBinBuilder::::with_capacity(selection_count); - mask.iter_slices()?.for_each(|(start, end)| { - update_non_nullable_slice(data, offsets, &mut builder, start, end) + mask.slices().iter().for_each(|(start, end)| { + update_non_nullable_slice(data, offsets, &mut builder, *start, *end) }); Ok(builder.finish(dtype)) @@ -128,7 +128,7 @@ fn update_non_nullable_slice( fn filter_select_var_bin_by_index( values: &VarBinArray, - mask: FilterMask, + mask: &FilterMask, selection_count: usize, ) -> VortexResult { let offsets = values.offsets().into_primitive()?; @@ -149,12 +149,12 @@ fn filter_select_var_bin_by_index_primitive_offset( dtype: DType, offsets: &[O], data: &[u8], - mask: FilterMask, + mask: &FilterMask, validity: Validity, selection_count: usize, ) -> VortexResult { let mut builder = VarBinBuilder::::with_capacity(selection_count); - for idx in mask.iter_indices()? 
{ + for idx in mask.indices().iter().copied() { if validity.is_valid(idx) { let (start, end) = ( offsets[idx].to_usize().ok_or_else(|| { @@ -205,7 +205,7 @@ mod test { ); let filter = FilterMask::from_iter([true, false, true]); - let buf = filter_select_var_bin_by_index(&arr, filter, 2) + let buf = filter_select_var_bin_by_index(&arr, &filter, 2) .unwrap() .to_array(); @@ -228,7 +228,7 @@ mod test { ); let filter = FilterMask::from_iter([true, false, true, false, true]); - let buf = filter_select_var_bin_by_slice(&arr, filter, 3) + let buf = filter_select_var_bin_by_slice(&arr, &filter, 3) .unwrap() .to_array(); @@ -258,7 +258,7 @@ mod test { let arr = VarBinArray::try_new(offsets, bytes, DType::Utf8(Nullable), validity).unwrap(); let filter = FilterMask::from_iter([true, true, true, false, true, true]); - let buf = filter_select_var_bin_by_slice(&arr, filter, 5) + let buf = filter_select_var_bin_by_slice(&arr, &filter, 5) .unwrap() .to_array(); diff --git a/vortex-array/src/compute/filter.rs b/vortex-array/src/compute/filter.rs index 89fab5e6f8..61738210bf 100644 --- a/vortex-array/src/compute/filter.rs +++ b/vortex-array/src/compute/filter.rs @@ -1,18 +1,20 @@ -use std::iter::TrustedLen; +use std::cmp::Ordering; +use std::ops::BitAnd; use std::sync::{Arc, OnceLock}; use arrow_array::BooleanArray; -use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, MutableBuffer}; -use num_traits::AsPrimitive; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use vortex_dtype::{DType, Nullability}; -use vortex_error::{vortex_bail, vortex_err, VortexError, VortexExpect, VortexResult}; +use vortex_error::{ + vortex_bail, vortex_err, vortex_panic, VortexError, VortexExpect, VortexResult, +}; -use crate::array::{BoolArray, ConstantArray}; +use crate::array::ConstantArray; use crate::arrow::FromArrowArray; use crate::compute::scalar_at; use crate::encoding::Encoding; use crate::stats::ArrayStatistics; -use crate::{ArrayDType, ArrayData, Canonical, IntoArrayData, 
IntoCanonical}; +use crate::{ArrayDType, ArrayData, Canonical, IntoArrayData, IntoArrayVariant, IntoCanonical}; /// If the filter selects more than this fraction of rows, iterate over slices instead of indices. /// @@ -22,7 +24,7 @@ const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8; pub trait FilterFn { /// Filter an array by the provided predicate. - fn filter(&self, array: &Array, mask: FilterMask) -> VortexResult; + fn filter(&self, array: &Array, mask: &FilterMask) -> VortexResult; } impl FilterFn for E @@ -30,7 +32,7 @@ where E: FilterFn, for<'a> &'a E::Array: TryFrom<&'a ArrayData, Error = VortexError>, { - fn filter(&self, array: &ArrayData, mask: FilterMask) -> VortexResult { + fn filter(&self, array: &ArrayData, mask: &FilterMask) -> VortexResult { let (array_ref, encoding) = array.downcast_array_ref::()?; FilterFn::filter(encoding, array_ref, mask) } @@ -46,7 +48,7 @@ where /// /// The `predicate` must receive an Array with type non-nullable bool, and will panic if this is /// not the case. -pub fn filter(array: &ArrayData, mask: FilterMask) -> VortexResult { +pub fn filter(array: &ArrayData, mask: &FilterMask) -> VortexResult { if mask.len() != array.len() { vortex_bail!( "mask.len() is {}, does not equal array.len() of {}", @@ -85,14 +87,14 @@ pub fn filter(array: &ArrayData, mask: FilterMask) -> VortexResult { Ok(filtered) } -fn filter_impl(array: &ArrayData, mask: FilterMask) -> VortexResult { +fn filter_impl(array: &ArrayData, mask: &FilterMask) -> VortexResult { if let Some(filter_fn) = array.encoding().filter_fn() { return filter_fn.filter(array, mask); } // We can use scalar_at if the mask has length 1. 
if mask.true_count() == 1 && array.encoding().scalar_at_fn().is_some() { - let idx = mask.indices()?[0]; + let idx = mask.first().vortex_expect("true_count == 1"); return Ok(ConstantArray::new(scalar_at(array, idx)?, 1).into_array()); } @@ -103,209 +105,426 @@ fn filter_impl(array: &ArrayData, mask: FilterMask) -> VortexResult { ); let array_ref = array.clone().into_arrow()?; - let mask_array = BooleanArray::new(mask.to_boolean_buffer()?, None); + let mask_array = BooleanArray::new(mask.boolean_buffer().clone(), None); let filtered = arrow_select::filter::filter(array_ref.as_ref(), &mask_array)?; Ok(ArrayData::from_arrow(filtered, array.dtype().is_nullable())) } /// Represents the mask argument to a filter function. -/// Internally this will cache the canonical representation of the mask if it is ever used. +/// +/// A [`FilterMask`] can be constructed from various representations, and converted to various +/// others. Internally, these are cached. +#[derive(Clone, Debug)] +pub struct FilterMask(Arc); + #[derive(Debug)] -pub struct FilterMask { - array: ArrayData, +struct Inner { + // The three possible representations of the mask. + buffer: OnceLock, + indices: OnceLock>, + slices: OnceLock>, + + // Pre-computed values. + len: usize, true_count: usize, - range_selectivity: f64, - indices: Arc>>, - slices: Arc>>, - buffer: Arc>, + selectivity: f64, } -/// We implement Clone manually to trigger population of our cached indices or slices. -/// By making the filter API take FilterMask by value, whenever it gets used multiple times -/// in a recursive function, we will cache the slices internally. 
-impl Clone for FilterMask { - fn clone(&self) -> Self { - if self.range_selectivity > FILTER_SLICES_SELECTIVITY_THRESHOLD { - let _: VortexResult<_> = self - .slices - .get_or_try_init(|| Ok(self.boolean_buffer()?.set_slices().collect())); - } else { - let _: VortexResult<_> = self.indices(); - } +impl Inner { + /// Constructs a [`BooleanBuffer`] from one of the other representations. + fn buffer(&self) -> &BooleanBuffer { + self.buffer.get_or_init(|| { + if self.true_count == 0 { + return BooleanBuffer::new_unset(self.len); + } + + if self.true_count == self.len { + return BooleanBuffer::new_set(self.len); + } + + if let Some(indices) = self.indices.get() { + let mut buf = BooleanBufferBuilder::new(self.len); + // TODO(ngates): for dense indices, we can do better by collecting into u64s. + buf.append_n(self.len, false); + indices.iter().for_each(|idx| buf.set_bit(*idx, true)); + return BooleanBuffer::from(buf); + } + + if let Some(slices) = self.slices.get() { + let mut buf = BooleanBufferBuilder::new(self.len); + for (start, end) in slices.iter().copied() { + buf.append_n(start - buf.len(), false); + buf.append_n(end - start, true); + } + if let Some((_, end)) = slices.last() { + buf.append_n(self.len - end, false); + } + debug_assert_eq!(buf.len(), self.len); + return BooleanBuffer::from(buf); + } - Self { - array: self.array.clone(), - true_count: self.true_count, - range_selectivity: self.range_selectivity, - indices: self.indices.clone(), - slices: self.slices.clone(), - buffer: self.buffer.clone(), - } + vortex_panic!("No mask representation found") + }) } -} - -/// Wrapper around Arrow's BitIndexIterator that knows its total length. 
-pub struct BitIndexIterator<'a> { - inner: arrow_buffer::bit_iterator::BitIndexIterator<'a>, - index: usize, - trusted_len: usize, -} -impl<'a> BitIndexIterator<'a> { - pub fn new( - inner: arrow_buffer::bit_iterator::BitIndexIterator<'a>, - trusted_len: usize, - ) -> Self { - Self { - inner, - index: 0, - trusted_len, - } + /// Constructs an indices vector from one of the other representations. + fn indices(&self) -> &[usize] { + self.indices.get_or_init(|| { + if self.true_count == 0 { + return vec![]; + } + + if self.true_count == self.len { + return (0..self.len).collect(); + } + + if let Some(buffer) = self.buffer.get() { + let mut indices = Vec::with_capacity(self.true_count); + indices.extend(buffer.set_indices()); + return indices; + } + + if let Some(slices) = self.slices.get() { + let mut indices = Vec::with_capacity(self.true_count); + indices.extend(slices.iter().flat_map(|(start, end)| *start..*end)); + return indices; + } + + vortex_panic!("No mask representation found") + }) } -} -impl Iterator for BitIndexIterator<'_> { - type Item = usize; + /// Constructs a slices vector from one of the other representations. 
+ fn slices(&self) -> &[(usize, usize)] { + self.slices.get_or_init(|| { + if self.true_count == self.len { + return vec![(0, self.len)]; + } + + if let Some(buffer) = self.buffer.get() { + return buffer.set_slices().collect(); + } + + if let Some(indices) = self.indices.get() { + let mut slices = Vec::with_capacity(self.true_count); // Upper bound + let mut iter = indices.iter().copied(); + + // Handle empty input + let Some(first) = iter.next() else { + return slices; + }; + + let mut start = first; + let mut prev = first; + for curr in iter { + if curr != prev + 1 { + slices.push((start, prev + 1)); + start = curr; + } + prev = curr; + } - fn next(&mut self) -> Option { - self.index += 1; - self.inner.next() - } + // Don't forget the last range + slices.push((start, prev + 1)); - fn size_hint(&self) -> (usize, Option) { - let remaining = self.trusted_len - self.index; - (remaining, Some(remaining)) - } -} + return slices; + } -/// Safety: BitIndexIterator is TrustedLen because it knows its total length. -unsafe impl TrustedLen for BitIndexIterator<'_> {} -impl ExactSizeIterator for BitIndexIterator<'_> {} + vortex_panic!("No mask representation found") + }) + } -pub enum FilterIter<'a> { - // Slice of pre-cached indices of a filter mask. - Indices(&'a [usize]), - // Iterator over set bits of the filter mask's boolean buffer. - IndicesIter(BitIndexIterator<'a>), - // Slice of pre-cached slices of a filter mask. - Slices(&'a [(usize, usize)]), - // Iterator over contiguous ranges of set bits of the filter mask's boolean buffer. 
- SlicesIter(arrow_buffer::bit_iterator::BitSliceIterator<'a>), + fn first(&self) -> Option { + if self.true_count == 0 { + return None; + } + if self.true_count == self.len { + return Some(0); + } + if let Some(buffer) = self.buffer.get() { + return buffer.set_indices().next(); + } + if let Some(indices) = self.indices.get() { + return indices.first().copied(); + } + if let Some(slices) = self.slices.get() { + return slices.first().map(|(start, _)| *start); + } + None + } } impl FilterMask { /// Create a new FilterMask where all values are set. pub fn new_true(length: usize) -> Self { - Self::from(BooleanBuffer::new_set(length)) + Self(Arc::new(Inner { + buffer: Default::default(), + indices: Default::default(), + slices: Default::default(), + len: length, + true_count: length, + selectivity: 1.0, + })) + } + + /// Create a new FilterMask where no values are set. + pub fn new_false(length: usize) -> Self { + Self(Arc::new(Inner { + buffer: Default::default(), + indices: Default::default(), + slices: Default::default(), + len: length, + true_count: 0, + selectivity: 0.0, + })) + } + + /// Create a new [`FilterMask`] from a [`BooleanBuffer`]. + pub fn from_buffer(buffer: BooleanBuffer) -> Self { + let true_count = buffer.count_set_bits(); + let len = buffer.len(); + Self(Arc::new(Inner { + buffer: OnceLock::from(buffer), + indices: Default::default(), + slices: Default::default(), + len, + true_count, + selectivity: true_count as f64 / len as f64, + })) + } + + /// Create a new [`FilterMask`] from a [`Vec`]. + pub fn from_indices(len: usize, vec: Vec) -> Self { + let true_count = vec.len(); + assert!(vec.iter().all(|&idx| idx < len)); + Self(Arc::new(Inner { + buffer: Default::default(), + indices: OnceLock::from(vec), + slices: Default::default(), + len, + true_count, + selectivity: true_count as f64 / len as f64, + })) + } + + /// Create a new [`FilterMask`] from a [`Vec<(usize, usize)>`] where each range + /// represents a contiguous range of true values. 
+ pub fn from_slices(len: usize, vec: Vec<(usize, usize)>) -> Self { + assert!(vec.iter().all(|&(b, e)| b < e && e <= len)); + let true_count = vec.iter().map(|(b, e)| e - b).sum(); + Self(Arc::new(Inner { + buffer: Default::default(), + indices: Default::default(), + slices: OnceLock::from(vec), + len, + true_count, + selectivity: true_count as f64 / len as f64, + })) } - /// Create a new FilterMask where the given indices are set. - pub fn from_indices, I: IntoIterator>( - length: usize, - indices: I, + /// Create a new [`FilterMask`] from the intersection of two indices slices. + pub fn from_intersection_indices( + len: usize, + lhs: impl Iterator, + rhs: impl Iterator, ) -> Self { - let mut buffer = MutableBuffer::new_null(length); - indices - .into_iter() - .for_each(|idx| arrow_buffer::bit_util::set_bit(&mut buffer, idx.as_())); - Self::from(BooleanBufferBuilder::new_from_buffer(buffer, length).finish()) + let mut intersection = Vec::with_capacity(len); + let mut lhs = lhs.peekable(); + let mut rhs = rhs.peekable(); + while let (Some(&l), Some(&r)) = (lhs.peek(), rhs.peek()) { + match l.cmp(&r) { + Ordering::Less => { + lhs.next(); + } + Ordering::Greater => { + rhs.next(); + } + Ordering::Equal => { + intersection.push(l); + lhs.next(); + rhs.next(); + } + } + } + Self::from_indices(len, intersection) } + #[inline] pub fn len(&self) -> usize { - self.array.len() + self.0.len } + #[inline] pub fn is_empty(&self) -> bool { - self.array.is_empty() + self.0.len == 0 } /// Get the true count of the mask. + #[inline] pub fn true_count(&self) -> usize { - self.true_count + self.0.true_count } /// Get the false count of the mask. + #[inline] pub fn false_count(&self) -> usize { - self.array.len() - self.true_count + self.len() - self.true_count() } /// Return the selectivity of the full mask. 
+ #[inline] pub fn selectivity(&self) -> f64 { - self.true_count as f64 / self.len() as f64 + self.0.selectivity } - /// Return the selectivity of the range of true values of the mask. - pub fn range_selectivity(&self) -> f64 { - self.range_selectivity + /// Get the canonical representation of the mask. + pub fn boolean_buffer(&self) -> &BooleanBuffer { + self.0.buffer() } - /// Get the canonical representation of the mask. - pub fn to_boolean_buffer(&self) -> VortexResult { - log::debug!( - "FilterMask: len {} selectivity: {} true_count: {}", - self.len(), - self.range_selectivity(), - self.true_count, - ); - self.boolean_buffer().cloned() + /// Get the indices of the true values in the mask. + pub fn indices(&self) -> &[usize] { + self.0.indices() } - fn boolean_buffer(&self) -> VortexResult<&BooleanBuffer> { - self.buffer.get_or_try_init(|| { - Ok(self - .array - .clone() - .into_canonical()? - .into_bool()? - .boolean_buffer()) - }) + /// Get the slices of the true values in the mask. + pub fn slices(&self) -> &[(usize, usize)] { + self.0.slices() } - fn indices(&self) -> VortexResult<&[usize]> { - self.indices - .get_or_try_init(|| { - let mut indices = Vec::with_capacity(self.true_count()); - indices.extend(self.boolean_buffer()?.set_indices()); - Ok(indices) - }) - .map(|v| v.as_slice()) + /// Returns the first true index in the mask. + pub fn first(&self) -> Option { + self.0.first() } /// Returns the best iterator based on a selectivity threshold. /// /// Currently, this threshold is fixed at 0.8 based on Arrow Rust. 
- pub fn iter(&self) -> VortexResult { - Ok( - if self.range_selectivity > FILTER_SLICES_SELECTIVITY_THRESHOLD { - // Iterate over slices - if let Some(slices) = self.slices.get() { - FilterIter::Slices(slices.as_slice()) - } else { - FilterIter::SlicesIter(self.boolean_buffer()?.set_slices()) - } - } else { - // Iterate over indices - if let Some(indices) = self.indices.get() { - FilterIter::Indices(indices.as_slice()) - } else { - FilterIter::IndicesIter(BitIndexIterator::new( - self.boolean_buffer()?.set_indices(), - self.true_count, - )) - } - }, - ) + pub fn iter(&self) -> FilterIter { + if self.selectivity() > FILTER_SLICES_SELECTIVITY_THRESHOLD { + FilterIter::Slices(self.slices()) + } else { + FilterIter::Indices(self.indices()) + } } - #[deprecated(note = "Move to using iter() instead")] - pub fn iter_slices(&self) -> VortexResult + '_> { - Ok(self.boolean_buffer()?.set_slices()) + /// Slice the mask. + pub fn slice(&self, offset: usize, length: usize) -> Self { + if self.true_count() == 0 { + return Self::new_false(length); + } + if self.true_count() == self.len() { + return Self::new_true(length); + } + + if let Some(buffer) = self.0.buffer.get() { + return Self::from_buffer(buffer.slice(offset, length)); + } + + let end = offset + length; + + if let Some(indices) = self.0.indices.get() { + let indices = indices + .iter() + .copied() + .filter(|&idx| offset <= idx && idx < end) + .map(|idx| idx - offset) + .collect(); + return Self::from_indices(length, indices); + } + + if let Some(slices) = self.0.slices.get() { + let slices = slices + .iter() + .copied() + .filter(|(s, e)| *s < end && *e > offset) + .map(|(s, e)| (s.max(offset), e.min(end))) + .collect(); + return Self::from_slices(length, slices); + } + + vortex_panic!("No mask representation found") } +} + +pub enum FilterIter<'a> { + /// Slice of pre-cached indices of a filter mask. + Indices(&'a [usize]), + /// Slice of pre-cached slices of a filter mask. 
+ Slices(&'a [(usize, usize)]), +} - #[deprecated(note = "Move to using iter() instead")] - pub fn iter_indices(&self) -> VortexResult + '_> { - Ok(self.boolean_buffer()?.set_indices()) +impl PartialEq for FilterMask { + fn eq(&self, other: &Self) -> bool { + if self.len() != other.len() { + return false; + } + if self.true_count() != other.true_count() { + return false; + } + + // Since the true counts are the same, a full or empty mask is equal to the other mask. + if self.true_count() == 0 || self.true_count() == self.len() { + return true; + } + + // Compare the buffer if both masks are non-empty. + if let (Some(buffer), Some(other)) = (self.0.buffer.get(), other.0.buffer.get()) { + return buffer == other; + } + + // Compare the indices if both masks are non-empty. + if let (Some(indices), Some(other)) = (self.0.indices.get(), other.0.indices.get()) { + return indices == other; + } + + // Compare the slices if both masks are non-empty. + if let (Some(slices), Some(other)) = (self.0.slices.get(), other.0.slices.get()) { + return slices == other; + } + + // Otherwise, we fall back to comparison based on sparsity. + // We could go further an exhaustively check whose OnceLocks are initialized, but that's + // probably not worth the effort. 
+ self.boolean_buffer() == other.boolean_buffer() + } +} + +impl Eq for FilterMask {} + +impl BitAnd for &FilterMask { + type Output = FilterMask; + + fn bitand(self, rhs: Self) -> Self::Output { + if self.len() != rhs.len() { + vortex_panic!("FilterMasks must have the same length"); + } + if self.true_count() == 0 || rhs.true_count() == 0 { + return FilterMask::new_false(self.len()); + } + if self.true_count() == self.len() { + return rhs.clone(); + } + if rhs.true_count() == self.len() { + return self.clone(); + } + + if let (Some(lhs), Some(rhs)) = (self.0.buffer.get(), rhs.0.buffer.get()) { + return FilterMask::from_buffer(lhs & rhs); + } + + if let (Some(lhs), Some(rhs)) = (self.0.indices.get(), rhs.0.indices.get()) { + // TODO(ngates): this may only make sense for sparse indices. + return FilterMask::from_intersection_indices( + self.len(), + lhs.iter().copied(), + rhs.iter().copied(), + ); + } + + // TODO(ngates): we could perform a more efficient intersection for slices. + FilterMask::from_buffer(self.boolean_buffer() & rhs.boolean_buffer()) } } @@ -325,34 +544,35 @@ impl TryFrom for FilterMask { .compute_true_count() .ok_or_else(|| vortex_err!("Failed to compute true count for boolean array"))?; - let selectivity = true_count as f64 / array.len() as f64; + if true_count == 0 { + return Ok(Self::new_false(array.len())); + } + if true_count == array.len() { + return Ok(Self::new_true(array.len())); + } - Ok(Self { - array, - true_count, - range_selectivity: selectivity, - indices: Arc::new(OnceLock::new()), - slices: Arc::new(OnceLock::new()), - buffer: Arc::new(OnceLock::new()), - }) + // TODO(ngates): should we have a `to_filter_mask` compute function where encodings + // pick the best possible conversion? E.g. SparseArray may want from_indices. 
+ Ok(Self::from_buffer(array.into_bool()?.boolean_buffer())) } } impl From for FilterMask { fn from(value: BooleanBuffer) -> Self { - Self::try_from(BoolArray::from(value).into_array()) - .vortex_expect("Failed to convert BooleanBuffer to FilterMask") + Self::from_buffer(value) } } impl FromIterator for FilterMask { fn from_iter>(iter: T) -> Self { - Self::from(BooleanBuffer::from_iter(iter)) + Self::from_buffer(BooleanBuffer::from_iter(iter)) } } #[cfg(test)] mod test { + use itertools::Itertools; + use super::*; use crate::array::{BoolArray, PrimitiveArray}; use crate::compute::filter::filter; @@ -368,7 +588,7 @@ mod test { ) .unwrap(); - let filtered = filter(&items, mask).unwrap(); + let filtered = filter(&items, &mask).unwrap(); assert_eq!( filtered .into_canonical() @@ -379,4 +599,67 @@ mod test { &[0i32, 1i32, 2i32] ); } + + #[test] + fn filter_mask_all_true() { + let mask = FilterMask::new_true(5); + assert_eq!(mask.len(), 5); + assert_eq!(mask.true_count(), 5); + assert_eq!(mask.selectivity(), 1.0); + assert_eq!(mask.indices(), &[0, 1, 2, 3, 4]); + assert_eq!(mask.slices(), &[(0, 5)]); + assert_eq!(mask.boolean_buffer(), &BooleanBuffer::new_set(5)); + } + + #[test] + fn filter_mask_all_false() { + let mask = FilterMask::new_false(5); + assert_eq!(mask.len(), 5); + assert_eq!(mask.true_count(), 0); + assert_eq!(mask.selectivity(), 0.0); + assert_eq!(mask.indices(), &[] as &[usize]); + assert_eq!(mask.slices(), &[]); + assert_eq!(mask.boolean_buffer(), &BooleanBuffer::new_unset(5)); + } + + #[test] + fn filter_mask_from() { + let masks = [ + FilterMask::from_indices(5, vec![0, 2, 3]), + FilterMask::from_slices(5, vec![(0, 1), (2, 4)]), + FilterMask::from_buffer(BooleanBuffer::from_iter([true, false, true, true, false])), + ]; + + for mask in &masks { + assert_eq!(mask.len(), 5); + assert_eq!(mask.true_count(), 3); + assert_eq!(mask.selectivity(), 0.6); + assert_eq!(mask.indices(), &[0, 2, 3]); + assert_eq!(mask.slices(), &[(0, 1), (2, 4)]); + assert_eq!( + 
&mask.boolean_buffer().iter().collect_vec(), + &[true, false, true, true, false] + ); + } + } + + #[test] + fn filter_mask_eq() { + assert_eq!( + FilterMask::new_true(5), + FilterMask::from_buffer(BooleanBuffer::new_set(5)) + ); + assert_eq!( + FilterMask::new_false(5), + FilterMask::from_buffer(BooleanBuffer::new_unset(5)) + ); + assert_eq!( + FilterMask::from_indices(5, vec![0, 2, 3]), + FilterMask::from_slices(5, vec![(0, 1), (2, 4)]) + ); + assert_eq!( + FilterMask::from_indices(5, vec![0, 2, 3]), + FilterMask::from_buffer(BooleanBuffer::from_iter([true, false, true, true, false])) + ); + } } diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index 7d57a4561d..3122e8e3ee 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -194,12 +194,13 @@ impl Patches { } /// Filter the patches by a mask, resulting in new patches for the filtered array. - pub fn filter(&self, mask: FilterMask) -> VortexResult> { + pub fn filter(&self, mask: &FilterMask) -> VortexResult> { if mask.is_empty() { return Ok(None); } - let buffer = mask.to_boolean_buffer()?; + // TODO(ngates): add functions to operate with FilterMask directly + let buffer = mask.boolean_buffer(); let mut coordinate_indices = BufferMut::::empty(); let mut value_indices = BufferMut::::empty(); let mut last_inserted_index: usize = 0; @@ -402,7 +403,7 @@ mod test { ); let filtered = patches - .filter(FilterMask::from_indices(100, [10u32, 20, 30])) + .filter(&FilterMask::from_indices(100, vec![10, 20, 30])) .unwrap() .unwrap(); diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 6128796a79..04f31480b1 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -229,7 +229,7 @@ impl Validity { v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => { Ok(v.clone()) } - Validity::Array(arr) => Ok(Validity::Array(filter(arr, mask.clone())?)), + Validity::Array(arr) => Ok(Validity::Array(filter(arr, mask)?)), } } 
diff --git a/vortex-file/src/read/mask.rs b/vortex-file/src/read/mask.rs index d123d4d43d..24a3138f2b 100644 --- a/vortex-file/src/read/mask.rs +++ b/vortex-file/src/read/mask.rs @@ -2,7 +2,6 @@ use std::cmp::{max, min}; use std::fmt::{Display, Formatter}; use arrow_buffer::BooleanBuffer; -use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{BoolArray, PrimitiveArray, SparseArray}; use vortex_array::compute::{and, filter, slice, try_cast, FilterMask}; use vortex_array::validity::{ArrayValidity, LogicalValidity, Validity}; @@ -27,7 +26,7 @@ impl PartialEq for RowMask { fn eq(&self, other: &Self) -> bool { self.begin == other.begin && self.end == other.end - && self.mask.to_boolean_buffer().unwrap() == other.mask.to_boolean_buffer().unwrap() + && self.mask.boolean_buffer() == other.mask.boolean_buffer() } } @@ -110,7 +109,11 @@ impl RowMask { // TODO(ngates): should from_indices take u64? let mask = FilterMask::from_indices( end - begin, - indices.as_slice::().iter().map(|i| *i as usize), + indices + .as_slice::() + .iter() + .map(|i| *i as usize) + .collect(), ); RowMask::try_new(mask, begin, end) @@ -145,10 +148,10 @@ impl RowMask { // If both masks align perfectly if self.begin == other.begin && self.end == other.end { - let this_buffer = self.mask.to_boolean_buffer()?; - let other_buffer = other.mask.to_boolean_buffer()?; + let this_buffer = self.mask.boolean_buffer(); + let other_buffer = other.mask.boolean_buffer(); - let unified = &this_buffer & (&other_buffer); + let unified = this_buffer & other_buffer; return RowMask::from_mask_array( BoolArray::from(unified).as_ref(), self.begin, @@ -164,25 +167,26 @@ impl RowMask { )); } + let output_begin = min(self.begin, other.begin); let output_end = max(self.end, other.end); + let output_len = output_end - output_begin; - let this_buffer = self.mask.to_boolean_buffer()?; - let other_buffer = other.mask.to_boolean_buffer()?; - let self_indices = this_buffer - .set_indices() - .map(|v| v + 
self.begin) - .collect::>(); - let other_indices = other_buffer - .set_indices() - .map(|v| v + other.begin) - .collect::>(); - - let output_mask = FilterMask::from_indices( - output_end, - self_indices.intersection(&other_indices).copied(), + let output_mask = FilterMask::from_intersection_indices( + output_len, + self.mask + .indices() + .iter() + .copied() + .map(|v| v + self.begin - output_begin), + other + .mask + .indices() + .iter() + .copied() + .map(|v| v + other.begin - output_begin), ); - Self::try_new(output_mask, 0, output_end) + Self::try_new(output_mask, output_begin, output_end) } pub fn is_all_false(&self) -> bool { @@ -215,7 +219,7 @@ impl RowMask { } else { FilterMask::from( self.mask - .to_boolean_buffer()? + .boolean_buffer() .slice(range_begin - self.begin, range_end - range_begin), ) }, @@ -248,15 +252,16 @@ impl RowMask { return Ok(Some(sliced.clone())); } - filter(sliced, self.mask.clone()).map(Some) + filter(sliced, &self.mask).map(Some) } #[allow(deprecated)] fn to_indices_array(&self) -> VortexResult { Ok(PrimitiveArray::new( self.mask - .iter_indices()? - .map(|i| i as u64) + .indices() + .iter() + .map(|i| *i as u64) .collect::>(), Validity::NonNullable, ) diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs index a30f672bee..5b2132b378 100644 --- a/vortex-layout/src/layouts/flat/eval_expr.rs +++ b/vortex-layout/src/layouts/flat/eval_expr.rs @@ -51,7 +51,7 @@ impl ExprEvaluator for FlatReader { // And finally apply the expression // TODO(ngates): what's the best order to apply the filter mask / expression? let array = expr.evaluate(&array)?; - filter(&array, row_mask.into_filter_mask()?) + filter(&array, &row_mask.into_filter_mask()?) 
} } diff --git a/vortex-scan/src/range_scan.rs b/vortex-scan/src/range_scan.rs index 55daf862be..2c44158827 100644 --- a/vortex-scan/src/range_scan.rs +++ b/vortex-scan/src/range_scan.rs @@ -103,11 +103,10 @@ impl RangeScan { match &self.state { State::FilterEval(_) => { // Intersect the result of the filter expression with our initial row mask. - let mask = result.into_bool()?.boolean_buffer(); - let mask = self.mask.to_boolean_buffer()?.bitand(&mask); + let mask = FilterMask::from_buffer(result.into_bool()?.boolean_buffer()); + let mask = self.mask.bitand(&mask); // Then move onto the projection - self.state = - State::Project((FilterMask::from(mask), self.scan.projection().clone())) + self.state = State::Project((mask, self.scan.projection().clone())) } State::Project(_) => { // We're done. diff --git a/vortex-scan/src/row_mask.rs b/vortex-scan/src/row_mask.rs index 240e93ed88..59deb8fb58 100644 --- a/vortex-scan/src/row_mask.rs +++ b/vortex-scan/src/row_mask.rs @@ -1,9 +1,8 @@ use std::cmp::{max, min}; use std::fmt::{Display, Formatter}; -use std::ops::RangeBounds; +use std::ops::{BitAnd, RangeBounds}; -use vortex_array::aliases::hash_set::HashSet; -use vortex_array::array::{BoolArray, BooleanBuffer, PrimitiveArray, SparseArray}; +use vortex_array::array::{BooleanBuffer, PrimitiveArray, SparseArray}; use vortex_array::compute::{and, filter, slice, try_cast, FilterMask}; use vortex_array::validity::{ArrayValidity, LogicalValidity, Validity}; use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -22,12 +21,11 @@ pub struct RowMask { end: u64, } +// We don't want to implement full logical equality, this naive equality is sufficient for tests. 
#[cfg(test)] impl PartialEq for RowMask { fn eq(&self, other: &Self) -> bool { - self.begin == other.begin - && self.end == other.end - && self.mask.to_boolean_buffer().unwrap() == other.mask.to_boolean_buffer().unwrap() + self.begin == other.begin && self.end == other.end && self.mask == other.mask } } @@ -106,7 +104,11 @@ impl RowMask { let mask = FilterMask::from_indices( length, - indices.as_slice::().iter().map(|i| *i as usize), + indices + .as_slice::() + .iter() + .map(|i| *i as usize) + .collect(), ); Ok(RowMask::new(mask, begin)) @@ -160,18 +162,14 @@ impl RowMask { } } - pub fn and_rowmask(&self, other: RowMask) -> VortexResult { + pub fn and_rowmask(self, other: RowMask) -> VortexResult { if other.true_count() == other.len() { - return Ok(self.clone()); + return Ok(self); } // If both masks align perfectly if self.begin == other.begin && self.end == other.end { - let this_buffer = self.mask.to_boolean_buffer()?; - let other_buffer = other.mask.to_boolean_buffer()?; - - let unified = &this_buffer & (&other_buffer); - return RowMask::from_mask_array(BoolArray::from(unified).as_ref(), self.begin); + return Ok(RowMask::new(self.mask.bitand(&other.mask), self.begin)); } // Disjoint row ranges @@ -187,25 +185,21 @@ impl RowMask { let output_len = usize::try_from(output_end - output_begin) .map_err(|_| vortex_err!("Range length does not fit into a usize"))?; - let this_buffer = self.mask.to_boolean_buffer()?; - let other_buffer = other.mask.to_boolean_buffer()?; - - // TODO(ngates): do not use a set for this. We know both iterators are sorted. 
- let self_indices = this_buffer - .set_indices() - .map(|v| v as u64 + self.begin) - .collect::>(); - let other_indices = other_buffer - .set_indices() - .map(|v| v as u64 + other.begin) - .collect::>(); - - let output_mask = FilterMask::from_indices( + let output_mask = FilterMask::from_intersection_indices( output_len, - self_indices.intersection(&other_indices).copied().map(|i| { - usize::try_from(i - output_begin) - .vortex_expect("we know this must fit within usize") - }), + self.mask + .indices() + .iter() + .copied() + .map(|v| v as u64 + self.begin - output_begin) + .map(|v| usize::try_from(v).vortex_expect("mask index must fit into usize")), + other + .mask + .indices() + .iter() + .copied() + .map(|v| v as u64 + other.begin - output_begin) + .map(|v| usize::try_from(v).vortex_expect("mask index must fit into usize")), ); Ok(Self::new(output_mask, output_begin)) @@ -239,13 +233,11 @@ impl RowMask { if range_begin == self.begin && range_end == self.end { self.mask.clone() } else { - FilterMask::from( - self.mask.to_boolean_buffer()?.slice( - usize::try_from(range_begin - self.begin) - .vortex_expect("we know this must fit into usize"), - usize::try_from(range_end - range_begin) - .vortex_expect("we know this must fit into usize"), - ), + self.mask.slice( + usize::try_from(range_begin - self.begin) + .vortex_expect("we know this must fit into usize"), + usize::try_from(range_end - range_begin) + .vortex_expect("we know this must fit into usize"), ) }, range_begin, @@ -281,15 +273,15 @@ impl RowMask { return Ok(Some(sliced.clone())); } - filter(sliced, self.mask.clone()).map(Some) + filter(sliced, &self.mask).map(Some) } - #[allow(deprecated)] fn to_indices_array(&self) -> VortexResult { Ok(PrimitiveArray::new( self.mask - .iter_indices()? 
- .map(|i| i as u64) + .indices() + .iter() + .map(|i| *i as u64) .collect::>(), Validity::NonNullable, ) From a0f9c7da196c5fa5adc3345d61400bfb237df912 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 15 Jan 2025 14:23:17 +0000 Subject: [PATCH 7/9] fix: FilterMask::from_indices requires a vec (broken after filtermask changes) (#1959) --- vortex-file/src/v2/file.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 49728b08dd..96f985eef5 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -97,9 +97,12 @@ impl VortexFile { let filter_mask = FilterMask::from_indices( usize::try_from(row_range.end - row_range.start) .vortex_expect("Split ranges are within usize"), - row_indices[start_idx..end_idx].iter().map(|&idx| { - usize::try_from(idx - row_range.start).vortex_expect("index within range") - }), + row_indices[start_idx..end_idx] + .iter() + .map(|&idx| { + usize::try_from(idx - row_range.start).vortex_expect("index within range") + }) + .collect(), ); Some(RowMask::new(filter_mask, row_range.start)) }); From d3bb8f6e27fb14a3a190f6bfc470fad4139b6203 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 15 Jan 2025 15:04:19 +0000 Subject: [PATCH 8/9] Fix clickbench (#1956) Revert back to parallelism using Tokio --- bench-vortex/src/clickbench.rs | 14 ++--- vortex-file/src/v2/file.rs | 2 +- vortex-file/src/v2/footer/file_layout.rs | 32 ++++++++++- vortex-file/src/v2/footer/segment.rs | 2 +- vortex-file/src/v2/io/file.rs | 61 ++++++++++++--------- vortex-file/src/v2/open/mod.rs | 9 +-- vortex-file/src/v2/writer.rs | 8 +-- vortex-layout/src/layouts/flat/eval_expr.rs | 45 +++++++++++++-- vortex-layout/src/strategies/mod.rs | 2 +- vortex-scan/src/lib.rs | 2 +- vortex-scan/src/row_mask.rs | 11 ++-- 11 files changed, 125 insertions(+), 63 deletions(-) diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs index 
f35a093c28..fa5ab090a6 100644 --- a/bench-vortex/src/clickbench.rs +++ b/bench-vortex/src/clickbench.rs @@ -7,9 +7,7 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use futures::executor::block_on; -use itertools::Itertools; -use rayon::prelude::*; +use futures::{stream, StreamExt, TryStreamExt}; use tokio::fs::{create_dir_all, OpenOptions}; use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; @@ -152,9 +150,7 @@ pub async fn register_vortex_files( let vortex_dir = input_path.join("vortex"); create_dir_all(&vortex_dir).await?; - (0..100) - .collect_vec() - .par_iter() + stream::iter(0..100) .map(|idx| { let parquet_file_path = input_path .join("parquet") @@ -163,7 +159,7 @@ pub async fn register_vortex_files( let session = session.clone(); let schema = schema.clone(); - block_on(async move { + tokio::spawn(async move { let output_path = output_path.clone(); idempotent_async(&output_path, move |vtx_file| async move { eprintln!("Processing file {idx}"); @@ -234,7 +230,9 @@ pub async fn register_vortex_files( .expect("Failed to write Vortex file") }) }) - .collect::>(); + .buffered(16) + .try_collect::>() + .await?; let format = Arc::new(VortexFormat::new(CTX.clone())); let table_path = vortex_dir diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 96f985eef5..231dff4a36 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -126,7 +126,7 @@ impl VortexFile { // Create a single LayoutReader that is reused for the entire scan. let reader: Arc = self .file_layout - .root_layout + .root_layout() .reader(segment_channel.reader(), self.ctx.clone())?; // Now we give one end of the channel to the layout reader... 
diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs index a0b641990f..63c293bb19 100644 --- a/vortex-file/src/v2/footer/file_layout.rs +++ b/vortex-file/src/v2/footer/file_layout.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use itertools::Itertools; use vortex_dtype::DType; use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer}; use vortex_layout::LayoutData; @@ -9,11 +10,38 @@ use crate::v2::footer::segment::Segment; /// Captures the layout information of a Vortex file. #[derive(Debug, Clone)] pub struct FileLayout { - pub(crate) root_layout: LayoutData, - pub(crate) segments: Arc<[Segment]>, + root_layout: LayoutData, + segments: Arc<[Segment]>, } impl FileLayout { + /// Create a new `FileLayout` from the root layout and segments. + /// + /// ## Panics + /// + /// Panics if the segments are not ordered by byte offset. + pub fn new(root_layout: LayoutData, segments: Arc<[Segment]>) -> Self { + // Note this assertion is `<=` since we allow zero-length segments + assert!(segments + .iter() + .tuple_windows() + .all(|(a, b)| a.offset <= b.offset)); + Self { + root_layout, + segments, + } + } + + /// Returns the root [`LayoutData`] of the file. + pub fn root_layout(&self) -> &LayoutData { + &self.root_layout + } + + /// Returns the segment map of the file. + pub fn segment_map(&self) -> &Arc<[Segment]> { + &self.segments + } + /// Returns the [`DType`] of the file. pub fn dtype(&self) -> &DType { self.root_layout.dtype() diff --git a/vortex-file/src/v2/footer/segment.rs b/vortex-file/src/v2/footer/segment.rs index f988123f50..321ddd46fa 100644 --- a/vortex-file/src/v2/footer/segment.rs +++ b/vortex-file/src/v2/footer/segment.rs @@ -4,7 +4,7 @@ use vortex_flatbuffers::footer2 as fb; /// The location of a segment within a Vortex file. 
#[derive(Clone, Debug)] -pub(crate) struct Segment { +pub struct Segment { pub(crate) offset: u64, pub(crate) length: u32, pub(crate) alignment: Alignment, diff --git a/vortex-file/src/v2/io/file.rs b/vortex-file/src/v2/io/file.rs index f40818e232..1d6abfaf1e 100644 --- a/vortex-file/src/v2/io/file.rs +++ b/vortex-file/src/v2/io/file.rs @@ -1,5 +1,5 @@ +use std::cmp::Ordering; use std::future::Future; -use std::iter; use std::ops::Range; use std::sync::Arc; @@ -8,7 +8,7 @@ use futures::Stream; use futures_util::future::try_join_all; use futures_util::{stream, StreamExt}; use vortex_buffer::{ByteBuffer, ByteBufferMut}; -use vortex_error::{vortex_err, VortexExpect, VortexResult}; +use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult}; use vortex_io::VortexReadAt; use vortex_layout::segments::SegmentId; @@ -64,7 +64,7 @@ impl FileSegmentRequest { struct CoalescedSegmentRequest { /// The range of the file to read. pub(crate) byte_range: Range, - /// The original segment requests. + /// The original segment requests, ordered by segment ID. pub(crate) requests: Vec, } @@ -74,7 +74,7 @@ impl IoDriver for FileIoDriver { stream: impl Stream + 'static, ) -> impl Stream> + 'static { // We map the segment requests to their respective locations within the file. - let segment_map = self.file_layout.segments.clone(); + let segment_map = self.file_layout.segment_map().clone(); let stream = stream.filter_map(move |request| { let segment_map = segment_map.clone(); async move { @@ -142,7 +142,7 @@ impl IoDriver for FileIoDriver { // Submit the coalesced requests to the I/O. 
let read = self.read.clone(); - let segment_map = self.file_layout.segments.clone(); + let segment_map = self.file_layout.segment_map().clone(); let segment_cache = self.segment_cache.clone(); let stream = stream.map(move |request| { let read = read.clone(); @@ -187,34 +187,40 @@ async fn evaluate( let start = segment_map.partition_point(|s| s.offset < request.byte_range.start); let end = segment_map.partition_point(|s| s.offset < request.byte_range.end); - let mut requests = iter::repeat_with(|| None) - .take(end - start) - .collect::>(); - for req in request.requests { - let id = *req.id as usize; - requests[id - start] = Some(req); - } + // Note that we may have multiple requests for the same segment. + let mut requests_iter = request.requests.into_iter().peekable(); - let mut cache_futures = Vec::with_capacity(requests.len()); - for (i, (segment, maybe_req)) in segment_map[start..end] - .iter() - .zip(requests.into_iter()) - .enumerate() - { + let mut cache_futures = Vec::with_capacity(end - start); + for (i, segment) in segment_map[start..end].iter().enumerate() { + let segment_id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id")); let offset = usize::try_from(segment.offset - request.byte_range.start)?; let buf = buffer .slice(offset..offset + segment.length as usize) .aligned(segment.alignment); - // Send the callback - if let Some(req) = maybe_req { - req.callback - .send(Ok(buf.clone())) - .map_err(|_| vortex_err!("send failed"))?; + // Find any request callbacks and send the buffer + while let Some(req) = requests_iter.peek() { + // If the request is before the current segment, we should have already resolved it. + match req.id.cmp(&segment_id) { + Ordering::Less => { + // This should never happen, it means we missed a segment request. 
+ vortex_panic!("Skipped segment request"); + } + Ordering::Equal => { + // Resolve the request + requests_iter + .next() + .vortex_expect("next request") + .resolve(Ok(buf.clone())); + } + Ordering::Greater => { + // No request for this segment, so we continue + break; + } + } } - let id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id")); - cache_futures.push(segment_cache.put(id, buf)); + cache_futures.push(segment_cache.put(segment_id, buf)); } // Populate the cache @@ -245,6 +251,11 @@ fn coalesce(requests: Vec) -> Vec { coalesced.as_mut_slice()[idx].requests.push(req); } + // Ensure we sort the requests by segment ID within the coalesced request. + for req in coalesced.iter_mut() { + req.requests.sort_unstable_by_key(|r| r.id); + } + coalesced } diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 290208a19a..bf8b19e6fe 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -148,7 +148,7 @@ impl VortexOpenOptions { .into_driver(); // Compute the splits of the file. - let splits = self.split_by.splits(&file_layout.root_layout)?.into(); + let splits = self.split_by.splits(file_layout.root_layout())?.into(); // Finally, create the VortexFile. Ok(VortexFile { @@ -304,10 +304,7 @@ impl VortexOpenOptions { .ok_or_else(|| vortex_err!("FileLayout missing segments"))?; let segments = fb_segments.iter().map(Segment::try_from).try_collect()?; - Ok(FileLayout { - root_layout, - segments, - }) + Ok(FileLayout::new(root_layout, segments)) } /// Populate segments in the cache that were covered by the initial read. @@ -318,7 +315,7 @@ impl VortexOpenOptions { file_layout: &FileLayout, segments: &dyn SegmentCache, ) -> VortexResult<()> { - for (idx, segment) in file_layout.segments.iter().enumerate() { + for (idx, segment) in file_layout.segment_map().iter().enumerate() { if segment.offset < initial_offset { // Skip segments that aren't in the initial read. 
continue; diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs index 8aec373d54..0092b5be81 100644 --- a/vortex-file/src/v2/writer.rs +++ b/vortex-file/src/v2/writer.rs @@ -67,13 +67,7 @@ impl VortexWriteOptions { // Write the DType + FileLayout segments let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?; let file_layout_segment = self - .write_flatbuffer( - &mut write, - &FileLayout { - root_layout, - segments: segments.into(), - }, - ) + .write_flatbuffer(&mut write, &FileLayout::new(root_layout, segments.into())) .await?; // Assemble the postscript, and write it manually to avoid any framing. diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs index 5b2132b378..dc583ccb1b 100644 --- a/vortex-layout/src/layouts/flat/eval_expr.rs +++ b/vortex-layout/src/layouts/flat/eval_expr.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use flatbuffers::root; use futures::future::try_join_all; -use vortex_array::compute::filter; +use vortex_array::compute::{filter, slice}; use vortex_array::parts::ArrayParts; use vortex_array::ArrayData; use vortex_error::{vortex_err, VortexExpect, VortexResult}; @@ -47,11 +47,24 @@ impl ExprEvaluator for FlatReader { // Decode into an ArrayData. let array = array_parts.decode(self.ctx(), self.dtype().clone())?; + assert_eq!( + array.len() as u64, + self.row_count(), + "FlatLayout array length mismatch {} != {}", + array.len(), + self.row_count() + ); - // And finally apply the expression // TODO(ngates): what's the best order to apply the filter mask / expression? - let array = expr.evaluate(&array)?; - filter(&array, &row_mask.into_filter_mask()?) + + // Filter the array based on the row mask. 
+ let begin = usize::try_from(row_mask.begin()) + .vortex_expect("RowMask begin must fit within FlatLayout size"); + let array = slice(array, begin, begin + row_mask.len())?; + let array = filter(&array, row_mask.filter_mask())?; + + // Then apply the expression + expr.evaluate(&array) } } @@ -65,7 +78,7 @@ mod test { use vortex_array::validity::Validity; use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData}; use vortex_buffer::buffer; - use vortex_expr::{gt, lit, Identity}; + use vortex_expr::{gt, ident, lit, Identity}; use vortex_scan::RowMask; use crate::layouts::flat::writer::FlatLayoutWriter; @@ -122,4 +135,26 @@ mod test { ); }) } + + #[test] + fn flat_unaligned_row_mask() { + block_on(async { + let mut segments = TestSegments::default(); + let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid); + let layout = FlatLayoutWriter::new(array.dtype().clone()) + .push_one(&mut segments, array.to_array()) + .unwrap(); + + let result = layout + .reader(Arc::new(segments), Default::default()) + .unwrap() + .evaluate_expr(RowMask::new_valid_between(2, 4), ident()) + .await + .unwrap() + .into_primitive() + .unwrap(); + + assert_eq!(result.as_slice::(), &[3, 4],); + }) + } } diff --git a/vortex-layout/src/strategies/mod.rs b/vortex-layout/src/strategies/mod.rs index e120d143d5..6264be74f4 100644 --- a/vortex-layout/src/strategies/mod.rs +++ b/vortex-layout/src/strategies/mod.rs @@ -66,7 +66,7 @@ pub trait LayoutWriterExt: LayoutWriter { impl LayoutWriterExt for L {} /// A trait for creating new layout writers given a DType. 
-pub trait LayoutStrategy: Send { +pub trait LayoutStrategy: Send + Sync { fn new_writer(&self, dtype: &DType) -> VortexResult>; } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 2974fb8a98..9e369e8c6d 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -78,7 +78,7 @@ impl Scan { Ok(RangeScan::new( self, row_mask.begin(), - row_mask.into_filter_mask()?, + row_mask.filter_mask().clone(), )) } } diff --git a/vortex-scan/src/row_mask.rs b/vortex-scan/src/row_mask.rs index 59deb8fb58..0673ae8d67 100644 --- a/vortex-scan/src/row_mask.rs +++ b/vortex-scan/src/row_mask.rs @@ -225,6 +225,11 @@ impl RowMask { self.mask.is_empty() } + /// Returns the [`FilterMask`] whose true values are relative to the range of this `RowMask`. + pub fn filter_mask(&self) -> &FilterMask { + &self.mask + } + /// Limit mask to `[begin..end)` range pub fn slice(&self, begin: u64, end: u64) -> VortexResult { let range_begin = max(self.begin, begin); @@ -304,12 +309,6 @@ impl RowMask { pub fn true_count(&self) -> usize { self.mask.true_count() } - - // Convert the [`RowMask`] into a [`FilterMask`]. 
- pub fn into_filter_mask(self) -> VortexResult { - let offset = self.begin; - Ok(self.shift(offset)?.mask) - } } #[cfg(test)] From b20b2a753b591c3df668f9051ec90977a1be4e3b Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 15 Jan 2025 17:00:27 +0000 Subject: [PATCH 9/9] Added a ScanBuilder & dont evaluate a project with an empty `row_mask` in scan (#1960) --- bench-vortex/src/reader.rs | 3 +- vortex-datafusion/src/persistent/opener.rs | 7 +-- vortex-file/src/v2/file.rs | 61 ++++++++++++++++------ vortex-file/src/v2/tests.rs | 1 - vortex-scan/src/lib.rs | 51 ++++++++---------- vortex-scan/src/range_scan.rs | 24 +++++---- 6 files changed, 86 insertions(+), 61 deletions(-) diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 106d4606ce..43e5132d16 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -28,10 +28,9 @@ use vortex::buffer::Buffer; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions}; +use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions}; use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; -use vortex::scan::Scan; use vortex::stream::ArrayStreamExt; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index d175ee0e8d..310a60af00 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -15,9 +15,8 @@ use vortex_error::VortexResult; use vortex_expr::datafusion::convert_expr_to_vortex; use vortex_expr::transform::simplify_typed::simplify_typed; use vortex_expr::{and, get_item, ident, lit, pack, ExprRef, Identity}; -use vortex_file::v2::{ExecutionMode, VortexOpenOptions}; +use vortex_file::v2::{ExecutionMode, Scan, VortexOpenOptions}; use 
vortex_io::ObjectStoreReadAt; -use vortex_scan::Scan; use super::cache::FileLayoutCache; @@ -87,8 +86,6 @@ impl FileOpener for VortexFileOpener { // Construct the projection expression based on the DataFusion projection mask. // Each index in the mask corresponds to the field position of the root DType. - let scan = Scan::new(self.projection.clone(), self.filter.clone()).into_arc(); - let read_at = ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); @@ -104,7 +101,7 @@ impl FileOpener for VortexFileOpener { .await?; Ok(vxf - .scan(scan)? + .scan(Scan::new(this.projection.clone(), this.filter.clone()))? .map_ok(RecordBatch::try_from) .map(|r| r.and_then(|inner| inner)) .map_err(|e| e.into()) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 231dff4a36..fdd7f04cf6 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -12,8 +12,9 @@ use vortex_array::{ArrayData, ContextRef}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; +use vortex_expr::{ExprRef, Identity}; use vortex_layout::{ExprEvaluator, LayoutReader}; -use vortex_scan::{RowMask, Scan}; +use vortex_scan::{RowMask, Scanner}; use crate::v2::exec::ExecDriver; use crate::v2::io::IoDriver; @@ -33,6 +34,28 @@ pub struct VortexFile { pub(crate) splits: Arc<[Range]>, } +pub struct Scan { + projection: ExprRef, + filter: Option, +} + +impl Scan { + pub fn all() -> Self { + Self { + projection: Identity::new_expr(), + filter: None, + } + } + + pub fn new(projection: ExprRef, filter: Option) -> Self { + Self { projection, filter } + } + + pub fn build(self, dtype: DType) -> VortexResult> { + Ok(Arc::new(Scanner::new(dtype, self.projection, self.filter)?)) + } +} + /// Async implementation of Vortex File. impl VortexFile { /// Returns the number of rows in the file. @@ -51,7 +74,7 @@ impl VortexFile { } /// Performs a scan operation over the file. 
- pub fn scan(&self, scan: Arc) -> VortexResult> { + pub fn scan(&self, scan: Scan) -> VortexResult> { self.scan_with_masks( ArcIter::new(self.splits.clone()) .map(|row_range| RowMask::new_valid_between(row_range.start, row_range.end)), @@ -64,7 +87,7 @@ impl VortexFile { pub fn take( &self, row_indices: Buffer, - scan: Arc, + scan: Scan, ) -> VortexResult> { if !row_indices.windows(2).all(|w| w[0] <= w[1]) { vortex_bail!("row indices must be sorted") @@ -113,12 +136,14 @@ impl VortexFile { fn scan_with_masks( &self, row_masks: R, - scan: Arc, + scan: Scan, ) -> VortexResult> where R: Iterator + Send + 'static, { - let result_dtype = scan.result_dtype(self.dtype())?; + let scanner = scan.build(self.dtype().clone())?; + + let result_dtype = scanner.result_dtype().clone(); // Set up a segment channel to collect segment requests from the execution stream. let segment_channel = SegmentChannel::new(); @@ -131,18 +156,22 @@ impl VortexFile { // Now we give one end of the channel to the layout reader... 
let exec_stream = stream::iter(row_masks) - .map(move |row_mask| match scan.clone().range_scan(row_mask) { - Ok(range_scan) => { - let reader = reader.clone(); - async move { - range_scan - .evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr)) - .await + .map( + move |row_mask| match scanner.clone().range_scanner(row_mask) { + Ok(range_scan) => { + let reader = reader.clone(); + async move { + range_scan + .evaluate_async(|row_mask, expr| { + reader.evaluate_expr(row_mask, expr) + }) + .await + } + .boxed() } - .boxed() - } - Err(e) => futures::future::ready(Err(e)).boxed(), - }) + Err(e) => futures::future::ready(Err(e)).boxed(), + }, + ) .boxed(); let exec_stream = self.exec_driver.drive(exec_stream); diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index e2c8810b01..29118274c3 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -5,7 +5,6 @@ use vortex_array::stream::ArrayStreamExt; use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; use vortex_buffer::buffer; use vortex_error::{VortexExpect, VortexResult}; -use vortex_scan::Scan; use crate::v2::io::IoDriver; use crate::v2::*; diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 9e369e8c6d..adbe422613 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -8,13 +8,13 @@ pub use row_mask::*; use vortex_array::{ArrayDType, Canonical, IntoArrayData}; use vortex_dtype::DType; use vortex_error::VortexResult; -use vortex_expr::{ExprRef, Identity}; +use vortex_expr::ExprRef; /// Represents a scan operation to read data from a set of rows, with an optional filter expression, /// and a projection expression. 
/// -/// A scan operation can be broken into many [`RangeScan`] operations, each of which leverages -/// shared statistics from the parent [`Scan`] to optimize the order in which filter and projection +/// A scan operation can be broken into many [`RangeScanner`] operations, each of which leverages +/// shared statistics from the parent [`Scanner`] to optimize the order in which filter and projection /// operations are applied. /// /// For example, if a filter expression has a top-level `AND` clause, it may be the case that one @@ -22,35 +22,34 @@ use vortex_expr::{ExprRef, Identity}; /// most selective filter first, then prune rows using result of the filter, before evaluating /// the second filter over the reduced set of rows. #[derive(Debug, Clone)] -pub struct Scan { +pub struct Scanner { + #[allow(dead_code)] + dtype: DType, projection: ExprRef, filter: Option, + projection_dtype: DType, // A sorted list of row indices to include in the scan. We store row indices since they may // produce a very sparse RowMask. // take_indices: Vec, // statistics: RwLock } -impl Scan { +impl Scanner { /// Create a new scan with the given projection and optional filter. - pub fn new(projection: ExprRef, filter: Option) -> Self { + pub fn new(dtype: DType, projection: ExprRef, filter: Option) -> VortexResult { // TODO(ngates): compute and cache a FieldMask based on the referenced fields. // Where FieldMask ~= Vec - Self { projection, filter } - } - - /// Convert this scan into an Arc. - pub fn into_arc(self) -> Arc { - Arc::new(self) - } + let result_dtype = projection + .evaluate(&Canonical::empty(&dtype)?.into_array())? + .dtype() + .clone(); - /// Scan all rows with the identity projection. - pub fn all() -> Arc { - Self { - projection: Identity::new_expr(), - filter: None, - } - .into_arc() + Ok(Self { + dtype, + projection, + filter, + projection_dtype: result_dtype, + }) } /// Returns the filter expression, if any. 
@@ -64,18 +63,14 @@ impl Scan { } /// Compute the result dtype of the scan given the input dtype. - pub fn result_dtype(&self, dtype: &DType) -> VortexResult { - Ok(self - .projection - .evaluate(&Canonical::empty(dtype)?.into_array())? - .dtype() - .clone()) + pub fn result_dtype(&self) -> &DType { + &self.projection_dtype } /// Instantiate a new scan for a specific range. The range scan will share statistics with this /// parent scan in order to optimize future range scans. - pub fn range_scan(self: Arc, row_mask: RowMask) -> VortexResult { - Ok(RangeScan::new( + pub fn range_scanner(self: Arc, row_mask: RowMask) -> VortexResult { + Ok(RangeScanner::new( self, row_mask.begin(), row_mask.filter_mask().clone(), diff --git a/vortex-scan/src/range_scan.rs b/vortex-scan/src/range_scan.rs index 2c44158827..0e5ed328c2 100644 --- a/vortex-scan/src/range_scan.rs +++ b/vortex-scan/src/range_scan.rs @@ -3,14 +3,14 @@ use std::ops::{BitAnd, Range}; use std::sync::Arc; use vortex_array::compute::FilterMask; -use vortex_array::{ArrayData, IntoArrayVariant}; +use vortex_array::{ArrayData, Canonical, IntoArrayData, IntoArrayVariant}; use vortex_error::VortexResult; use vortex_expr::ExprRef; -use crate::{RowMask, Scan}; +use crate::{RowMask, Scanner}; -pub struct RangeScan { - scan: Arc, +pub struct RangeScanner { + scan: Arc, row_range: Range, mask: FilterMask, state: State, @@ -56,8 +56,8 @@ pub enum NextOp { // If instead we make the projection API `project(row_mask, expr)`, then the project API is // identical to the filter API and there's no point having both. Hence, a single // `evaluate(row_mask, expr)` API. 
-impl RangeScan { - pub(crate) fn new(scan: Arc, row_offset: u64, mask: FilterMask) -> Self { +impl RangeScanner { + pub(crate) fn new(scan: Arc, row_offset: u64, mask: FilterMask) -> Self { let state = scan .filter() .map(|filter| { @@ -106,7 +106,13 @@ impl RangeScan { let mask = FilterMask::from_buffer(result.into_bool()?.boolean_buffer()); let mask = self.mask.bitand(&mask); // Then move onto the projection - self.state = State::Project((mask, self.scan.projection().clone())) + if mask.is_empty() { + // If the mask is empty, then we're done. + self.state = + State::Ready(Canonical::empty(self.scan.result_dtype())?.into_array()); + } else { + self.state = State::Project((mask, self.scan.projection().clone())) + } } State::Project(_) => { // We're done. @@ -117,7 +123,7 @@ impl RangeScan { Ok(()) } - /// Evaluate the [`RangeScan`] operation using a synchronous expression evaluator. + /// Evaluate the [`RangeScanner`] operation using a synchronous expression evaluator. pub fn evaluate(mut self, evaluator: E) -> VortexResult where E: Fn(RowMask, ExprRef) -> VortexResult, @@ -132,7 +138,7 @@ impl RangeScan { } } - /// Evaluate the [`RangeScan`] operation using an async expression evaluator. + /// Evaluate the [`RangeScanner`] operation using an async expression evaluator. pub async fn evaluate_async(mut self, evaluator: E) -> VortexResult where E: Fn(RowMask, ExprRef) -> F,