Skip to content

Commit

Permalink
non-send owned
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Jan 15, 2025
1 parent 253ccf6 commit 2809907
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 88 deletions.
148 changes: 70 additions & 78 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;
use futures::{stream, FutureExt, StreamExt as _, TryStreamExt as _};
use futures::{stream, StreamExt as _, TryStreamExt as _};
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::arrow::infer_schema;
use vortex_array::stats::Stat;
use vortex_array::ContextRef;
use vortex_dtype::FieldPath;
use vortex_error::VortexResult;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_file::v2::VortexOpenOptions;
use vortex_file::VORTEX_FILE_EXTENSION;
use vortex_io::ObjectStoreReadAt;
Expand Down Expand Up @@ -130,88 +130,80 @@ impl FileFormat for VortexFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> DFResult<Statistics> {
let store = store.clone();
let read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
let vxf = VortexOpenOptions::new(self.context.clone())
.with_file_layout(
self.file_layout_cache
.try_get(object, store.clone())
.await?,
)
.open(read_at)
.await?;

async move {
let field_paths = table_schema
.fields()
.iter()
.map(|f| FieldPath::from_name(f.name().to_owned()))
.collect();

let file_layout = self
.file_layout_cache
.try_get(object, store.clone())
.await?;

let stats = {
let vxf = VortexOpenOptions::new(self.context.clone())
.with_file_layout(file_layout)
.open(read_at)
.await?;

vxf.statistics(
field_paths,
[
Stat::Min,
Stat::Max,
Stat::NullCount,
Stat::UncompressedSizeInBytes,
]
.into(),
)?
.await?
};

let total_byte_size = Precision::Inexact(
stats
.iter()
.map(|s| {
s.get_as::<usize>(Stat::UncompressedSizeInBytes)
.unwrap_or_default()
})
.sum(),
);
// Evaluate the statistics for each column that we are able to return to DataFusion.
let field_paths = table_schema
.fields()
.iter()
.map(|f| FieldPath::from_name(f.name().to_owned()))
.collect();
let stats = vxf
.statistics(
field_paths,
[
Stat::Min,
Stat::Max,
Stat::NullCount,
Stat::UncompressedSizeInBytes,
]
.into(),
)?
.await?;

let column_statistics = stats
.into_iter()
// Sum up the total byte size across all the columns.
let total_byte_size = Precision::Inexact(
stats
.iter()
.map(|s| {
let null_count = s.get_as::<usize>(Stat::NullCount);
let min = s
.get(Stat::Min)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
let max = s
.get(Stat::Max)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
ColumnStatistics {
null_count: null_count
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
max_value: max.map(Precision::Exact).unwrap_or(Precision::Absent),
min_value: min.map(Precision::Exact).unwrap_or(Precision::Absent),
distinct_count: Precision::Absent,
}
s.get_as::<usize>(Stat::UncompressedSizeInBytes)
.unwrap_or_default()
})
.collect::<Vec<_>>();

println!("column_stats: {:?}", column_statistics);

Ok(Statistics {
// num_rows: Precision::Exact(
// usize::try_from(row_count)
// .map_err(|_| vortex_err!("Row count overflow"))
// .vortex_expect("Row count overflow"),
// ),
num_rows: Precision::Absent,
total_byte_size,
column_statistics,
.sum(),
);

let column_statistics = stats
.into_iter()
.map(|s| {
let null_count = s.get_as::<usize>(Stat::NullCount);
let min = s
.get(Stat::Min)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
let max = s
.get(Stat::Max)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
ColumnStatistics {
null_count: null_count
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
max_value: max.map(Precision::Exact).unwrap_or(Precision::Absent),
min_value: min.map(Precision::Exact).unwrap_or(Precision::Absent),
distinct_count: Precision::Absent,
}
})
}
.boxed()
.await
.collect::<Vec<_>>();

println!("column_stats: {:?}", column_statistics);

Ok(Statistics {
num_rows: Precision::Exact(
usize::try_from(vxf.row_count())
.map_err(|_| vortex_err!("Row count overflow"))
.vortex_expect("Row count overflow"),
),
total_byte_size,
column_statistics,
})
}

async fn create_physical_plan(
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/v2/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use vortex_error::VortexResult;
///
/// Note that the futures encapsulate heavy CPU code such as filtering and decompression. To
/// keep I/O work separate, please see the [`crate::v2::io::IoDriver`] trait.
pub trait ExecDriver: Send {
pub trait ExecDriver: Send + Sync {
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
Expand Down
27 changes: 22 additions & 5 deletions vortex-layout/src/layouts/chunked/eval_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@ impl StatsEvaluator for ChunkedReader {
field_paths: Arc<[FieldPath]>,
stats: Arc<[Stat]>,
) -> VortexResult<Vec<StatsSet>> {
println!(
"ChunkedReader::evaluate_stats {:?} {:?}",
field_paths, stats
);
Ok(vec![StatsSet::default(); field_paths.len()])
if field_paths.is_empty() {
return Ok(vec![]);
}

// Otherwise, fetch the stats table
let Some(stats_table) = self.stats_table().await? else {
return Ok(vec![StatsSet::empty(); field_paths.len()]);
};

let mut stat_sets = Vec::with_capacity(field_paths.len());
for field_path in field_paths.iter() {
if !field_path.is_root() {
// TODO(ngates): the stats table only stores a single array, so we can only answer
// stats if the field path == root.
// See <https://github.com/spiraldb/vortex/issues/1835> for more details.
stat_sets.push(StatsSet::empty());
continue;
}
stat_sets.push(stats_table.to_stats_set(&stats)?);
}

Ok(stat_sets)
}
}
60 changes: 56 additions & 4 deletions vortex-layout/src/layouts/chunked/stats_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::sync::Arc;
use itertools::Itertools;
use vortex_array::array::StructArray;
use vortex_array::builders::{builder_with_capacity, ArrayBuilder, ArrayBuilderExt};
use vortex_array::stats::{ArrayStatistics as _, Stat};
use vortex_array::compute::try_cast;
use vortex_array::stats::{ArrayStatistics as _, Stat, StatsSet};
use vortex_array::validity::{ArrayValidity, Validity};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_dtype::{DType, Nullability, StructDType};
use vortex_error::{vortex_bail, VortexResult};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::{vortex_bail, VortexExpect, VortexResult};

/// A table of statistics for a column.
/// Each row of the stats table corresponds to a chunk of the column.
Expand Down Expand Up @@ -68,6 +69,57 @@ impl StatsTable {
pub fn present_stats(&self) -> &[Stat] {
    // Hand out a borrowed slice view over the stats held by this table.
    &self.stats[..]
}

/// Return an aggregated stats set for the table.
pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
let mut stats_set = StatsSet::default();
for stat in stats {
let Some(array) = self.get_stat(*stat)? else {
continue;
};

// Different stats need different aggregations
match stat {
// For stats that are associative, we can just compute them over the stat column
Stat::Min | Stat::Max => {
if let Some(s) = array.statistics().compute(*stat) {
stats_set.set(*stat, s)
}
}
// These stats sum up
Stat::TrueCount | Stat::NullCount | Stat::UncompressedSizeInBytes => {
// TODO(ngates): use Stat::Sum when we add it.
let parray =
try_cast(array, &DType::Primitive(PType::U64, Nullability::Nullable))?
.into_primitive()?;
let sum: u64 = parray
.as_slice::<u64>()
.iter()
.enumerate()
.filter_map(|(i, v)| parray.validity().is_valid(i).then_some(*v))
.sum();
stats_set.set(*stat, sum);
}
// We could implement these aggregations in the future, but for now they're unused
Stat::BitWidthFreq
| Stat::TrailingZeroFreq
| Stat::RunCount
| Stat::IsConstant
| Stat::IsSorted
| Stat::IsStrictSorted => {}
}
}
Ok(stats_set)
}

/// Look up the column of the stats table that holds the given stat.
///
/// Yields `Ok(None)` when the table has no field named after `stat`.
///
/// # Panics
///
/// Panics if the underlying array is not a struct array.
pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayData>> {
    let table = self
        .array
        .as_struct_array()
        .vortex_expect("Stats table must be a struct array");
    let column = table.maybe_null_field_by_name(stat.name());
    Ok(column)
}
}

/// Accumulates statistics for a column.
Expand Down

0 comments on commit 2809907

Please sign in to comment.