Skip to content

Commit

Permalink
Layout Stats
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Jan 15, 2025
1 parent 08ed8aa commit 27f8b13
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 29 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions vortex-array/src/stats/statsset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ impl StatsSet {
Self { values }
}

/// Create a new, empty StatsSet.
///
/// If you are planning to add stats to the set, consider using [StatsSet::default] instead.
pub fn empty() -> Self {
Self { values: vec![] }
}

/// Specialized constructor for the case where the StatsSet represents
/// an array consisting entirely of [null](vortex_dtype::DType::Null) values.
pub fn nulls(len: usize, dtype: &DType) -> Self {
Expand Down
1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ name = "vortex_datafusion"
path = "src/lib.rs"

[dependencies]
always_send = "0.1.1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
Expand Down
101 changes: 93 additions & 8 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ use datafusion::datasource::file_format::{FileFormat, FilePushdownSupport};
use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use datafusion::execution::SessionState;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics};
use datafusion_common::stats::Precision;
use datafusion_common::{
not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, ScalarValue, Statistics,
};
use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;
use futures::{stream, StreamExt as _, TryStreamExt as _};
use futures::{stream, FutureExt, StreamExt as _, TryStreamExt as _};
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::arrow::infer_schema;
use vortex_array::stats::Stat;
use vortex_array::ContextRef;
use vortex_dtype::FieldPath;
use vortex_error::VortexResult;
use vortex_file::v2::VortexOpenOptions;
use vortex_file::VORTEX_FILE_EXTENSION;
use vortex_io::ObjectStoreReadAt;

use super::cache::FileLayoutCache;
use super::execution::VortexExec;
Expand Down Expand Up @@ -119,14 +126,92 @@ impl FileFormat for VortexFormat {
async fn infer_stats(
&self,
_state: &SessionState,
_store: &Arc<dyn ObjectStore>,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
object: &ObjectMeta,
) -> DFResult<Statistics> {
// TODO(ngates): we should decide if it's worth returning file statistics. Since this
// call doesn't have projection information, I think it's better to wait until we can
// return per-partition statistics from VortexExpr ExecutionPlan node.
Ok(Statistics::new_unknown(table_schema.as_ref()))
let store = store.clone();
let read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());

async move {
let field_paths = table_schema
.fields()
.iter()
.map(|f| FieldPath::from_name(f.name().to_owned()))
.collect();

let file_layout = self
.file_layout_cache
.try_get(object, store.clone())
.await?;

let stats = {
let vxf = VortexOpenOptions::new(self.context.clone())
.with_file_layout(file_layout)
.open(read_at)
.await?;

vxf.statistics(
field_paths,
[
Stat::Min,
Stat::Max,
Stat::NullCount,
Stat::UncompressedSizeInBytes,
]
.into(),
)?
.await?
};

let total_byte_size = Precision::Inexact(
stats
.iter()
.map(|s| {
s.get_as::<usize>(Stat::UncompressedSizeInBytes)
.unwrap_or_default()
})
.sum(),
);

let column_statistics = stats
.into_iter()
.map(|s| {
let null_count = s.get_as::<usize>(Stat::NullCount);
let min = s
.get(Stat::Min)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
let max = s
.get(Stat::Max)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
ColumnStatistics {
null_count: null_count
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
max_value: max.map(Precision::Exact).unwrap_or(Precision::Absent),
min_value: min.map(Precision::Exact).unwrap_or(Precision::Absent),
distinct_count: Precision::Absent,
}
})
.collect::<Vec<_>>();

println!("column_stats: {:?}", column_statistics);

Ok(Statistics {
// num_rows: Precision::Exact(
// usize::try_from(row_count)
// .map_err(|_| vortex_err!("Row count overflow"))
// .vortex_expect("Row count overflow"),
// ),
num_rows: Precision::Absent,
total_byte_size,
column_statistics,
})
}
.boxed()
.await
}

async fn create_physical_plan(
Expand Down
11 changes: 11 additions & 0 deletions vortex-dtype/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl Field {

/// A path through a (possibly nested) struct, composed of a sequence of field selectors
// TODO(ngates): wrap `Vec<Field>` in Option for cheaper "root" path.
// TODO(ngates): we should probably reverse the path. Or better yet, store a Arc<[Field]> along
// with a positional index to allow cheap step_into.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FieldPath(Vec<Field>);
Expand Down Expand Up @@ -128,6 +130,15 @@ impl FieldPath {
pub fn push<F: Into<Field>>(&mut self, field: F) {
self.0.push(field.into());
}

/// Steps into the next field in the path
pub fn step_into(mut self) -> VortexResult<Self> {
if self.0.is_empty() {
return Err(vortex_err!("Cannot step into root path"));
}
self.0 = self.0.iter().skip(1).cloned().collect();
Ok(self)
}
}

impl FromIterator<Field> for FieldPath {
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/v2/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use vortex_error::VortexResult;
///
/// Note that the futures encapsulate heavy CPU code such as filtering and decompression. To
/// offload keep I/O work separate, please see the [`crate::v2::io::IoDriver`] trait.
pub trait ExecDriver {
pub trait ExecDriver: Send {
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
Expand Down
84 changes: 77 additions & 7 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::Stream;
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use pin_project_lite::pin_project;
use vortex_array::stats::{Stat, StatsSet};
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::{ArrayData, ContextRef};
use vortex_dtype::DType;
use vortex_array::ContextRef;
use vortex_dtype::{DType, FieldPath};
use vortex_error::{vortex_err, VortexResult};
use vortex_layout::{ExprEvaluator, LayoutReader};
use vortex_layout::{ExprEvaluator, LayoutReader, StatsEvaluator};
use vortex_scan::Scan;

use crate::v2::exec::ExecDriver;
Expand Down Expand Up @@ -91,6 +93,31 @@ impl<I: IoDriver> VortexFile<I> {
},
))
}

/// Resolves the requested statistics for the file.
pub fn statistics(
&self,
field_paths: Arc<[FieldPath]>,
stats: Arc<[Stat]>,
) -> VortexResult<impl Future<Output = VortexResult<Vec<StatsSet>>> + 'static + use<'_, I>>
{
// Set up a segment channel to collect segment requests from the execution stream.
let segment_channel = SegmentChannel::new();

// Create a single LayoutReader that is reused for the entire scan.
let reader: Arc<dyn LayoutReader> = self
.file_layout
.root_layout
.reader(segment_channel.reader(), self.ctx.clone())?;

let exec_future = async move { reader.evaluate_stats(field_paths, stats).await }.boxed();
let io_stream = self.io_driver.drive(segment_channel.into_stream());

Ok(UnifiedDriverFuture {
exec_future,
io_stream,
})
}
}

/// There is no `IntoIterator` for `Arc<[T]>` so to avoid copying into a Vec<T>, we define our own.
Expand Down Expand Up @@ -137,12 +164,12 @@ pin_project! {
}
}

impl<R, S> Stream for UnifiedDriverStream<R, S>
impl<T, R, S> Stream for UnifiedDriverStream<R, S>
where
R: Stream<Item = VortexResult<ArrayData>>,
R: Stream<Item = VortexResult<T>>,
S: Stream<Item = VortexResult<()>>,
{
type Item = VortexResult<ArrayData>;
type Item = VortexResult<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
Expand Down Expand Up @@ -170,3 +197,46 @@ where
}
}
}

pin_project! {
struct UnifiedDriverFuture<R, S> {
#[pin]
exec_future: R,
#[pin]
io_stream: S,
}
}

impl<T, R, S> Future for UnifiedDriverFuture<R, S>
where
R: Future<Output = VortexResult<T>>,
S: Stream<Item = VortexResult<()>>,
{
type Output = VortexResult<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
// If the exec stream is ready, then we can return the result.
// If it's pending, then we try polling the I/O stream.
if let Poll::Ready(r) = this.exec_future.try_poll_unpin(cx) {
return Poll::Ready(r);
}

match this.io_stream.as_mut().try_poll_next_unpin(cx) {
// If the I/O stream made progress, it returns Ok.
Poll::Ready(Some(Ok(()))) => {}
// If the I/O stream failed, then propagate the error.
Poll::Ready(Some(Err(result))) => {
return Poll::Ready(Err(result));
}
// Unexpected end of stream.
Poll::Ready(None) => {
return Poll::Ready(Err(vortex_err!("unexpected end of I/O stream")));
}
// If the I/O stream is not ready, then we return Pending and wait for the next wakeup.
Poll::Pending => return Poll::Pending,
}
}
}
}
10 changes: 8 additions & 2 deletions vortex-layout/src/layouts/chunked/eval_stats.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use vortex_array::stats::{Stat, StatsSet};
use vortex_dtype::FieldPath;
Expand All @@ -10,9 +12,13 @@ use crate::StatsEvaluator;
impl StatsEvaluator for ChunkedReader {
async fn evaluate_stats(
&self,
field_paths: &[FieldPath],
_stats: &[Stat],
field_paths: Arc<[FieldPath]>,
stats: Arc<[Stat]>,
) -> VortexResult<Vec<StatsSet>> {
println!(
"ChunkedReader::evaluate_stats {:?} {:?}",
field_paths, stats
);
Ok(vec![StatsSet::default(); field_paths.len()])
}
}
6 changes: 4 additions & 2 deletions vortex-layout/src/layouts/flat/eval_stats.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use vortex_array::stats::{Stat, StatsSet};
use vortex_dtype::FieldPath;
Expand All @@ -10,8 +12,8 @@ use crate::StatsEvaluator;
impl StatsEvaluator for FlatReader {
async fn evaluate_stats(
&self,
field_paths: &[FieldPath],
_stats: &[Stat],
field_paths: Arc<[FieldPath]>,
_stats: Arc<[Stat]>,
) -> VortexResult<Vec<StatsSet>> {
Ok(vec![StatsSet::default(); field_paths.len()])
}
Expand Down
Loading

0 comments on commit 27f8b13

Please sign in to comment.