diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs
index f35a093c2..fa5ab090a 100644
--- a/bench-vortex/src/clickbench.rs
+++ b/bench-vortex/src/clickbench.rs
@@ -7,9 +7,7 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use futures::executor::block_on;
-use itertools::Itertools;
-use rayon::prelude::*;
+use futures::{stream, StreamExt, TryStreamExt};
 use tokio::fs::{create_dir_all, OpenOptions};
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
@@ -152,9 +150,7 @@ pub async fn register_vortex_files(
     let vortex_dir = input_path.join("vortex");
     create_dir_all(&vortex_dir).await?;
 
-    (0..100)
-        .collect_vec()
-        .par_iter()
+    stream::iter(0..100)
         .map(|idx| {
             let parquet_file_path = input_path
                 .join("parquet")
@@ -163,7 +159,7 @@ pub async fn register_vortex_files(
             let session = session.clone();
             let schema = schema.clone();
 
-            block_on(async move {
+            tokio::spawn(async move {
                 let output_path = output_path.clone();
                 idempotent_async(&output_path, move |vtx_file| async move {
                     eprintln!("Processing file {idx}");
@@ -234,7 +230,9 @@ pub async fn register_vortex_files(
                     .expect("Failed to write Vortex file")
             })
         })
-        .collect::<Vec<_>>();
+        .buffered(16)
+        .try_collect::<Vec<_>>()
+        .await?;
 
     let format = Arc::new(VortexFormat::new(CTX.clone()));
     let table_path = vortex_dir
diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs
index 96f985eef..231dff4a3 100644
--- a/vortex-file/src/v2/file.rs
+++ b/vortex-file/src/v2/file.rs
@@ -126,7 +126,7 @@ impl VortexFile {
         // Create a single LayoutReader that is reused for the entire scan.
         let reader: Arc<dyn LayoutReader> = self
             .file_layout
-            .root_layout
+            .root_layout()
             .reader(segment_channel.reader(), self.ctx.clone())?;
 
         // Now we give one end of the channel to the layout reader...
diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs
index a0b641990..63c293bb1 100644
--- a/vortex-file/src/v2/footer/file_layout.rs
+++ b/vortex-file/src/v2/footer/file_layout.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use itertools::Itertools;
 use vortex_dtype::DType;
 use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer};
 use vortex_layout::LayoutData;
@@ -9,11 +10,38 @@ use crate::v2::footer::segment::Segment;
 /// Captures the layout information of a Vortex file.
 #[derive(Debug, Clone)]
 pub struct FileLayout {
-    pub(crate) root_layout: LayoutData,
-    pub(crate) segments: Arc<[Segment]>,
+    root_layout: LayoutData,
+    segments: Arc<[Segment]>,
 }
 
 impl FileLayout {
+    /// Create a new `FileLayout` from the root layout and segments.
+    ///
+    /// ## Panics
+    ///
+    /// Panics if the segments are not ordered by byte offset.
+    pub fn new(root_layout: LayoutData, segments: Arc<[Segment]>) -> Self {
+        // Note this assertion is `<=` since we allow zero-length segments
+        assert!(segments
+            .iter()
+            .tuple_windows()
+            .all(|(a, b)| a.offset <= b.offset));
+        Self {
+            root_layout,
+            segments,
+        }
+    }
+
+    /// Returns the root [`LayoutData`] of the file.
+    pub fn root_layout(&self) -> &LayoutData {
+        &self.root_layout
+    }
+
+    /// Returns the segment map of the file.
+    pub fn segment_map(&self) -> &Arc<[Segment]> {
+        &self.segments
+    }
+
     /// Returns the [`DType`] of the file.
     pub fn dtype(&self) -> &DType {
         self.root_layout.dtype()
diff --git a/vortex-file/src/v2/footer/segment.rs b/vortex-file/src/v2/footer/segment.rs
index f988123f5..321ddd46f 100644
--- a/vortex-file/src/v2/footer/segment.rs
+++ b/vortex-file/src/v2/footer/segment.rs
@@ -4,7 +4,7 @@ use vortex_flatbuffers::footer2 as fb;
 
 /// The location of a segment within a Vortex file.
 #[derive(Clone, Debug)]
-pub(crate) struct Segment {
+pub struct Segment {
     pub(crate) offset: u64,
     pub(crate) length: u32,
     pub(crate) alignment: Alignment,
diff --git a/vortex-file/src/v2/io/file.rs b/vortex-file/src/v2/io/file.rs
index f40818e23..1d6abfaf1 100644
--- a/vortex-file/src/v2/io/file.rs
+++ b/vortex-file/src/v2/io/file.rs
@@ -1,5 +1,5 @@
+use std::cmp::Ordering;
 use std::future::Future;
-use std::iter;
 use std::ops::Range;
 use std::sync::Arc;
 
@@ -8,7 +8,7 @@ use futures::Stream;
 use futures_util::future::try_join_all;
 use futures_util::{stream, StreamExt};
 use vortex_buffer::{ByteBuffer, ByteBufferMut};
-use vortex_error::{vortex_err, VortexExpect, VortexResult};
+use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
 use vortex_io::VortexReadAt;
 use vortex_layout::segments::SegmentId;
 
@@ -64,7 +64,7 @@ impl FileSegmentRequest {
 struct CoalescedSegmentRequest {
     /// The range of the file to read.
     pub(crate) byte_range: Range<u64>,
-    /// The original segment requests.
+    /// The original segment requests, ordered by segment ID.
     pub(crate) requests: Vec<FileSegmentRequest>,
 }
 
@@ -74,7 +74,7 @@ impl IoDriver for FileIoDriver {
         stream: impl Stream<Item = SegmentRequest> + 'static,
     ) -> impl Stream<Item = VortexResult<()>> + 'static {
         // We map the segment requests to their respective locations within the file.
-        let segment_map = self.file_layout.segments.clone();
+        let segment_map = self.file_layout.segment_map().clone();
         let stream = stream.filter_map(move |request| {
             let segment_map = segment_map.clone();
             async move {
@@ -142,7 +142,7 @@ impl IoDriver for FileIoDriver {
 
         // Submit the coalesced requests to the I/O.
         let read = self.read.clone();
-        let segment_map = self.file_layout.segments.clone();
+        let segment_map = self.file_layout.segment_map().clone();
         let segment_cache = self.segment_cache.clone();
         let stream = stream.map(move |request| {
             let read = read.clone();
@@ -187,34 +187,40 @@ async fn evaluate(
     let start = segment_map.partition_point(|s| s.offset < request.byte_range.start);
     let end = segment_map.partition_point(|s| s.offset < request.byte_range.end);
 
-    let mut requests = iter::repeat_with(|| None)
-        .take(end - start)
-        .collect::<Vec<_>>();
-    for req in request.requests {
-        let id = *req.id as usize;
-        requests[id - start] = Some(req);
-    }
+    // Note that we may have multiple requests for the same segment.
+    let mut requests_iter = request.requests.into_iter().peekable();
 
-    let mut cache_futures = Vec::with_capacity(requests.len());
-    for (i, (segment, maybe_req)) in segment_map[start..end]
-        .iter()
-        .zip(requests.into_iter())
-        .enumerate()
-    {
+    let mut cache_futures = Vec::with_capacity(end - start);
+    for (i, segment) in segment_map[start..end].iter().enumerate() {
+        let segment_id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id"));
         let offset = usize::try_from(segment.offset - request.byte_range.start)?;
         let buf = buffer
             .slice(offset..offset + segment.length as usize)
             .aligned(segment.alignment);
 
-        // Send the callback
-        if let Some(req) = maybe_req {
-            req.callback
-                .send(Ok(buf.clone()))
-                .map_err(|_| vortex_err!("send failed"))?;
+        // Find any request callbacks and send the buffer
+        while let Some(req) = requests_iter.peek() {
+            // If the request is before the current segment, we should have already resolved it.
+            match req.id.cmp(&segment_id) {
+                Ordering::Less => {
+                    // This should never happen, it means we missed a segment request.
+                    vortex_panic!("Skipped segment request");
+                }
+                Ordering::Equal => {
+                    // Resolve the request
+                    requests_iter
+                        .next()
+                        .vortex_expect("next request")
+                        .resolve(Ok(buf.clone()));
+                }
+                Ordering::Greater => {
+                    // No request for this segment, so we continue
+                    break;
+                }
+            }
         }
 
-        let id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id"));
-        cache_futures.push(segment_cache.put(id, buf));
+        cache_futures.push(segment_cache.put(segment_id, buf));
     }
 
     // Populate the cache
@@ -245,6 +251,11 @@ fn coalesce(requests: Vec<FileSegmentRequest>) -> Vec<CoalescedSegmentRequest> {
         coalesced.as_mut_slice()[idx].requests.push(req);
     }
 
+    // Ensure we sort the requests by segment ID within the coalesced request.
+    for req in coalesced.iter_mut() {
+        req.requests.sort_unstable_by_key(|r| r.id);
+    }
+
     coalesced
 }
 
diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs
index 290208a19..bf8b19e6f 100644
--- a/vortex-file/src/v2/open/mod.rs
+++ b/vortex-file/src/v2/open/mod.rs
@@ -148,7 +148,7 @@ impl VortexOpenOptions {
             .into_driver();
 
         // Compute the splits of the file.
-        let splits = self.split_by.splits(&file_layout.root_layout)?.into();
+        let splits = self.split_by.splits(file_layout.root_layout())?.into();
 
         // Finally, create the VortexFile.
         Ok(VortexFile {
@@ -304,10 +304,7 @@ impl VortexOpenOptions {
             .ok_or_else(|| vortex_err!("FileLayout missing segments"))?;
         let segments = fb_segments.iter().map(Segment::try_from).try_collect()?;
 
-        Ok(FileLayout {
-            root_layout,
-            segments,
-        })
+        Ok(FileLayout::new(root_layout, segments))
     }
 
     /// Populate segments in the cache that were covered by the initial read.
@@ -318,7 +315,7 @@ impl VortexOpenOptions {
         file_layout: &FileLayout,
         segments: &dyn SegmentCache,
     ) -> VortexResult<()> {
-        for (idx, segment) in file_layout.segments.iter().enumerate() {
+        for (idx, segment) in file_layout.segment_map().iter().enumerate() {
             if segment.offset < initial_offset {
                 // Skip segments that aren't in the initial read.
                 continue;
diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs
index 8aec373d5..0092b5be8 100644
--- a/vortex-file/src/v2/writer.rs
+++ b/vortex-file/src/v2/writer.rs
@@ -67,13 +67,7 @@ impl VortexWriteOptions {
         // Write the DType + FileLayout segments
         let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?;
         let file_layout_segment = self
-            .write_flatbuffer(
-                &mut write,
-                &FileLayout {
-                    root_layout,
-                    segments: segments.into(),
-                },
-            )
+            .write_flatbuffer(&mut write, &FileLayout::new(root_layout, segments.into()))
             .await?;
 
         // Assemble the postscript, and write it manually to avoid any framing.
diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs
index 5b2132b37..dc583ccb1 100644
--- a/vortex-layout/src/layouts/flat/eval_expr.rs
+++ b/vortex-layout/src/layouts/flat/eval_expr.rs
@@ -1,7 +1,7 @@
 use async_trait::async_trait;
 use flatbuffers::root;
 use futures::future::try_join_all;
-use vortex_array::compute::filter;
+use vortex_array::compute::{filter, slice};
 use vortex_array::parts::ArrayParts;
 use vortex_array::ArrayData;
 use vortex_error::{vortex_err, VortexExpect, VortexResult};
@@ -47,11 +47,24 @@ impl ExprEvaluator for FlatReader {
 
         // Decode into an ArrayData.
         let array = array_parts.decode(self.ctx(), self.dtype().clone())?;
+        assert_eq!(
+            array.len() as u64,
+            self.row_count(),
+            "FlatLayout array length mismatch {} != {}",
+            array.len(),
+            self.row_count()
+        );
 
-        // And finally apply the expression
         // TODO(ngates): what's the best order to apply the filter mask / expression?
-        let array = expr.evaluate(&array)?;
-        filter(&array, &row_mask.into_filter_mask()?)
+
+        // Filter the array based on the row mask.
+        let begin = usize::try_from(row_mask.begin())
+            .vortex_expect("RowMask begin must fit within FlatLayout size");
+        let array = slice(array, begin, begin + row_mask.len())?;
+        let array = filter(&array, row_mask.filter_mask())?;
+
+        // Then apply the expression
+        expr.evaluate(&array)
     }
 }
 
@@ -65,7 +78,7 @@ mod test {
     use vortex_array::validity::Validity;
     use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};
     use vortex_buffer::buffer;
-    use vortex_expr::{gt, lit, Identity};
+    use vortex_expr::{gt, ident, lit, Identity};
     use vortex_scan::RowMask;
 
     use crate::layouts::flat::writer::FlatLayoutWriter;
@@ -122,4 +135,26 @@ mod test {
             );
         })
     }
+
+    #[test]
+    fn flat_unaligned_row_mask() {
+        block_on(async {
+            let mut segments = TestSegments::default();
+            let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
+            let layout = FlatLayoutWriter::new(array.dtype().clone())
+                .push_one(&mut segments, array.to_array())
+                .unwrap();
+
+            let result = layout
+                .reader(Arc::new(segments), Default::default())
+                .unwrap()
+                .evaluate_expr(RowMask::new_valid_between(2, 4), ident())
+                .await
+                .unwrap()
+                .into_primitive()
+                .unwrap();
+
+            assert_eq!(result.as_slice::<i32>(), &[3, 4],);
+        })
+    }
 }
diff --git a/vortex-layout/src/strategies/mod.rs b/vortex-layout/src/strategies/mod.rs
index e120d143d..6264be74f 100644
--- a/vortex-layout/src/strategies/mod.rs
+++ b/vortex-layout/src/strategies/mod.rs
@@ -66,7 +66,7 @@ pub trait LayoutWriterExt: LayoutWriter {
 impl<L: LayoutWriter> LayoutWriterExt for L {}
 
 /// A trait for creating new layout writers given a DType.
-pub trait LayoutStrategy: Send {
+pub trait LayoutStrategy: Send + Sync {
     fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>>;
 }
 
diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs
index 2974fb8a9..9e369e8c6 100644
--- a/vortex-scan/src/lib.rs
+++ b/vortex-scan/src/lib.rs
@@ -78,7 +78,7 @@ impl Scan {
         Ok(RangeScan::new(
             self,
             row_mask.begin(),
-            row_mask.into_filter_mask()?,
+            row_mask.filter_mask().clone(),
         ))
     }
 }
diff --git a/vortex-scan/src/row_mask.rs b/vortex-scan/src/row_mask.rs
index 59deb8fb5..0673ae8d6 100644
--- a/vortex-scan/src/row_mask.rs
+++ b/vortex-scan/src/row_mask.rs
@@ -225,6 +225,11 @@ impl RowMask {
         self.mask.is_empty()
     }
 
+    /// Returns the [`FilterMask`] whose true values are relative to the range of this `RowMask`.
+    pub fn filter_mask(&self) -> &FilterMask {
+        &self.mask
+    }
+
     /// Limit mask to `[begin..end)` range
     pub fn slice(&self, begin: u64, end: u64) -> VortexResult<Self> {
         let range_begin = max(self.begin, begin);
@@ -304,12 +309,6 @@ impl RowMask {
     pub fn true_count(&self) -> usize {
         self.mask.true_count()
     }
-
-    // Convert the [`RowMask`] into a [`FilterMask`].
-    pub fn into_filter_mask(self) -> VortexResult<FilterMask> {
-        let offset = self.begin;
-        Ok(self.shift(offset)?.mask)
-    }
 }
 
 #[cfg(test)]
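Note on the request-resolution loop in `evaluate`: it relies on two invariants introduced elsewhere in this diff — `FileLayout::new` asserts that the segment map is ordered by byte offset, and `coalesce` now sorts each coalesced request's `requests` by segment ID. The following is a minimal standalone sketch of that sorted-merge pattern, using hypothetical stand-in types rather than the real `Segment`/`FileSegmentRequest`:

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for `FileSegmentRequest`: just a segment id to match on.
/// In the real driver, resolving a request sends the buffer through a callback.
struct Request {
    id: u32,
}

/// Resolve requests against the segments of one coalesced read.
/// `segment_ids` and `requests` must both be sorted ascending by id;
/// duplicate request ids are allowed.
fn resolve_requests(segment_ids: &[u32], requests: Vec<Request>) -> Vec<u32> {
    let mut resolved = Vec::new();
    let mut requests = requests.into_iter().peekable();

    for &segment_id in segment_ids {
        while let Some(req) = requests.peek() {
            match req.id.cmp(&segment_id) {
                // A request for an earlier segment should already have been resolved.
                Ordering::Less => panic!("skipped segment request"),
                // Possibly several requests for the same segment: resolve them all.
                Ordering::Equal => resolved.push(requests.next().expect("peeked").id),
                // No (more) requests for this segment, move on to the next one.
                Ordering::Greater => break,
            }
        }
    }
    resolved
}

fn main() {
    // Two requests for segment 1, none for segment 2.
    let requests = vec![
        Request { id: 0 },
        Request { id: 1 },
        Request { id: 1 },
        Request { id: 3 },
    ];
    assert_eq!(resolve_requests(&[0, 1, 2, 3], requests), vec![0, 1, 1, 3]);
    println!("all requests resolved in order");
}
```

Because both sides are sorted, a single forward pass resolves every request (duplicates included), and encountering a request for an already-passed segment can only mean a bookkeeping bug, hence the panic branch.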