Skip to content

Commit

Permalink
Added a ScanBuilder & don't evaluate a project with an empty `row_mask…
Browse files Browse the repository at this point in the history
…` in scan (#1960)
  • Loading branch information
joseph-isaacs authored Jan 15, 2025
1 parent d3bb8f6 commit b20b2a7
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 61 deletions.
3 changes: 1 addition & 2 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ use vortex::buffer::Buffer;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions};
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::scan::Scan;
use vortex::stream::ArrayStreamExt;
use vortex::{ArrayData, IntoArrayData, IntoCanonical};

Expand Down
7 changes: 2 additions & 5 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ use vortex_error::VortexResult;
use vortex_expr::datafusion::convert_expr_to_vortex;
use vortex_expr::transform::simplify_typed::simplify_typed;
use vortex_expr::{and, get_item, ident, lit, pack, ExprRef, Identity};
use vortex_file::v2::{ExecutionMode, VortexOpenOptions};
use vortex_file::v2::{ExecutionMode, Scan, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;
use vortex_scan::Scan;

use super::cache::FileLayoutCache;

Expand Down Expand Up @@ -87,8 +86,6 @@ impl FileOpener for VortexFileOpener {
// Construct the projection expression based on the DataFusion projection mask.
// Each index in the mask corresponds to the field position of the root DType.

let scan = Scan::new(self.projection.clone(), self.filter.clone()).into_arc();

let read_at =
ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone());

Expand All @@ -104,7 +101,7 @@ impl FileOpener for VortexFileOpener {
.await?;

Ok(vxf
.scan(scan)?
.scan(Scan::new(this.projection.clone(), this.filter.clone()))?
.map_ok(RecordBatch::try_from)
.map(|r| r.and_then(|inner| inner))
.map_err(|e| e.into())
Expand Down
61 changes: 45 additions & 16 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use vortex_array::{ArrayData, ContextRef};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_expr::{ExprRef, Identity};
use vortex_layout::{ExprEvaluator, LayoutReader};
use vortex_scan::{RowMask, Scan};
use vortex_scan::{RowMask, Scanner};

use crate::v2::exec::ExecDriver;
use crate::v2::io::IoDriver;
Expand All @@ -33,6 +34,28 @@ pub struct VortexFile<I> {
pub(crate) splits: Arc<[Range<u64>]>,
}

/// Describes a scan over a Vortex file: a projection expression plus an optional
/// filter expression.
///
/// A `Scan` is not bound to a dtype. [`Scan::build`] combines it with a dtype to
/// produce a [`Scanner`].
#[derive(Debug, Clone)]
pub struct Scan {
    /// Expression handed to the [`Scanner`] as its projection.
    projection: ExprRef,
    /// Optional expression handed to the [`Scanner`] as its filter.
    filter: Option<ExprRef>,
}

impl Scan {
    /// Creates a scan from a projection expression and an optional filter expression.
    pub fn new(projection: ExprRef, filter: Option<ExprRef>) -> Self {
        Self { projection, filter }
    }

    /// Creates a scan that uses the identity projection and no filter.
    pub fn all() -> Self {
        Self::new(Identity::new_expr(), None)
    }

    /// Consumes this scan and constructs a shared [`Scanner`] for the given `dtype`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Scanner::new`].
    pub fn build(self, dtype: DType) -> VortexResult<Arc<Scanner>> {
        let Self { projection, filter } = self;
        Scanner::new(dtype, projection, filter).map(Arc::new)
    }
}

/// Async implementation of Vortex File.
impl<I: IoDriver> VortexFile<I> {
/// Returns the number of rows in the file.
Expand All @@ -51,7 +74,7 @@ impl<I: IoDriver> VortexFile<I> {
}

/// Performs a scan operation over the file.
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
pub fn scan(&self, scan: Scan) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
self.scan_with_masks(
ArcIter::new(self.splits.clone())
.map(|row_range| RowMask::new_valid_between(row_range.start, row_range.end)),
Expand All @@ -64,7 +87,7 @@ impl<I: IoDriver> VortexFile<I> {
pub fn take(
&self,
row_indices: Buffer<u64>,
scan: Arc<Scan>,
scan: Scan,
) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
if !row_indices.windows(2).all(|w| w[0] <= w[1]) {
vortex_bail!("row indices must be sorted")
Expand Down Expand Up @@ -113,12 +136,14 @@ impl<I: IoDriver> VortexFile<I> {
fn scan_with_masks<R>(
&self,
row_masks: R,
scan: Arc<Scan>,
scan: Scan,
) -> VortexResult<impl ArrayStream + 'static + use<'_, I, R>>
where
R: Iterator<Item = RowMask> + Send + 'static,
{
let result_dtype = scan.result_dtype(self.dtype())?;
let scanner = scan.build(self.dtype().clone())?;

let result_dtype = scanner.result_dtype().clone();

// Set up a segment channel to collect segment requests from the execution stream.
let segment_channel = SegmentChannel::new();
Expand All @@ -131,18 +156,22 @@ impl<I: IoDriver> VortexFile<I> {

// Now we give one end of the channel to the layout reader...
let exec_stream = stream::iter(row_masks)
.map(move |row_mask| match scan.clone().range_scan(row_mask) {
Ok(range_scan) => {
let reader = reader.clone();
async move {
range_scan
.evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr))
.await
.map(
move |row_mask| match scanner.clone().range_scanner(row_mask) {
Ok(range_scan) => {
let reader = reader.clone();
async move {
range_scan
.evaluate_async(|row_mask, expr| {
reader.evaluate_expr(row_mask, expr)
})
.await
}
.boxed()
}
.boxed()
}
Err(e) => futures::future::ready(Err(e)).boxed(),
})
Err(e) => futures::future::ready(Err(e)).boxed(),
},
)
.boxed();
let exec_stream = self.exec_driver.drive(exec_stream);

Expand Down
1 change: 0 additions & 1 deletion vortex-file/src/v2/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use vortex_array::stream::ArrayStreamExt;
use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant};
use vortex_buffer::buffer;
use vortex_error::{VortexExpect, VortexResult};
use vortex_scan::Scan;

use crate::v2::io::IoDriver;
use crate::v2::*;
Expand Down
51 changes: 23 additions & 28 deletions vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,48 @@ pub use row_mask::*;
use vortex_array::{ArrayDType, Canonical, IntoArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_expr::{ExprRef, Identity};
use vortex_expr::ExprRef;

/// Represents a scan operation to read data from a set of rows, with an optional filter expression,
/// and a projection expression.
///
/// A scan operation can be broken into many [`RangeScan`] operations, each of which leverages
/// shared statistics from the parent [`Scan`] to optimize the order in which filter and projection
/// A scan operation can be broken into many [`RangeScanner`] operations, each of which leverages
/// shared statistics from the parent [`Scanner`] to optimize the order in which filter and projection
/// operations are applied.
///
/// For example, if a filter expression has a top-level `AND` clause, it may be the case that one
/// clause is significantly more selective than the other. In this case, we may want to compute the
/// most selective filter first, then prune rows using result of the filter, before evaluating
/// the second filter over the reduced set of rows.
#[derive(Debug, Clone)]
pub struct Scan {
pub struct Scanner {
#[allow(dead_code)]
dtype: DType,
projection: ExprRef,
filter: Option<ExprRef>,
projection_dtype: DType,
// A sorted list of row indices to include in the scan. We store row indices since they may
// produce a very sparse RowMask.
// take_indices: Vec<u64>,
// statistics: RwLock<Statistics>
}

impl Scan {
impl Scanner {
/// Create a new scan with the given projection and optional filter.
pub fn new(projection: ExprRef, filter: Option<ExprRef>) -> Self {
pub fn new(dtype: DType, projection: ExprRef, filter: Option<ExprRef>) -> VortexResult<Self> {
// TODO(ngates): compute and cache a FieldMask based on the referenced fields.
// Where FieldMask ~= Vec<FieldPath>
Self { projection, filter }
}

/// Convert this scan into an Arc.
pub fn into_arc(self) -> Arc<Self> {
Arc::new(self)
}
let result_dtype = projection
.evaluate(&Canonical::empty(&dtype)?.into_array())?
.dtype()
.clone();

/// Scan all rows with the identity projection.
pub fn all() -> Arc<Self> {
Self {
projection: Identity::new_expr(),
filter: None,
}
.into_arc()
Ok(Self {
dtype,
projection,
filter,
projection_dtype: result_dtype,
})
}

/// Returns the filter expression, if any.
Expand All @@ -64,18 +63,14 @@ impl Scan {
}

/// Returns the result dtype of the scan, computed from the projection when the scanner was constructed.
pub fn result_dtype(&self, dtype: &DType) -> VortexResult<DType> {
Ok(self
.projection
.evaluate(&Canonical::empty(dtype)?.into_array())?
.dtype()
.clone())
pub fn result_dtype(&self) -> &DType {
&self.projection_dtype
}

/// Instantiate a new scan for a specific range. The range scan will share statistics with this
/// parent scan in order to optimize future range scans.
pub fn range_scan(self: Arc<Self>, row_mask: RowMask) -> VortexResult<RangeScan> {
Ok(RangeScan::new(
pub fn range_scanner(self: Arc<Self>, row_mask: RowMask) -> VortexResult<RangeScanner> {
Ok(RangeScanner::new(
self,
row_mask.begin(),
row_mask.filter_mask().clone(),
Expand Down
24 changes: 15 additions & 9 deletions vortex-scan/src/range_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::ops::{BitAnd, Range};
use std::sync::Arc;

use vortex_array::compute::FilterMask;
use vortex_array::{ArrayData, IntoArrayVariant};
use vortex_array::{ArrayData, Canonical, IntoArrayData, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::ExprRef;

use crate::{RowMask, Scan};
use crate::{RowMask, Scanner};

pub struct RangeScan {
scan: Arc<Scan>,
pub struct RangeScanner {
scan: Arc<Scanner>,
row_range: Range<u64>,
mask: FilterMask,
state: State,
Expand Down Expand Up @@ -56,8 +56,8 @@ pub enum NextOp {
// If instead we make the projection API `project(row_mask, expr)`, then the project API is
// identical to the filter API and there's no point having both. Hence, a single
// `evaluate(row_mask, expr)` API.
impl RangeScan {
pub(crate) fn new(scan: Arc<Scan>, row_offset: u64, mask: FilterMask) -> Self {
impl RangeScanner {
pub(crate) fn new(scan: Arc<Scanner>, row_offset: u64, mask: FilterMask) -> Self {
let state = scan
.filter()
.map(|filter| {
Expand Down Expand Up @@ -106,7 +106,13 @@ impl RangeScan {
let mask = FilterMask::from_buffer(result.into_bool()?.boolean_buffer());
let mask = self.mask.bitand(&mask);
// Then move onto the projection
self.state = State::Project((mask, self.scan.projection().clone()))
if mask.is_empty() {
// If the mask is empty, then we're done.
self.state =
State::Ready(Canonical::empty(self.scan.result_dtype())?.into_array());
} else {
self.state = State::Project((mask, self.scan.projection().clone()))
}
}
State::Project(_) => {
// We're done.
Expand All @@ -117,7 +123,7 @@ impl RangeScan {
Ok(())
}

/// Evaluate the [`RangeScan`] operation using a synchronous expression evaluator.
/// Evaluate the [`RangeScanner`] operation using a synchronous expression evaluator.
pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<ArrayData>
where
E: Fn(RowMask, ExprRef) -> VortexResult<ArrayData>,
Expand All @@ -132,7 +138,7 @@ impl RangeScan {
}
}

/// Evaluate the [`RangeScan`] operation using an async expression evaluator.
/// Evaluate the [`RangeScanner`] operation using an async expression evaluator.
pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<ArrayData>
where
E: Fn(RowMask, ExprRef) -> F,
Expand Down

0 comments on commit b20b2a7

Please sign in to comment.