diff --git a/poetry.lock b/poetry.lock index c3fe78920..f98504e51 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4578,4 +4578,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = ">=3.8, <4.0, !=3.9.7" -content-hash = "377283aadd2c34c5cd2003281a820a3dc82ce529e46674c67c2fe84e2958cf8e" +content-hash = "20cefc9b5bbebb7580b7f15e9a016e3fa395ff495a2ad5dc03e2e89895a2b21d" diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index b99c3b170..7b9a35c94 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1197,7 +1197,7 @@ def _task_to_record_batches( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, - use_large_types: bool = True, + use_large_types: Optional[bool] = None, ) -> Iterator[pa.RecordBatch]: _, _, path = PyArrowFileIO.parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) @@ -1220,15 +1220,16 @@ def _task_to_record_batches( if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + projected_file_schema = None + if use_large_types is not None: + if use_large_types is True: + projected_file_schema = _pyarrow_schema_ensure_large_types(physical_schema) + else: + projected_file_schema = _pyarrow_schema_ensure_small_types(physical_schema) + fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, - # With PyArrow 16.0.0 there is an issue with casting record-batches: - # https://github.com/apache/arrow/issues/41884 - # https://github.com/apache/arrow/issues/43183 - # Would be good to remove this later on - schema=_pyarrow_schema_ensure_large_types(physical_schema) - if use_large_types - else (_pyarrow_schema_ensure_small_types(physical_schema)), + schema=projected_file_schema, # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, @@ -1246,14 +1247,9 @@ def _task_to_record_batches( batch = batch.take(indices) # Apply the user filter if pyarrow_filter is not None: - # we need to switch back and forth between RecordBatch and Table - # as Expression filter isn't yet supported in RecordBatch - # https://github.com/apache/arrow/issues/39220 - arrow_table = pa.Table.from_batches([batch]) - arrow_table = arrow_table.filter(pyarrow_filter) - if len(arrow_table) == 0: + batch = batch.filter(pyarrow_filter) + if len(batch) == 0: continue - batch = arrow_table.to_batches()[0] yield _to_requested_schema( projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types ) @@ -1268,7 +1264,7 @@ def _task_to_table( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, - use_large_types: bool = True, + use_large_types: Optional[bool] = None, ) -> Optional[pa.Table]: batches = list( _task_to_record_batches( @@ -1348,7 +1344,9 @@ def project_table( # When FsSpec is not installed raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e - use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) + use_large_types = None + if PYARROW_USE_LARGE_TYPES_ON_READ in io.properties: + use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive) diff --git a/pyproject.toml b/pyproject.toml index f3caba970..7dbcbcc9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,7 @@ fsspec = ">=2023.1.0,<2025.1.0" pyparsing = ">=3.1.0,<4.0.0" zstandard = ">=0.13.0,<1.0.0" tenacity = ">=8.2.3,<10.0.0" -pyarrow = { version = ">=9.0.0,<18.0.0", optional = true } +pyarrow = { version = ">=17.0.0,<18.0.0", optional = true } pandas = { version = ">=1.0.0,<3.0.0", optional = true } duckdb = { version = ">=0.5.0,<2.0.0", optional = true } ray = [