Skip to content

Commit

Permalink
fix and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
waruto210 committed Sep 18, 2024
1 parent d4cf452 commit a7606b4
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ impl ExecutionPlan for ParquetExec {
|| self.page_pruning_predicate.is_some()
|| (self.predicate.is_some() && self.pushdown_filters())
{
Statistics::new_unknown(&self.schema())
self.projected_statistics.clone().to_inexact()
} else {
self.projected_statistics.clone()
};
Expand Down
26 changes: 26 additions & 0 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
use datafusion_common::stats::Precision;
use datafusion_common::{Column, ScalarValue};
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit;
use datafusion_execution::cache::cache_unit::{
Expand All @@ -36,8 +37,33 @@ use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

use datafusion::execution::session_state::SessionStateBuilder;
use datafusion_expr::{BinaryExpr, Expr};
use tempfile::tempdir;

#[tokio::test]
async fn check_stats_precision_with_filter_pushdown() {
let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let table = get_listing_table(&table_path, None, &opt).await;
let (_, _, state) = get_cache_runtime_state();
// Scan without filter, stats are exact
let exec = table.scan(&state, None, &[], None).await.unwrap();
assert_eq!(exec.statistics().unwrap().num_rows, Precision::Exact(8));

// Scan with filter pushdown, stats are inexact
let filter = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("id"))),
op: datafusion_expr::Operator::And,
right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(1)))),
});

let exec = table.scan(&state, None, &[filter], None).await.unwrap();
assert_eq!(exec.statistics().unwrap().num_rows, Precision::Inexact(8));
}

#[tokio::test]
async fn load_table_stats_with_session_level_cache() {
let testdata = datafusion::test_util::parquet_test_data();
Expand Down
15 changes: 0 additions & 15 deletions datafusion/core/tests/sql/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,21 +493,6 @@ async fn parquet_statistics() -> Result<()> {
// TODO assert partition column stats once implemented (#1186)
assert_eq!(stat_cols[1], ColumnStatistics::new_unknown());

//// WITH Filter PushDown to [`ParquetExec`] ////
let dataframe = ctx
.sql("SELECT mycol, year FROM t WHERE mycol='value'")
.await?;
let physical_plan = dataframe.create_physical_plan().await?;
let schema = physical_plan.schema();
assert_eq!(schema.fields().len(), 2);

let stat_cols = physical_plan.statistics()?.column_statistics;
assert_eq!(stat_cols.len(), 2);
// stats for the first col are absent due to filter pushdown
assert_eq!(stat_cols[0].null_count, Precision::Absent);
// TODO assert partition column stats once implemented (#1186)
assert_eq!(stat_cols[1], ColumnStatistics::new_unknown());

Ok(())
}

Expand Down

0 comments on commit a7606b4

Please sign in to comment.