Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Relaxed schema alignment for parquet file list read #18803

Merged
merged 1 commit
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use super::read_impl::BatchedParquetReader;
use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader};
#[cfg(feature = "cloud")]
use super::utils::materialize_empty_df;
use super::utils::projected_arrow_schema_to_projection_indices;
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
Expand Down Expand Up @@ -80,20 +81,23 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

/// Ensure the schema of the file matches the given schema. Calling this
/// after setting the projection will ensure only the projected indices
/// are checked.
pub fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult<Self> {
let self_schema = self.schema()?;
let self_schema = self_schema.as_ref();
/// Checks that the file contains all the columns in `projected_arrow_schema` with the same
/// dtype, and sets the projection indices.
pub fn with_projected_arrow_schema(
mut self,
first_schema: &ArrowSchema,
projected_arrow_schema: Option<&ArrowSchema>,
) -> PolarsResult<Self> {
let schema = self.schema()?;

if let Some(projection) = self.projection.as_deref() {
ensure_matching_schema(
&schema.try_project_indices(projection)?,
&self_schema.try_project_indices(projection)?,
if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
ensure_matching_schema(schema, self_schema)?;
self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}

Ok(self)
Expand Down Expand Up @@ -288,17 +292,21 @@ impl ParquetAsyncReader {
})
}

pub async fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult<Self> {
let self_schema = self.schema().await?;
let self_schema = self_schema.as_ref();

if let Some(projection) = self.projection.as_deref() {
ensure_matching_schema(
&schema.try_project_indices(projection)?,
&self_schema.try_project_indices(projection)?,
pub async fn with_projected_arrow_schema(
mut self,
first_schema: &ArrowSchema,
projected_arrow_schema: Option<&ArrowSchema>,
) -> PolarsResult<Self> {
let schema = self.schema().await?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
ensure_matching_schema(schema, self_schema)?;
self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}

Ok(self)
Expand Down
33 changes: 32 additions & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::Cow;

use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE};
use polars_core::prelude::{ArrowSchema, DataFrame, DataType, Series, IDX_DTYPE};
use polars_error::{polars_bail, PolarsResult};

use crate::hive::materialize_hive_partitions;
use crate::utils::apply_projection;
Expand Down Expand Up @@ -28,3 +29,33 @@ pub fn materialize_empty_df(

df
}

/// Maps each column of `projected_arrow_schema` to its positional index in the file
/// `schema`, validating that every projected column exists in the file with a
/// matching dtype.
///
/// Returns `Ok(None)` when the projection selects all columns of the file in file
/// order (so no projection needs to be applied), otherwise `Ok(Some(indices))`.
///
/// # Errors
/// Raises `SchemaMismatch` if a projected column is missing from the file, or is
/// present with a different dtype.
pub(super) fn projected_arrow_schema_to_projection_indices(
    schema: &ArrowSchema,
    projected_arrow_schema: &ArrowSchema,
) -> PolarsResult<Option<Vec<usize>>> {
    let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len());
    // Can only be a full, ordered projection if the column counts already match;
    // refined per-column below.
    let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len();

    for (i, field) in projected_arrow_schema.iter_values().enumerate() {
        let dtype = {
            // NOTE: `field` is intentionally shadowed here by the corresponding
            // field found in the *file* schema.
            let Some((idx, _, field)) = schema.get_full(&field.name) else {
                polars_bail!(SchemaMismatch: "did not find column in file: {}", field.name)
            };

            projection_indices.push(idx);
            is_full_ordered_projection &= idx == i;

            DataType::from_arrow(&field.dtype, true)
        };
        // Outside the block, `field` is the projected (expected) field again.
        let expected_dtype = DataType::from_arrow(&field.dtype, true);

        // `!=` auto-borrows both operands, so no clone is needed and `dtype`
        // stays usable in the error message below.
        if dtype != expected_dtype {
            polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
                &field.name, dtype, expected_dtype
            )
        }
    }

    Ok((!is_full_ordered_projection).then_some(projection_indices))
}
28 changes: 0 additions & 28 deletions crates/polars-mem-engine/src/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,6 @@ pub(crate) use self::python_scan::*;
use super::*;
use crate::prelude::*;

#[cfg(any(feature = "ipc", feature = "parquet"))]
type Projection = Option<Vec<usize>>;
#[cfg(any(feature = "ipc", feature = "parquet"))]
type Predicate = Option<Arc<dyn PhysicalIoExpr>>;

/// Converts scan options into the projection indices and IO-level predicate that
/// the file readers consume. Takes ownership of (empties) `with_columns` and
/// `schema` via `mem::take`, leaving their defaults behind.
#[cfg(any(feature = "ipc", feature = "parquet"))]
fn prepare_scan_args(
    predicate: Option<Arc<dyn PhysicalExpr>>,
    with_columns: &mut Option<Arc<[PlSmallStr]>>,
    schema: &mut SchemaRef,
    has_row_index: bool,
    hive_partitions: Option<&[Series]>,
) -> (Projection, Predicate) {
    let taken_columns = mem::take(with_columns);
    let taken_schema = mem::take(schema);

    (
        // Resolve the selected column names against the schema (accounting for
        // hive partition columns and a possible row index column).
        materialize_projection(
            taken_columns.as_deref(),
            &taken_schema,
            hive_partitions,
            has_row_index,
        ),
        // Lower the physical expression to an IO-level predicate, if any.
        predicate.map(phys_expr_to_io_expr),
    )
}

/// Producer of an in memory DataFrame
pub struct DataFrameExec {
pub(crate) df: Arc<DataFrame>,
Expand Down
112 changes: 52 additions & 60 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ impl ParquetExec {
// Modified if we have a negative slice
let mut first_source = 0;

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
Some(Arc::new(first_schema.try_project(with_columns)?))
} else {
None
}
};
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);

// (offset, end)
let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice {
if slice.0 >= 0 {
Expand Down Expand Up @@ -150,14 +161,6 @@ impl ParquetExec {
.as_ref()
.map(|x| x[i].materialize_partition_columns());

let (projection, predicate) = prepare_scan_args(
self.predicate.clone(),
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
base_row_index.is_some(),
hive_partitions.as_deref(),
);

let memslice = source.to_memslice()?;

let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
Expand All @@ -181,9 +184,7 @@ impl ParquetExec {
.map(|x| (x.clone(), Arc::from(source.to_include_path_name()))),
);

reader
.num_rows()
.map(|num_rows| (reader, num_rows, predicate, projection))
reader.num_rows().map(|num_rows| (reader, num_rows))
});

// We do this in parallel because wide tables can take a long time deserializing metadata.
Expand All @@ -192,7 +193,7 @@ impl ParquetExec {
let current_offset_ref = &mut current_offset;
let row_statistics = readers_and_metadata
.iter()
.map(|(_, num_rows, _, _)| {
.map(|(_, num_rows)| {
let cum_rows = *current_offset_ref;
(
cum_rows,
Expand All @@ -205,31 +206,24 @@ impl ParquetExec {
readers_and_metadata
.into_par_iter()
.zip(row_statistics.into_par_iter())
.map(
|((reader, _, predicate, projection), (cumulative_read, slice))| {
let row_index = base_row_index.as_ref().map(|rc| RowIndex {
name: rc.name.clone(),
offset: rc.offset + cumulative_read as IdxSize,
});

let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
.with_predicate(predicate.clone())
.with_projection(projection.clone())
.check_schema(
self.file_info
.reader_schema
.clone()
.unwrap()
.unwrap_left()
.as_ref(),
)?
.finish()?;

Ok(df)
},
)
.map(|((reader, _), (cumulative_read, slice))| {
let row_index = base_row_index.as_ref().map(|rc| RowIndex {
name: rc.name.clone(),
offset: rc.offset + cumulative_read as IdxSize,
});

let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
.with_predicate(predicate.clone())
.with_projected_arrow_schema(
first_schema.as_ref(),
projected_arrow_schema.as_deref(),
)?
.finish()?;

Ok(df)
})
.collect::<PolarsResult<Vec<_>>>()
})?;

Expand Down Expand Up @@ -261,6 +255,17 @@ impl ParquetExec {
eprintln!("POLARS PREFETCH_SIZE: {}", batch_size)
}

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
Some(Arc::new(first_schema.try_project(with_columns)?))
} else {
None
}
};
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);

// Modified if we have a negative slice
let mut first_file_idx = 0;

Expand Down Expand Up @@ -384,12 +389,12 @@ impl ParquetExec {
.collect::<Vec<_>>();

// Now read the actual data.
let file_info = &self.file_info;
let file_options = &self.file_options;
let use_statistics = self.options.use_statistics;
let predicate = &self.predicate;
let base_row_index_ref = &base_row_index;
let include_file_paths = self.file_options.include_file_paths.as_ref();
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();

if verbose {
eprintln!("reading of {}/{} file...", processed, paths.len());
Expand All @@ -399,40 +404,27 @@ impl ParquetExec {
.into_iter()
.enumerate()
.map(|(i, (_, reader))| {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();
let (cumulative_read, slice) = row_statistics[i];
let hive_partitions = hive_parts
.as_ref()
.map(|x| x[i].materialize_partition_columns());

let schema = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.clone();

async move {
let file_info = file_info.clone();
let row_index = base_row_index_ref.as_ref().map(|rc| RowIndex {
name: rc.name.clone(),
offset: rc.offset + cumulative_read as IdxSize,
});

let (projection, predicate) = prepare_scan_args(
predicate.clone(),
&mut file_options.with_columns.clone(),
&mut file_info.schema.clone(),
row_index.is_some(),
hive_partitions.as_deref(),
);

let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
.with_projection(projection)
.check_schema(schema.as_ref())
.with_projected_arrow_schema(
first_schema.as_ref(),
projected_arrow_schema.as_deref(),
)
.await?
.use_statistics(use_statistics)
.with_predicate(predicate)
Expand Down
Loading