Move to PyArrow 17
Fokko committed Aug 12, 2024
1 parent d8b5c17 commit 921cd84
Showing 2 changed files with 16 additions and 18 deletions.
pyiceberg/io/pyarrow.py (32 changes: 15 additions & 17 deletions)
@@ -1197,7 +1197,7 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
+    use_large_types: Optional[bool] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
@@ -1220,15 +1220,16 @@ def _task_to_record_batches(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
+        projected_file_schema = None
+        if use_large_types is not None:
+            if use_large_types is True:
+                projected_file_schema = _pyarrow_schema_ensure_large_types(physical_schema)
+            else:
+                projected_file_schema = _pyarrow_schema_ensure_small_types(physical_schema)
+
         fragment_scanner = ds.Scanner.from_fragment(
             fragment=fragment,
-            # With PyArrow 16.0.0 there is an issue with casting record-batches:
-            # https://github.com/apache/arrow/issues/41884
-            # https://github.com/apache/arrow/issues/43183
-            # Would be good to remove this later on
-            schema=_pyarrow_schema_ensure_large_types(physical_schema)
-            if use_large_types
-            else (_pyarrow_schema_ensure_small_types(physical_schema)),
+            schema=projected_file_schema,
             # This will push down the query to Arrow.
             # But in case there are positional deletes, we have to apply them first
             filter=pyarrow_filter if not positional_deletes else None,
@@ -1246,14 +1247,9 @@ def _task_to_record_batches(
                 batch = batch.take(indices)
             # Apply the user filter
             if pyarrow_filter is not None:
-                # we need to switch back and forth between RecordBatch and Table
-                # as Expression filter isn't yet supported in RecordBatch
-                # https://github.com/apache/arrow/issues/39220
-                arrow_table = pa.Table.from_batches([batch])
-                arrow_table = arrow_table.filter(pyarrow_filter)
-                if len(arrow_table) == 0:
+                batch = batch.filter(pyarrow_filter)
+                if len(batch) == 0:
                     continue
-                batch = arrow_table.to_batches()[0]
             yield _to_requested_schema(
                 projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
             )
@@ -1268,7 +1264,7 @@ def _task_to_table(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
+    use_large_types: Optional[bool] = None,
 ) -> Optional[pa.Table]:
     batches = list(
         _task_to_record_batches(
@@ -1348,7 +1344,9 @@ def project_table(
         # When FsSpec is not installed
         raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
 
-    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+    use_large_types = None
+    if PYARROW_USE_LARGE_TYPES_ON_READ in io.properties:
+        use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
 
     bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
 
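For context, a minimal sketch (made-up column names and data, not part of the commit) of the PyArrow behaviour the pyarrow.py change relies on: with PyArrow 17, the new minimum, `RecordBatch.filter()` accepts a compute `Expression` directly, so the RecordBatch-to-Table round-trip that the removed comment (apache/arrow#39220) described is no longer needed.

```python
# Sketch only: an Expression can be passed straight to RecordBatch.filter()
# on PyArrow >= 17, making the Table round-trip workaround unnecessary.
import pyarrow as pa
import pyarrow.compute as pc

batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3, 4], "value": ["a", "b", "c", "d"]})

expr = pc.field("id") > 2  # a compute Expression, not a boolean mask

# Old workaround (pre-17): round-trip through a Table, since RecordBatch.filter
# only accepted boolean masks (apache/arrow#39220).
table = pa.Table.from_batches([batch]).filter(expr)

# New path (PyArrow >= 17): filter the RecordBatch directly.
filtered = batch.filter(expr)

assert table.to_pydict() == filtered.to_pydict() == {"id": [3, 4], "value": ["c", "d"]}
```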
pyproject.toml (2 changes: 1 addition & 1 deletion)
@@ -61,7 +61,7 @@ fsspec = ">=2023.1.0,<2025.1.0"
 pyparsing = ">=3.1.0,<4.0.0"
 zstandard = ">=0.13.0,<1.0.0"
 tenacity = ">=8.2.3,<10.0.0"
-pyarrow = { version = ">=9.0.0,<18.0.0", optional = true }
+pyarrow = { version = ">=17.0.0,<18.0.0", optional = true }
 pandas = { version = ">=1.0.0,<3.0.0", optional = true }
 duckdb = { version = ">=0.5.0,<2.0.0", optional = true }
 ray = [
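The pyproject.toml change raises the floor to PyArrow 17, which the new direct-filtering path depends on. Alongside it, `use_large_types` becomes tri-state in `project_table`: the large/small type coercion is now applied only when the read property is explicitly set. A small sketch of that resolution (the property key string and dict below are assumptions for illustration, not pulled from the commit):

```python
# Sketch of the new tri-state resolution in project_table; the key name is assumed.
from typing import Optional

PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"  # assumed key

def resolve_use_large_types(properties: dict) -> Optional[bool]:
    # None  -> property unset: leave the file's physical schema untouched
    # True  -> coerce string/binary/list columns to their large variants
    # False -> coerce them to the small variants
    if PYARROW_USE_LARGE_TYPES_ON_READ not in properties:
        return None
    return str(properties[PYARROW_USE_LARGE_TYPES_ON_READ]).lower() in ("true", "1")

print(resolve_use_large_types({}))                                          # None
print(resolve_use_large_types({PYARROW_USE_LARGE_TYPES_ON_READ: "true"}))   # True
print(resolve_use_large_types({PYARROW_USE_LARGE_TYPES_ON_READ: "false"}))  # False
```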
