Skip to content

Commit

Permalink
state as conjuncts
Browse files Browse the repository at this point in the history
  • Loading branch information
joseph-isaacs committed Jan 15, 2025
1 parent f998086 commit d6d7614
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 32 deletions.
2 changes: 1 addition & 1 deletion vortex-array/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl FilterMask {

#[inline]
pub fn is_empty(&self) -> bool {
self.0.len == 0
self.0.true_count == 0
}

/// Get the true count of the mask.
Expand Down
5 changes: 0 additions & 5 deletions vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ impl Scanner {
})
}

/// Returns the filter expression, if any.
pub fn filter(&self) -> &[ExprRef] {
self.filter.as_ref()
}

/// Returns the projection expression.
pub fn projection(&self) -> &ExprRef {
&self.projection
Expand Down
56 changes: 30 additions & 26 deletions vortex-scan/src/range_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ pub struct RangeScanner {
row_range: Range<u64>,
mask: FilterMask,
state: State,
remaining_filter_conjuncts: Vec<ExprRef>,
}

enum State {
// First we run the filter expression over the full row range.
FilterEval((FilterMask, ExprRef)),
FilterEval((FilterMask, ExprRef, Vec<ExprRef>)),
// Then we project the selected rows.
Project((FilterMask, ExprRef)),
// And then we're done.
Expand Down Expand Up @@ -59,28 +58,27 @@ pub enum NextOp {
// `evaluate(row_mask, expr)` API.
impl RangeScanner {
pub(crate) fn new(scan: Arc<Scanner>, row_offset: u64, mask: FilterMask) -> Self {
let first_filter = scan.filter.first().cloned();
let remaining_filter_conjuncts = scan.filter().iter().skip(1).cloned().collect();
let state = first_filter
.map(|filter| {
// If we have a filter expression, then for now we evaluate it against all rows
// of the range.
// TODO(ngates): we should decide based on mask.true_count() whether to include the
// current mask or not. But note that the resulting expression would need to be
// aligned and intersected with the given mask.
State::FilterEval((FilterMask::new_true(mask.len()), filter))
})
.unwrap_or_else(|| {
// If there is no filter expression, then we immediately perform a mask + project.
State::Project((mask.clone(), scan.projection().clone()))
});
let state = if let Some(conjunct) = scan.filter.first() {
// If we have a filter expression, then for now we evaluate it against all rows
// of the range.
// TODO(ngates): we should decide based on mask.true_count() whether to include the
// current mask or not. But note that the resulting expression would need to be
// aligned and intersected with the given mask.
State::FilterEval((
FilterMask::new_true(mask.len()),
conjunct.clone(),
scan.filter.iter().skip(1).cloned().collect(),
))
} else {
// If there is no filter expression, then we immediately perform a mask + project.
State::Project((mask.clone(), scan.projection().clone()))
};

Self {
scan,
row_range: row_offset..row_offset + mask.len() as u64,
mask,
state,
remaining_filter_conjuncts,
}
}

Expand All @@ -91,8 +89,8 @@ impl RangeScanner {
// forces the eager evaluation of the iterators.
fn next(&self) -> NextOp {
match &self.state {
State::FilterEval((mask, expr)) => {
NextOp::Eval((self.row_range.clone(), mask.clone(), expr.clone()))
State::FilterEval((mask, conjunct, _)) => {
NextOp::Eval((self.row_range.clone(), mask.clone(), conjunct.clone()))
}
State::Project((mask, expr)) => {
NextOp::Eval((self.row_range.clone(), mask.clone(), expr.clone()))
Expand All @@ -104,7 +102,7 @@ impl RangeScanner {
/// Post the result of the last expression evaluation back to the range scan.
fn post(&mut self, result: ArrayData) -> VortexResult<()> {
match &self.state {
State::FilterEval(_) => {
State::FilterEval((_, _, rest)) => {
// Intersect the result of the filter expression with our initial row mask.
let mask = FilterMask::from_buffer(result.into_bool()?.boolean_buffer());
let mask = self.mask.bitand(&mask);
Expand All @@ -113,15 +111,21 @@ impl RangeScanner {
// If the mask is empty, then we're done.
self.state =
State::Ready(Canonical::empty(self.scan.result_dtype())?.into_array());
} else if self.remaining_filter_conjuncts.is_empty() {
self.state = State::Project((mask, self.scan.projection().clone()))
} else {
} else if let Some(conj) = rest.first() {
self.mask = mask;
let mask = if self.mask.selectivity() < 0.4 {
self.mask.clone()
} else {
FilterMask::new_true(self.mask.len())
};
// If there are many conjuncts apply each one in turn, over the whole array and stop if any make the mask empty.
self.state = State::FilterEval((
FilterMask::new_true(self.mask.len()),
self.remaining_filter_conjuncts.remove(0),
mask,
conj.clone(),
rest.iter().skip(1).cloned().collect(),
))
} else {
self.state = State::Project((mask, self.scan.projection().clone()))
}
}
State::Project(_) => {
Expand Down

0 comments on commit d6d7614

Please sign in to comment.