From bea8e17252ed3e4bb47a9c96e505e72204ceae9d Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 18 Sep 2024 13:37:13 +0200 Subject: [PATCH] c --- crates/polars-io/src/parquet/read/reader.rs | 48 ++++---- crates/polars-io/src/parquet/read/utils.rs | 33 +++++- .../src/executors/scan/mod.rs | 28 ----- .../src/executors/scan/parquet.rs | 112 ++++++++---------- .../src/executors/sources/parquet.rs | 70 +++++------ py-polars/tests/unit/io/test_lazy_parquet.py | 59 +++++++++ 6 files changed, 201 insertions(+), 149 deletions(-) diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 6d4a362c7198..af3f95046ae8 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -15,6 +15,7 @@ pub use super::read_impl::BatchedParquetReader; use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader}; #[cfg(feature = "cloud")] use super::utils::materialize_empty_df; +use super::utils::projected_arrow_schema_to_projection_indices; #[cfg(feature = "cloud")] use crate::cloud::CloudOptions; use crate::mmap::MmapBytesReader; @@ -80,20 +81,23 @@ impl ParquetReader { self } - /// Ensure the schema of the file matches the given schema. Calling this - /// after setting the projection will ensure only the projected indices - /// are checked. - pub fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult { - let self_schema = self.schema()?; - let self_schema = self_schema.as_ref(); + /// Checks that the file contains all the columns in `projected_arrow_schema` with the same + /// dtype, and sets the projection indices. + pub fn with_projected_arrow_schema( + mut self, + first_schema: &ArrowSchema, + projected_arrow_schema: Option<&ArrowSchema>, + ) -> PolarsResult { + let schema = self.schema()?; - if let Some(projection) = self.projection.as_deref() { - ensure_matching_schema( - &schema.try_project_indices(projection)?, - &self_schema.try_project_indices(projection)?, + if let Some(projected_arrow_schema) = projected_arrow_schema { + self.projection = projected_arrow_schema_to_projection_indices( + schema.as_ref(), + projected_arrow_schema, )?; } else { - ensure_matching_schema(schema, self_schema)?; + self.projection = + projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?; } Ok(self) @@ -288,17 +292,21 @@ impl ParquetAsyncReader { }) } - pub async fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult { - let self_schema = self.schema().await?; - let self_schema = self_schema.as_ref(); - - if let Some(projection) = self.projection.as_deref() { - ensure_matching_schema( - &schema.try_project_indices(projection)?, - &self_schema.try_project_indices(projection)?, + pub async fn with_projected_arrow_schema( + mut self, + first_schema: &ArrowSchema, + projected_arrow_schema: Option<&ArrowSchema>, + ) -> PolarsResult { + let schema = self.schema().await?; + + if let Some(projected_arrow_schema) = projected_arrow_schema { + self.projection = projected_arrow_schema_to_projection_indices( + schema.as_ref(), + projected_arrow_schema, )?; } else { - ensure_matching_schema(schema, self_schema)?; + self.projection = + projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?; } Ok(self) diff --git a/crates/polars-io/src/parquet/read/utils.rs b/crates/polars-io/src/parquet/read/utils.rs index 34cc752dd782..129e49f2d075 100644 --- a/crates/polars-io/src/parquet/read/utils.rs +++ b/crates/polars-io/src/parquet/read/utils.rs @@ -1,6 +1,7 @@ 
use std::borrow::Cow; -use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE}; +use polars_core::prelude::{ArrowSchema, DataFrame, DataType, Series, IDX_DTYPE}; +use polars_error::{polars_bail, PolarsResult}; use crate::hive::materialize_hive_partitions; use crate::utils::apply_projection; @@ -28,3 +29,33 @@ pub fn materialize_empty_df( df } + +pub(super) fn projected_arrow_schema_to_projection_indices( + schema: &ArrowSchema, + projected_arrow_schema: &ArrowSchema, +) -> PolarsResult>> { + let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len()); + let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len(); + + for (i, field) in projected_arrow_schema.iter_values().enumerate() { + let dtype = { + let Some((idx, _, field)) = schema.get_full(&field.name) else { + polars_bail!(SchemaMismatch: "did not find column: {}", field.name) + }; + + projection_indices.push(idx); + is_full_ordered_projection &= idx == i; + + DataType::from_arrow(&field.dtype, true) + }; + let expected_dtype = DataType::from_arrow(&field.dtype, true); + + if dtype.clone() != expected_dtype { + polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}", + &field.name, dtype, expected_dtype + ) + } + } + + Ok((!is_full_ordered_projection).then_some(projection_indices)) +} diff --git a/crates/polars-mem-engine/src/executors/scan/mod.rs b/crates/polars-mem-engine/src/executors/scan/mod.rs index 59a98ea800d7..d4e623ed7df0 100644 --- a/crates/polars-mem-engine/src/executors/scan/mod.rs +++ b/crates/polars-mem-engine/src/executors/scan/mod.rs @@ -30,34 +30,6 @@ pub(crate) use self::python_scan::*; use super::*; use crate::prelude::*; -#[cfg(any(feature = "ipc", feature = "parquet"))] -type Projection = Option>; -#[cfg(any(feature = "ipc", feature = "parquet"))] -type Predicate = Option>; - -#[cfg(any(feature = "ipc", feature = "parquet"))] -fn prepare_scan_args( - predicate: Option>, - with_columns: &mut Option>, - schema: &mut SchemaRef, - has_row_index: bool, - hive_partitions: Option<&[Series]>, -) -> (Projection, Predicate) { - let with_columns = mem::take(with_columns); - let schema = mem::take(schema); - - let projection = materialize_projection( - with_columns.as_deref(), - &schema, - hive_partitions, - has_row_index, - ); - - let predicate = predicate.map(phys_expr_to_io_expr); - - (projection, predicate) -} - /// Producer of an in memory DataFrame pub struct DataFrameExec { pub(crate) df: Arc, diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 27160e2356e6..bac328d347aa 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -62,6 +62,17 @@ impl ParquetExec { // Modified if we have a negative slice let mut first_source = 0; + let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left(); + + let projected_arrow_schema = { + if let Some(with_columns) = self.file_options.with_columns.as_deref() { + Some(Arc::new(first_schema.try_project(with_columns)?)) + } else { + None + } + }; + let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + // (offset, end) let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice { if slice.0 >= 0 { @@ -150,14 +161,6 @@ impl ParquetExec { .as_ref() .map(|x| x[i].materialize_partition_columns()); - let (projection, predicate) = prepare_scan_args( - self.predicate.clone(), - &mut 
self.file_options.with_columns.clone(), - &mut self.file_info.schema.clone(), - base_row_index.is_some(), - hive_partitions.as_deref(), - ); - let memslice = source.to_memslice()?; let mut reader = ParquetReader::new(std::io::Cursor::new(memslice)); @@ -181,9 +184,7 @@ impl ParquetExec { .map(|x| (x.clone(), Arc::from(source.to_include_path_name()))), ); - reader - .num_rows() - .map(|num_rows| (reader, num_rows, predicate, projection)) + reader.num_rows().map(|num_rows| (reader, num_rows)) }); // We do this in parallel because wide tables can take a long time deserializing metadata. @@ -192,7 +193,7 @@ impl ParquetExec { let current_offset_ref = &mut current_offset; let row_statistics = readers_and_metadata .iter() - .map(|(_, num_rows, _, _)| { + .map(|(_, num_rows)| { let cum_rows = *current_offset_ref; ( cum_rows, @@ -205,31 +206,24 @@ impl ParquetExec { readers_and_metadata .into_par_iter() .zip(row_statistics.into_par_iter()) - .map( - |((reader, _, predicate, projection), (cumulative_read, slice))| { - let row_index = base_row_index.as_ref().map(|rc| RowIndex { - name: rc.name.clone(), - offset: rc.offset + cumulative_read as IdxSize, - }); - - let df = reader - .with_slice(Some(slice)) - .with_row_index(row_index) - .with_predicate(predicate.clone()) - .with_projection(projection.clone()) - .check_schema( - self.file_info - .reader_schema - .clone() - .unwrap() - .unwrap_left() - .as_ref(), - )? - .finish()?; - - Ok(df) - }, - ) + .map(|((reader, _), (cumulative_read, slice))| { + let row_index = base_row_index.as_ref().map(|rc| RowIndex { + name: rc.name.clone(), + offset: rc.offset + cumulative_read as IdxSize, + }); + + let df = reader + .with_slice(Some(slice)) + .with_row_index(row_index) + .with_predicate(predicate.clone()) + .with_projected_arrow_schema( + first_schema.as_ref(), + projected_arrow_schema.as_deref(), + )? + .finish()?; + + Ok(df) + }) .collect::>>() })?; @@ -261,6 +255,17 @@ impl ParquetExec { eprintln!("POLARS PREFETCH_SIZE: {}", batch_size) } + let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left(); + + let projected_arrow_schema = { + if let Some(with_columns) = self.file_options.with_columns.as_deref() { + Some(Arc::new(first_schema.try_project(with_columns)?)) + } else { + None + } + }; + let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + // Modified if we have a negative slice let mut first_file_idx = 0; @@ -384,12 +389,12 @@ impl ParquetExec { .collect::>(); // Now read the actual data. 
- let file_info = &self.file_info; - let file_options = &self.file_options; let use_statistics = self.options.use_statistics; - let predicate = &self.predicate; let base_row_index_ref = &base_row_index; let include_file_paths = self.file_options.include_file_paths.as_ref(); + let first_schema = first_schema.clone(); + let projected_arrow_schema = projected_arrow_schema.clone(); + let predicate = predicate.clone(); if verbose { eprintln!("reading of {}/{} file...", processed, paths.len()); @@ -399,40 +404,27 @@ impl ParquetExec { .into_iter() .enumerate() .map(|(i, (_, reader))| { + let first_schema = first_schema.clone(); + let projected_arrow_schema = projected_arrow_schema.clone(); + let predicate = predicate.clone(); let (cumulative_read, slice) = row_statistics[i]; let hive_partitions = hive_parts .as_ref() .map(|x| x[i].materialize_partition_columns()); - let schema = self - .file_info - .reader_schema - .as_ref() - .unwrap() - .as_ref() - .unwrap_left() - .clone(); - async move { - let file_info = file_info.clone(); let row_index = base_row_index_ref.as_ref().map(|rc| RowIndex { name: rc.name.clone(), offset: rc.offset + cumulative_read as IdxSize, }); - let (projection, predicate) = prepare_scan_args( - predicate.clone(), - &mut file_options.with_columns.clone(), - &mut file_info.schema.clone(), - row_index.is_some(), - hive_partitions.as_deref(), - ); - let df = reader .with_slice(Some(slice)) .with_row_index(row_index) - .with_projection(projection) - .check_schema(schema.as_ref()) + .with_projected_arrow_schema( + first_schema.as_ref(), + projected_arrow_schema.as_deref(), + ) .await? .use_statistics(use_statistics) .with_predicate(predicate) diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index b5d4230cb52a..b4099c98e500 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use arrow::datatypes::ArrowSchema; use futures::{StreamExt, TryStreamExt}; use polars_core::config::{self, get_file_prefetch_size}; use polars_core::error::*; @@ -15,7 +16,6 @@ use polars_io::parquet::read::{BatchedParquetReader, ParquetOptions, ParquetRead use polars_io::path_utils::is_cloud_url; use polars_io::pl_async::get_runtime; use polars_io::predicates::PhysicalIoExpr; -use polars_io::prelude::materialize_projection; #[cfg(feature = "async")] use polars_io::prelude::ParquetAsyncReader; use polars_io::utils::slice::split_slice_at_file; @@ -42,11 +42,12 @@ pub struct ParquetSource { #[allow(dead_code)] cloud_options: Option, first_metadata: Option, - file_info: FileInfo, hive_parts: Option>>, verbose: bool, run_async: bool, prefetch_size: usize, + first_schema: Arc, + projected_arrow_schema: Option>, predicate: Option>, } @@ -72,7 +73,6 @@ impl ParquetSource { &PathBuf, ParquetOptions, FileScanOptions, - Option>, usize, Option>, )> { @@ -83,35 +83,24 @@ impl ParquetSource { let path = &paths[index]; let options = self.options; let file_options = self.file_options.clone(); - let schema = self.file_info.schema.clone(); let hive_partitions = self .hive_parts .as_ref() .map(|x| x[index].materialize_partition_columns()); - let projection = materialize_projection( - file_options.with_columns.as_deref(), - &schema, - hive_partitions.as_deref(), - false, - ); - - let n_cols = projection.as_ref().map(|v| v.len()).unwrap_or(schema.len()); - let chunk_size = 
determine_chunk_size(n_cols, self.n_threads)?; + let chunk_size = determine_chunk_size( + self.projected_arrow_schema + .as_ref() + .map_or(self.first_schema.len(), |x| x.len()), + self.n_threads, + )?; if self.verbose { eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows") } - Ok(( - path, - options, - file_options, - projection, - chunk_size, - hive_partitions, - )) + Ok((path, options, file_options, chunk_size, hive_partitions)) } fn init_reader_sync(&mut self) -> PolarsResult<()> { @@ -127,7 +116,7 @@ impl ParquetSource { } let predicate = self.predicate.clone(); - let (path, options, file_options, projection, chunk_size, hive_partitions) = + let (path, options, file_options, chunk_size, hive_partitions) = self.prepare_init_reader(index)?; let batched_reader = { @@ -142,14 +131,9 @@ impl ParquetSource { } let mut reader = reader - .with_projection(projection) - .check_schema( - self.file_info - .reader_schema - .as_ref() - .unwrap() - .as_ref() - .unwrap_left(), + .with_projected_arrow_schema( + &self.first_schema, + self.projected_arrow_schema.as_deref(), )? .with_row_index(file_options.row_index) .with_predicate(predicate.clone()) @@ -202,7 +186,7 @@ impl ParquetSource { let metadata = self.first_metadata.clone().filter(|_| index == 0); let predicate = self.predicate.clone(); let cloud_options = self.cloud_options.clone(); - let (path, options, file_options, projection, chunk_size, hive_partitions) = + let (path, options, file_options, chunk_size, hive_partitions) = self.prepare_init_reader(index)?; let batched_reader = { @@ -212,14 +196,9 @@ impl ParquetSource { ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata) .await? .with_row_index(file_options.row_index) - .with_projection(projection) - .check_schema( - self.file_info - .reader_schema - .as_ref() - .unwrap() - .as_ref() - .unwrap_left(), + .with_projected_arrow_schema( + &self.first_schema, + self.projected_arrow_schema.as_deref(), ) .await? .with_predicate(predicate.clone()) @@ -280,6 +259,16 @@ impl ParquetSource { } let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async(); + let first_schema = file_info.reader_schema.clone().unwrap().unwrap_left(); + + let projected_arrow_schema = { + if let Some(with_columns) = file_options.with_columns.as_deref() { + Some(Arc::new(first_schema.try_project(with_columns)?)) + } else { + None + } + }; + let mut source = ParquetSource { batched_readers: VecDeque::new(), n_threads, @@ -291,11 +280,12 @@ impl ParquetSource { sources, cloud_options, first_metadata, - file_info, hive_parts, verbose, run_async, prefetch_size, + first_schema, + projected_arrow_schema, predicate, }; // Already start downloading when we deal with cloud urls. 
diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py index f9fd0bd00991..f3c2930667a0 100644 --- a/py-polars/tests/unit/io/test_lazy_parquet.py +++ b/py-polars/tests/unit/io/test_lazy_parquet.py @@ -478,6 +478,7 @@ def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None: ) +@pytest.mark.write_disk @pytest.mark.parametrize("streaming", [True, False]) def test_parquet_slice_pushdown_non_zero_offset( tmp_path: Path, streaming: bool @@ -531,6 +532,7 @@ def trim_to_metadata(path: str | Path) -> None: ) +@pytest.mark.write_disk @pytest.mark.parametrize("streaming", [True, False]) def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None: tmp_path.mkdir(exist_ok=True) @@ -563,3 +565,60 @@ def remove_metadata(path: str | Path) -> None: remove_metadata(path) assert_frame_equal(lf.collect(streaming=streaming), df) + + +@pytest.mark.write_disk +@pytest.mark.parametrize("streaming", [True, False]) +def test_parquet_unaligned_schema_read(tmp_path: Path, streaming: bool) -> None: + dfs = [ + pl.DataFrame({"a": 1, "b": 10}), + pl.DataFrame({"b": 11, "a": 2}), + pl.DataFrame({"x": 3, "a": 3, "y": 3, "b": 12}), + ] + + paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"] + + for df, path in zip(dfs, paths): + df.write_parquet(path) + + lf = pl.scan_parquet(paths) + + assert_frame_equal( + lf.select("a").collect(streaming=streaming), pl.DataFrame({"a": [1, 2, 3]}) + ) + + assert_frame_equal( + lf.select("b", "a").collect(streaming=streaming), + pl.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}), + ) + + assert_frame_equal( + pl.scan_parquet(paths[:2]).collect(streaming=streaming), + pl.DataFrame({"a": [1, 2], "b": [10, 11]}), + ) + + assert_frame_equal( + lf.collect(streaming=streaming), + pl.DataFrame({"a": [1, 2, 3], "b": [10, 11, 12]}), + ) + + +@pytest.mark.write_disk +@pytest.mark.parametrize("streaming", [True, False]) +def test_parquet_unaligned_schema_read_dtype_mismatch( + tmp_path: Path, streaming: bool +) -> None: + dfs = [ + pl.DataFrame({"a": 1, "b": 10}), + pl.DataFrame({"b": "11", "a": "2"}), + ] + + paths = [tmp_path / "1", tmp_path / "2"] + + for df, path in zip(dfs, paths): + df.write_parquet(path) + + lf = pl.scan_parquet(paths) + + with pytest.raises(pl.exceptions.SchemaError, match="data type mismatch"): + lf.collect(streaming=streaming)
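
For reviewers: the core of this change is the new `projected_arrow_schema_to_projection_indices` helper in `crates/polars-io/src/parquet/read/utils.rs`. Each file's arrow schema is matched by column name against the first file's (optionally `with_columns`-projected) schema, the dtypes must agree exactly, and the result is a per-file projection that reorders or subsets that file's columns; `None` means the file already matches in full order and can be read as-is. The sketch below illustrates the same name-to-index mapping with plain `(name, dtype)` pairs instead of the real `ArrowSchema` type; it is a simplified illustration of the idea, not the Polars implementation.

// Simplified sketch of the projection-index mapping (illustration only,
// using (name, dtype) string pairs rather than ArrowSchema).
fn projection_indices(
    file_schema: &[(&str, &str)], // (name, dtype) for each column in this file
    projected: &[(&str, &str)],   // (name, dtype) for each requested column
) -> Result<Option<Vec<usize>>, String> {
    let mut indices = Vec::with_capacity(projected.len());
    // Only a projection covering every column, in the same order, can be skipped.
    let mut full_ordered = projected.len() == file_schema.len();

    for (i, (name, want_dtype)) in projected.iter().enumerate() {
        let Some(idx) = file_schema.iter().position(|(n, _)| n == name) else {
            return Err(format!("did not find column: {name}"));
        };
        let found_dtype = file_schema[idx].1;
        if found_dtype != *want_dtype {
            return Err(format!(
                "data type mismatch for column {name}: found: {found_dtype}, expected: {want_dtype}"
            ));
        }
        indices.push(idx);
        full_ordered &= idx == i;
    }

    // `None` signals "read the file as-is"; `Some` reorders/subsets by index.
    Ok((!full_ordered).then_some(indices))
}

fn main() {
    // File written as [b, a]; the query asks for [a, b] -> indices [1, 0].
    let file = [("b", "i64"), ("a", "i64")];
    let wanted = [("a", "i64"), ("b", "i64")];
    println!("{:?}", projection_indices(&file, &wanted)); // Ok(Some([1, 0]))
}

Returning `None` for an already-aligned full projection lets the readers skip column reordering entirely on the common path where every file was written with the same schema, which is what the unchanged fast path relied on before this patch.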