Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clickbench #1956

Merged
merged 7 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions bench-vortex/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::executor::block_on;
use itertools::Itertools;
use rayon::prelude::*;
use futures::{stream, StreamExt, TryStreamExt};
use tokio::fs::{create_dir_all, OpenOptions};
use vortex::aliases::hash_map::HashMap;
use vortex::array::{ChunkedArray, StructArray};
Expand Down Expand Up @@ -152,9 +150,7 @@ pub async fn register_vortex_files(
let vortex_dir = input_path.join("vortex");
create_dir_all(&vortex_dir).await?;

(0..100)
.collect_vec()
.par_iter()
stream::iter(0..100)
.map(|idx| {
let parquet_file_path = input_path
.join("parquet")
Expand All @@ -163,7 +159,7 @@ pub async fn register_vortex_files(
let session = session.clone();
let schema = schema.clone();

block_on(async move {
tokio::spawn(async move {
let output_path = output_path.clone();
idempotent_async(&output_path, move |vtx_file| async move {
eprintln!("Processing file {idx}");
Expand Down Expand Up @@ -234,7 +230,9 @@ pub async fn register_vortex_files(
.expect("Failed to write Vortex file")
})
})
.collect::<Vec<_>>();
.buffered(16)
.try_collect::<Vec<_>>()
.await?;

let format = Arc::new(VortexFormat::new(CTX.clone()));
let table_path = vortex_dir
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<I: IoDriver> VortexFile<I> {
// Create a single LayoutReader that is reused for the entire scan.
let reader: Arc<dyn LayoutReader> = self
.file_layout
.root_layout
.root_layout()
.reader(segment_channel.reader(), self.ctx.clone())?;

// Now we give one end of the channel to the layout reader...
Expand Down
32 changes: 30 additions & 2 deletions vortex-file/src/v2/footer/file_layout.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use itertools::Itertools;
use vortex_dtype::DType;
use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer};
use vortex_layout::LayoutData;
Expand All @@ -9,11 +10,38 @@ use crate::v2::footer::segment::Segment;
/// Captures the layout information of a Vortex file.
#[derive(Debug, Clone)]
pub struct FileLayout {
pub(crate) root_layout: LayoutData,
pub(crate) segments: Arc<[Segment]>,
root_layout: LayoutData,
segments: Arc<[Segment]>,
}

impl FileLayout {
/// Create a new `FileLayout` from the root layout and segments.
///
/// ## Panics
///
/// Panics if the segments are not ordered by byte offset.
pub fn new(root_layout: LayoutData, segments: Arc<[Segment]>) -> Self {
    // Note this assertion is `<=` since we allow zero-length segments,
    // which may share a byte offset with their successor.
    assert!(
        segments
            .iter()
            .tuple_windows()
            .all(|(a, b)| a.offset <= b.offset),
        // A message makes the invariant violation diagnosable; the bare
        // stringified condition would not tell the caller what went wrong.
        "FileLayout segments must be ordered by byte offset"
    );
    Self {
        root_layout,
        segments,
    }
}

/// Returns the root [`LayoutData`] of the file.
///
/// The reference borrows from `self`; clone the [`LayoutData`] if an owned
/// copy is needed.
pub fn root_layout(&self) -> &LayoutData {
&self.root_layout
}

/// Returns the segment map of the file.
///
/// Segments are ordered by byte offset within the file — this is asserted by
/// [`FileLayout::new`]. The `Arc` makes cloning the shared map cheap.
pub fn segment_map(&self) -> &Arc<[Segment]> {
&self.segments
}

/// Returns the [`DType`] of the file.
pub fn dtype(&self) -> &DType {
self.root_layout.dtype()
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/v2/footer/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use vortex_flatbuffers::footer2 as fb;

/// The location of a segment within a Vortex file.
#[derive(Clone, Debug)]
pub(crate) struct Segment {
pub struct Segment {
pub(crate) offset: u64,
pub(crate) length: u32,
pub(crate) alignment: Alignment,
Expand Down
61 changes: 36 additions & 25 deletions vortex-file/src/v2/io/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cmp::Ordering;
use std::future::Future;
use std::iter;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -8,7 +8,7 @@ use futures::Stream;
use futures_util::future::try_join_all;
use futures_util::{stream, StreamExt};
use vortex_buffer::{ByteBuffer, ByteBufferMut};
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
use vortex_io::VortexReadAt;
use vortex_layout::segments::SegmentId;

Expand Down Expand Up @@ -64,7 +64,7 @@ impl FileSegmentRequest {
struct CoalescedSegmentRequest {
/// The range of the file to read.
pub(crate) byte_range: Range<u64>,
/// The original segment requests.
/// The original segment requests, ordered by segment ID.
pub(crate) requests: Vec<FileSegmentRequest>,
}

Expand All @@ -74,7 +74,7 @@ impl<R: VortexReadAt> IoDriver for FileIoDriver<R> {
stream: impl Stream<Item = SegmentRequest> + 'static,
) -> impl Stream<Item = VortexResult<()>> + 'static {
// We map the segment requests to their respective locations within the file.
let segment_map = self.file_layout.segments.clone();
let segment_map = self.file_layout.segment_map().clone();
let stream = stream.filter_map(move |request| {
let segment_map = segment_map.clone();
async move {
Expand Down Expand Up @@ -142,7 +142,7 @@ impl<R: VortexReadAt> IoDriver for FileIoDriver<R> {

// Submit the coalesced requests to the I/O.
let read = self.read.clone();
let segment_map = self.file_layout.segments.clone();
let segment_map = self.file_layout.segment_map().clone();
let segment_cache = self.segment_cache.clone();
let stream = stream.map(move |request| {
let read = read.clone();
Expand Down Expand Up @@ -187,34 +187,40 @@ async fn evaluate<R: VortexReadAt>(
let start = segment_map.partition_point(|s| s.offset < request.byte_range.start);
let end = segment_map.partition_point(|s| s.offset < request.byte_range.end);

let mut requests = iter::repeat_with(|| None)
.take(end - start)
.collect::<Vec<_>>();
for req in request.requests {
let id = *req.id as usize;
requests[id - start] = Some(req);
}
// Note that we may have multiple requests for the same segment.
let mut requests_iter = request.requests.into_iter().peekable();

let mut cache_futures = Vec::with_capacity(requests.len());
for (i, (segment, maybe_req)) in segment_map[start..end]
.iter()
.zip(requests.into_iter())
.enumerate()
{
let mut cache_futures = Vec::with_capacity(end - start);
for (i, segment) in segment_map[start..end].iter().enumerate() {
let segment_id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id"));
let offset = usize::try_from(segment.offset - request.byte_range.start)?;
let buf = buffer
.slice(offset..offset + segment.length as usize)
.aligned(segment.alignment);

// Send the callback
if let Some(req) = maybe_req {
req.callback
.send(Ok(buf.clone()))
.map_err(|_| vortex_err!("send failed"))?;
// Find any request callbacks and send the buffer
while let Some(req) = requests_iter.peek() {
// If the request is before the current segment, we should have already resolved it.
match req.id.cmp(&segment_id) {
Ordering::Less => {
// This should never happen, it means we missed a segment request.
vortex_panic!("Skipped segment request");
}
Ordering::Equal => {
// Resolve the request
requests_iter
.next()
.vortex_expect("next request")
.resolve(Ok(buf.clone()));
}
Ordering::Greater => {
// No request for this segment, so we continue
break;
}
}
}

let id = SegmentId::from(u32::try_from(i + start).vortex_expect("segment id"));
cache_futures.push(segment_cache.put(id, buf));
cache_futures.push(segment_cache.put(segment_id, buf));
}

// Populate the cache
Expand Down Expand Up @@ -245,6 +251,11 @@ fn coalesce(requests: Vec<FileSegmentRequest>) -> Vec<CoalescedSegmentRequest> {
coalesced.as_mut_slice()[idx].requests.push(req);
}

// Ensure we sort the requests by segment ID within the coalesced request.
for req in coalesced.iter_mut() {
req.requests.sort_unstable_by_key(|r| r.id);
}

coalesced
}

Expand Down
9 changes: 3 additions & 6 deletions vortex-file/src/v2/open/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl VortexOpenOptions {
.into_driver();

// Compute the splits of the file.
let splits = self.split_by.splits(&file_layout.root_layout)?.into();
let splits = self.split_by.splits(file_layout.root_layout())?.into();

// Finally, create the VortexFile.
Ok(VortexFile {
Expand Down Expand Up @@ -304,10 +304,7 @@ impl VortexOpenOptions {
.ok_or_else(|| vortex_err!("FileLayout missing segments"))?;
let segments = fb_segments.iter().map(Segment::try_from).try_collect()?;

Ok(FileLayout {
root_layout,
segments,
})
Ok(FileLayout::new(root_layout, segments))
}

/// Populate segments in the cache that were covered by the initial read.
Expand All @@ -318,7 +315,7 @@ impl VortexOpenOptions {
file_layout: &FileLayout,
segments: &dyn SegmentCache,
) -> VortexResult<()> {
for (idx, segment) in file_layout.segments.iter().enumerate() {
for (idx, segment) in file_layout.segment_map().iter().enumerate() {
if segment.offset < initial_offset {
// Skip segments that aren't in the initial read.
continue;
Expand Down
8 changes: 1 addition & 7 deletions vortex-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,7 @@ impl VortexWriteOptions {
// Write the DType + FileLayout segments
let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?;
let file_layout_segment = self
.write_flatbuffer(
&mut write,
&FileLayout {
root_layout,
segments: segments.into(),
},
)
.write_flatbuffer(&mut write, &FileLayout::new(root_layout, segments.into()))
.await?;

// Assemble the postscript, and write it manually to avoid any framing.
Expand Down
45 changes: 40 additions & 5 deletions vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use flatbuffers::root;
use futures::future::try_join_all;
use vortex_array::compute::filter;
use vortex_array::compute::{filter, slice};
use vortex_array::parts::ArrayParts;
use vortex_array::ArrayData;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
Expand Down Expand Up @@ -47,11 +47,24 @@ impl ExprEvaluator for FlatReader {

// Decode into an ArrayData.
let array = array_parts.decode(self.ctx(), self.dtype().clone())?;
assert_eq!(
array.len() as u64,
self.row_count(),
"FlatLayout array length mismatch {} != {}",
array.len(),
self.row_count()
);

// And finally apply the expression
// TODO(ngates): what's the best order to apply the filter mask / expression?
let array = expr.evaluate(&array)?;
filter(&array, &row_mask.into_filter_mask()?)

// Filter the array based on the row mask.
let begin = usize::try_from(row_mask.begin())
.vortex_expect("RowMask begin must fit within FlatLayout size");
let array = slice(array, begin, begin + row_mask.len())?;
let array = filter(&array, row_mask.filter_mask())?;

// Then apply the expression
expr.evaluate(&array)
}
}

Expand All @@ -65,7 +78,7 @@ mod test {
use vortex_array::validity::Validity;
use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};
use vortex_buffer::buffer;
use vortex_expr::{gt, lit, Identity};
use vortex_expr::{gt, ident, lit, Identity};
use vortex_scan::RowMask;

use crate::layouts::flat::writer::FlatLayoutWriter;
Expand Down Expand Up @@ -122,4 +135,26 @@ mod test {
);
})
}

#[test]
fn flat_unaligned_row_mask() {
    block_on(async {
        // Write a single 5-element primitive array as a flat layout.
        let mut segments = TestSegments::default();
        let values = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
        let layout = FlatLayoutWriter::new(values.dtype().clone())
            .push_one(&mut segments, values.to_array())
            .unwrap();

        // Evaluate the identity expression over the row range [2, 4).
        let reader = layout.reader(Arc::new(segments), Default::default()).unwrap();
        let mask = RowMask::new_valid_between(2, 4);
        let result = reader
            .evaluate_expr(mask, ident())
            .await
            .unwrap()
            .into_primitive()
            .unwrap();

        // Rows 2 and 3 of the source array carry the values 3 and 4.
        assert_eq!(result.as_slice::<i32>(), &[3, 4]);
    })
}
}
2 changes: 1 addition & 1 deletion vortex-layout/src/strategies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub trait LayoutWriterExt: LayoutWriter {
impl<L: LayoutWriter> LayoutWriterExt for L {}

/// A trait for creating new layout writers given a DType.
pub trait LayoutStrategy: Send {
pub trait LayoutStrategy: Send + Sync {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>>;
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Scan {
Ok(RangeScan::new(
self,
row_mask.begin(),
row_mask.into_filter_mask()?,
row_mask.filter_mask().clone(),
))
}
}
11 changes: 5 additions & 6 deletions vortex-scan/src/row_mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ impl RowMask {
self.mask.is_empty()
}

/// Returns the [`FilterMask`] whose true values are relative to the range of this `RowMask`.
///
/// The mask is returned by reference; callers that need ownership should
/// clone it.
pub fn filter_mask(&self) -> &FilterMask {
&self.mask
}

/// Limit mask to `[begin..end)` range
pub fn slice(&self, begin: u64, end: u64) -> VortexResult<Self> {
let range_begin = max(self.begin, begin);
Expand Down Expand Up @@ -304,12 +309,6 @@ impl RowMask {
pub fn true_count(&self) -> usize {
self.mask.true_count()
}

// Convert the [`RowMask`] into a [`FilterMask`].
pub fn into_filter_mask(self) -> VortexResult<FilterMask> {
let offset = self.begin;
Ok(self.shift(offset)?.mask)
}
}

#[cfg(test)]
Expand Down
Loading