Skip to content

Commit

Permalink
enhance: Remove redundant statistics from FileScanConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Tang <[email protected]>

chore: fix some fmt errors

Signed-off-by: Alan Tang <[email protected]>
  • Loading branch information
Standing-Man committed Mar 1, 2025
1 parent 52d750f commit f3eb12a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 30 deletions.
5 changes: 4 additions & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
)
.with_file(PartitionedFile::new("x".to_string(), 10000))
.with_statistics(statistics);
assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
assert_eq!(
config.file_source.statistics().unwrap().num_rows,
Precision::Inexact(10)
);

config.build()
}
Expand Down
24 changes: 11 additions & 13 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ pub struct FileScanConfig {
pub file_groups: Vec<Vec<PartitionedFile>>,
/// Table constraints
pub constraints: Constraints,
/// Estimated overall statistics of the files, taking `filters` into account.
/// Defaults to [`Statistics::new_unknown`].
pub statistics: Statistics,
/// Columns on which to project the data. Indexes that are higher than the
/// number of columns of `file_schema` refer to `table_partition_cols`.
pub projection: Option<Vec<usize>>,
Expand Down Expand Up @@ -302,13 +299,13 @@ impl FileScanConfig {
file_source: Arc<dyn FileSource>,
) -> Self {
let statistics = Statistics::new_unknown(&file_schema);
file_source.with_statistics(statistics);

let mut config = Self {
object_store_url,
file_schema,
file_groups: vec![],
constraints: Constraints::empty(),
statistics,
projection: None,
limit: None,
table_partition_cols: vec![],
Expand All @@ -324,7 +321,8 @@ impl FileScanConfig {

/// Set the file source
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
self.file_source = file_source.with_statistics(self.statistics.clone());
let statistics = Statistics::new_unknown(&self.file_schema);
self.file_source = file_source.with_statistics(statistics);
self
}

Expand All @@ -336,8 +334,7 @@ impl FileScanConfig {

/// Set the statistics of the files
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = statistics.clone();
self.file_source = self.file_source.with_statistics(statistics);
self.file_source = self.file_source.with_statistics(statistics.clone());
self
}

Expand All @@ -351,10 +348,7 @@ impl FileScanConfig {
}

fn projected_stats(&self) -> Statistics {
let statistics = self
.file_source
.statistics()
.unwrap_or(self.statistics.clone());
let statistics = self.file_source.statistics().unwrap();

let table_cols_stats = self
.projection_indices()
Expand Down Expand Up @@ -487,7 +481,7 @@ impl FileScanConfig {
return (
Arc::clone(&self.file_schema),
self.constraints.clone(),
self.statistics.clone(),
self.file_source.statistics().unwrap().clone(),
self.output_ordering.clone(),
);
}
Expand Down Expand Up @@ -630,7 +624,11 @@ impl Debug for FileScanConfig {
write!(f, "FileScanConfig {{")?;
write!(f, "object_store_url={:?}, ", self.object_store_url)?;

write!(f, "statistics={:?}, ", self.statistics)?;
write!(
f,
"statistics={:?}, ",
self.file_source.statistics().unwrap()
)?;

DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
write!(f, "}}")
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ pub fn serialize_file_scan_config(

Ok(protobuf::FileScanExecConf {
file_groups,
statistics: Some((&conf.statistics).into()),
statistics: Some((&conf.file_source.statistics().unwrap()).into()),
limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
projection: conf
.projection
Expand Down
30 changes: 15 additions & 15 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,14 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
ParquetSource::new(options).with_predicate(Arc::clone(&file_schema), predicate),
);

source.with_statistics(Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1024),
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
Field::new("col", DataType::Utf8, false),
]))),
});

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
Expand All @@ -750,13 +758,6 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
1024,
)]],
constraints: Constraints::empty(),
statistics: Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1024),
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
Field::new("col", DataType::Utf8, false),
]))),
},
projection: None,
limit: None,
table_partition_cols: vec![],
Expand Down Expand Up @@ -805,6 +806,13 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
ParquetSource::default()
.with_predicate(Arc::clone(&file_schema), custom_predicate_expr),
);
source.with_statistics(Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1024),
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
Field::new("col", DataType::Utf8, false),
]))),
});

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
Expand All @@ -814,13 +822,6 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
1024,
)]],
constraints: Constraints::empty(),
statistics: Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1024),
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
Field::new("col", DataType::Utf8, false),
]))),
},
projection: None,
limit: None,
table_partition_cols: vec![],
Expand Down Expand Up @@ -1616,7 +1617,6 @@ async fn roundtrip_projection_source() -> Result<()> {
1024,
)]],
constraints: Constraints::empty(),
statistics,
file_schema: schema.clone(),
projection: Some(vec![0, 1, 2]),
limit: None,
Expand Down

0 comments on commit f3eb12a

Please sign in to comment.