diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index be490ccd7e..18fba05a8f 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -2,7 +2,7 @@ use std::fmt::{Debug, Display}; use num_traits::AsPrimitive; use serde::{Deserialize, Serialize}; -pub use stats::compute_stats; +pub use stats::compute_varbin_statistics; use vortex_buffer::Buffer; use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability, PType}; use vortex_error::{ diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 61c82974be..8608686419 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -1,129 +1,148 @@ use std::cmp::Ordering; +use itertools::{Itertools, MinMaxResult}; use vortex_buffer::Buffer; -use vortex_dtype::DType; -use vortex_error::VortexResult; +use vortex_error::{vortex_panic, VortexResult}; +use super::varbin_scalar; use crate::accessor::ArrayAccessor; -use crate::array::varbin::{varbin_scalar, VarBinArray}; +use crate::array::varbin::VarBinArray; use crate::array::VarBinEncoding; -use crate::nbytes::ArrayNBytes; +use crate::compute::unary::scalar_at; use crate::stats::{Stat, StatisticsVTable, StatsSet}; -use crate::{ArrayDType, ArrayLen}; +use crate::ArrayTrait; impl StatisticsVTable for VarBinEncoding { fn compute_statistics(&self, array: &VarBinArray, stat: Stat) -> VortexResult { - if stat == Stat::UncompressedSizeInBytes { - return Ok(StatsSet::of(stat, array.nbytes())); - } - - if array.is_empty() { - return Ok(StatsSet::default()); - } - array.with_iterator(|iter| compute_stats(iter, array.dtype())) + compute_varbin_statistics(array, stat) } } -pub fn compute_stats(iter: &mut dyn Iterator>, dtype: &DType) -> StatsSet { - let mut leading_nulls: usize = 0; - let mut first_value: Option<&[u8]> = None; - for v in &mut *iter { - if v.is_none() { - leading_nulls += 1; - } else { - first_value = v; - break; - } +pub fn compute_varbin_statistics>( + array: &T, + stat: Stat, +) -> VortexResult { + if stat == Stat::UncompressedSizeInBytes { + return Ok(StatsSet::of(stat, array.nbytes())); } - if let Some(first_non_null) = first_value { - let mut acc = VarBinAccumulator::new(first_non_null); - acc.n_nulls(leading_nulls); - iter.for_each(|n| acc.nullable_next(n)); - acc.finish(dtype) - } else { - StatsSet::nulls(leading_nulls, dtype) + if array.is_empty() + || stat == Stat::TrueCount + || stat == Stat::RunCount + || stat == Stat::BitWidthFreq + || stat == Stat::TrailingZeroFreq + { + return Ok(StatsSet::default()); } -} -pub struct VarBinAccumulator<'a> { - min: &'a [u8], - max: &'a [u8], - is_sorted: bool, - is_strict_sorted: bool, - last_value: &'a [u8], - null_count: usize, - runs: usize, - len: usize, -} + Ok(match stat { + Stat::NullCount => { + let null_count = array.logical_validity().null_count(array.len())?; + if null_count == array.len() { + return Ok(StatsSet::nulls(array.len(), array.dtype())); + } -impl<'a> VarBinAccumulator<'a> { - pub fn new(value: &'a [u8]) -> Self { - Self { - min: value, - max: value, - is_sorted: true, - is_strict_sorted: true, - last_value: value, - runs: 1, - null_count: 0, - len: 1, + let mut stats = StatsSet::of(Stat::NullCount, null_count); + if null_count > 0 { + // we know that there is at least one null, but not all nulls, so it's not constant + stats.set(Stat::IsConstant, false); + } + stats } - } - - pub fn nullable_next(&mut self, val: Option<&'a [u8]>) { - match val { - None => { - self.null_count += 1; - self.len += 1; + Stat::IsConstant => { + let is_constant = array.with_iterator(compute_is_constant)?; + if is_constant { + // we know that the array is not empty + StatsSet::constant(scalar_at(array, 0)?, array.len()) + } else { + StatsSet::of(Stat::IsConstant, is_constant) } - Some(v) => self.next(v), } - } + Stat::Min | Stat::Max => compute_min_max(array)?, + Stat::IsSorted => { + let is_sorted = array.with_iterator(|iter| iter.flatten().is_sorted())?; + let mut stats = StatsSet::of(Stat::IsSorted, is_sorted); + if !is_sorted { + stats.set(Stat::IsStrictSorted, false); + } + stats + } + Stat::IsStrictSorted => { + let is_strict_sorted = array.with_iterator(|iter| { + iter.flatten() + .is_sorted_by(|a, b| matches!(a.cmp(b), Ordering::Less)) + })?; + let mut stats = StatsSet::of(Stat::IsStrictSorted, is_strict_sorted); + if is_strict_sorted { + stats.set(Stat::IsSorted, true); + } + stats + } + Stat::UncompressedSizeInBytes + | Stat::TrueCount + | Stat::RunCount + | Stat::BitWidthFreq + | Stat::TrailingZeroFreq => { + vortex_panic!( + "Unreachable, stat {} should have already been handled", + stat + ) + } + }) +} - pub fn n_nulls(&mut self, null_count: usize) { - self.len += null_count; - self.null_count += null_count; +fn compute_is_constant(iter: &mut dyn Iterator>) -> bool { + let Some(first_value) = iter.next() else { + return true; // empty array is constant + }; + for v in iter { + if v != first_value { + return false; + } } + true +} - pub fn next(&mut self, val: &'a [u8]) { - self.len += 1; +fn compute_min_max>(array: &T) -> VortexResult { + let mut stats = StatsSet::default(); + if array.is_empty() { + return Ok(stats); + } - if val < self.min { - self.min.clone_from(&val); - } else if val > self.max { - self.max.clone_from(&val); + let minmax = array.with_iterator(|iter| match iter.flatten().minmax() { + MinMaxResult::NoElements => None, + MinMaxResult::OneElement(value) => { + let scalar = varbin_scalar(Buffer::from(value), array.dtype()); + Some((scalar.clone(), scalar)) } - - match val.cmp(self.last_value) { - Ordering::Less => { - self.is_sorted = false; - self.is_strict_sorted = false; - } - Ordering::Equal => { - self.is_strict_sorted = false; - return; - } - Ordering::Greater => {} + MinMaxResult::MinMax(min, max) => Some(( + varbin_scalar(Buffer::from(min), array.dtype()), + varbin_scalar(Buffer::from(max), array.dtype()), + )), + })?; + let Some((min, max)) = minmax else { + // we know that the array is not empty, so it must be all nulls + return Ok(StatsSet::nulls(array.len(), array.dtype())); + }; + + if min == max { + // get (don't compute) null count if `min == max` to determine if it's constant + if array + .statistics() + .get_as::(Stat::NullCount) + .map_or(false, |null_count| null_count == 0) + { + // if there are no nulls, then the array is constant + return Ok(StatsSet::constant(min, array.len())); } - self.last_value = val; - self.runs += 1; + } else { + stats.set(Stat::IsConstant, false); } - pub fn finish(&self, dtype: &DType) -> StatsSet { - let is_constant = - (self.min == self.max && self.null_count == 0) || self.null_count == self.len; - - StatsSet::from_iter([ - (Stat::Min, varbin_scalar(Buffer::from(self.min), dtype)), - (Stat::Max, varbin_scalar(Buffer::from(self.max), dtype)), - (Stat::RunCount, self.runs.into()), - (Stat::IsSorted, self.is_sorted.into()), - (Stat::IsStrictSorted, self.is_strict_sorted.into()), - (Stat::IsConstant, is_constant.into()), - (Stat::NullCount, self.null_count.into()), - ]) - } + stats.set(Stat::Min, min); + stats.set(Stat::Max, max); + + Ok(stats) } #[cfg(test)] @@ -154,7 +173,6 @@ mod test { arr.statistics().compute_max::().unwrap(), BufferString::from("hello world this is a long string".to_string()) ); - assert_eq!(arr.statistics().compute_run_count().unwrap(), 2); assert!(!arr.statistics().compute_is_constant().unwrap()); assert!(arr.statistics().compute_is_sorted().unwrap()); } @@ -170,7 +188,6 @@ mod test { arr.statistics().compute_max::().unwrap().deref(), "hello world this is a long string".as_bytes() ); - assert_eq!(arr.statistics().compute_run_count().unwrap(), 2); assert!(!arr.statistics().compute_is_constant().unwrap()); assert!(arr.statistics().compute_is_sorted().unwrap()); } diff --git a/vortex-array/src/array/varbinview/stats.rs b/vortex-array/src/array/varbinview/stats.rs index 50af697bab..ad66b3f709 100644 --- a/vortex-array/src/array/varbinview/stats.rs +++ b/vortex-array/src/array/varbinview/stats.rs @@ -1,23 +1,12 @@ use vortex_error::VortexResult; -use crate::accessor::ArrayAccessor; -use crate::array::varbin::compute_stats; +use crate::array::varbin::compute_varbin_statistics; use crate::array::varbinview::VarBinViewArray; use crate::array::VarBinViewEncoding; -use crate::nbytes::ArrayNBytes; use crate::stats::{Stat, StatisticsVTable, StatsSet}; -use crate::{ArrayDType, ArrayLen}; impl StatisticsVTable for VarBinViewEncoding { fn compute_statistics(&self, array: &VarBinViewArray, stat: Stat) -> VortexResult { - if stat == Stat::UncompressedSizeInBytes { - return Ok(StatsSet::of(stat, array.nbytes())); - } - - if array.is_empty() { - return Ok(StatsSet::default()); - } - - array.with_iterator(|iter| compute_stats(iter, array.dtype())) + compute_varbin_statistics(array, stat) } } diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index 8e25adf2ba..bf1d73314e 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -31,9 +31,9 @@ pub enum Stat { /// Whether all values are the same (nulls are not equal to other non-null values, /// so this is true iff all values are null or all values are the same non-null value) IsConstant, - /// Whether the array is sorted + /// Whether the non-null values in the array are sorted (i.e., we skip nulls) IsSorted, - /// Whether the array is strictly sorted (i.e., sorted with no duplicates) + /// Whether the non-null values in the array are strictly sorted (i.e., sorted with no duplicates) IsStrictSorted, /// The maximum value in the array (ignoring nulls, unless all values are null) Max, diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index 0f7faaab9f..df883bdc43 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -339,8 +339,7 @@ impl FromIterator<(Stat, Scalar)> for StatsSet { impl Extend<(Stat, Scalar)> for StatsSet { #[inline] fn extend>(&mut self, iter: T) { - let stats = iter.into_iter().collect_vec(); - stats.into_iter().for_each(|(stat, scalar)| { + iter.into_iter().for_each(|(stat, scalar)| { self.set(stat, scalar); }); } diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 07ea97dfcc..271ea1fae5 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -473,6 +473,27 @@ impl LogicalValidity { Self::Array(a) => Validity::Array(a), } } + + pub fn null_count(&self, length: usize) -> VortexResult { + match self { + Self::AllValid(_) => Ok(0), + Self::AllInvalid(_) => Ok(length), + Self::Array(a) => { + let validity_len = a.len(); + if validity_len != length { + vortex_bail!( + "Validity array length {} doesn't match array length {}", + validity_len, + length + ) + } + let true_count = a.statistics().compute_true_count().ok_or_else(|| { + vortex_err!("Failed to compute true count from validity array") + })?; + Ok(length - true_count) + } + } + } } impl TryFrom for LogicalValidity { diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index 0e63bb244a..a9b711dffa 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -102,7 +102,7 @@ impl EncodingCompressor for FSSTCompressor { let codes = fsst_array.codes(); let compressed_codes = ctx - .named("fsst_codes") + .auxiliary("fsst_codes") .excluding(self) .including_only(&[ &VarBinCompressor,