Test against PyArrow 17.0.0
Fokko committed Jul 19, 2024
1 parent 4282d2f commit a396149
Showing 5 changed files with 9 additions and 18 deletions.
12 changes: 1 addition & 11 deletions pyiceberg/io/pyarrow.py
@@ -1050,11 +1050,6 @@ def _task_to_record_batches(
 
         fragment_scanner = ds.Scanner.from_fragment(
             fragment=fragment,
-            # With PyArrow 16.0.0 there is an issue with casting record-batches:
-            # https://github.com/apache/arrow/issues/41884
-            # https://github.com/apache/arrow/issues/43183
-            # Would be good to remove this later on
-            schema=_pyarrow_schema_ensure_large_types(physical_schema),
             # This will push down the query to Arrow.
             # But in case there are positional deletes, we have to apply them first
             filter=pyarrow_filter if not positional_deletes else None,
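Note: the deleted workaround forced every scan onto a large-types schema because PyArrow 16 could not reliably cast record batches. A minimal sketch of why the workaround is no longer needed, assuming PyArrow >= 17 (the sample data is illustrative, not from this commit):

    import pyarrow as pa

    # On PyArrow >= 17, a RecordBatch can be cast between string and
    # large_string directly, so forcing the scanner schema through
    # _pyarrow_schema_ensure_large_types is no longer required.
    batch = pa.record_batch({"name": pa.array(["a", "b"], type=pa.string())})
    casted = batch.cast(pa.schema([pa.field("name", pa.large_string())]))
    assert casted.schema.field("name").type == pa.large_string()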
@@ -1070,12 +1065,7 @@ def _task_to_record_batches(
             batch = batch.take(indices)
         # Apply the user filter
         if pyarrow_filter is not None:
-            # we need to switch back and forth between RecordBatch and Table
-            # as Expression filter isn't yet supported in RecordBatch
-            # https://github.com/apache/arrow/issues/39220
-            arrow_table = pa.Table.from_batches([batch])
-            arrow_table = arrow_table.filter(pyarrow_filter)
-            batch = arrow_table.to_batches()[0]
+            batch = batch.filter(pyarrow_filter)
         yield _to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True)
         current_index += len(batch)
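With https://github.com/apache/arrow/issues/39220 resolved in PyArrow 17, RecordBatch.filter accepts a compute Expression directly, which is what makes the one-line replacement above possible. A self-contained sketch with made-up data:

    import pyarrow as pa
    import pyarrow.compute as pc

    # Older releases required round-tripping through pa.Table to apply an
    # Expression; PyArrow >= 17 filters the RecordBatch directly.
    batch = pa.record_batch({"id": pa.array([1, 2, 3])})
    filtered = batch.filter(pc.field("id") > 1)
    assert filtered.num_rows == 2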

2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
@@ -2020,7 +2020,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
                 case_sensitive=self.case_sensitive,
                 limit=self.limit,
             ),
-        )
+        ).cast(target_schema=target_schema)
 
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         return self.to_arrow().to_pandas(**kwargs)
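The new .cast(target_schema=target_schema) call casts every batch in the stream to the projected schema as it is read. A sketch of RecordBatchReader.cast with made-up schemas (in the real method, target_schema is derived from the projected Iceberg schema, which this diff does not show):

    import pyarrow as pa

    source = pa.schema([pa.field("name", pa.string())])
    target = pa.schema([pa.field("name", pa.large_string())])
    reader = pa.RecordBatchReader.from_batches(
        source, [pa.record_batch({"name": ["a", "b"]})]
    )
    # cast() wraps the reader; each batch is cast lazily on read.
    casted = reader.cast(target_schema=target)
    assert casted.schema == target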
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -58,7 +58,7 @@ fsspec = ">=2023.1.0,<2025.1.0"
 pyparsing = ">=3.1.0,<4.0.0"
 zstandard = ">=0.13.0,<1.0.0"
 tenacity = ">=8.2.3,<9.0.0"
-pyarrow = { version = ">=9.0.0,<18.0.0", optional = true }
+pyarrow = { version = ">=17.0.0,<18.0.0", optional = true }
 pandas = { version = ">=1.0.0,<3.0.0", optional = true }
 duckdb = { version = ">=0.5.0,<2.0.0", optional = true }
 ray = { version = ">=2.0.0,<2.10.0", optional = true }
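The lower bound jumps from 9.0.0 straight to 17.0.0, since the code above now relies on PyArrow 17 behavior. A trivial, illustrative runtime check mirroring the constraint (not part of the commit):

    import pyarrow as pa

    # pyproject.toml now pins pyarrow to >=17.0.0,<18.0.0.
    major = int(pa.__version__.split(".")[0])
    assert major == 17, f"expected PyArrow 17.x, found {pa.__version__}"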
2 changes: 1 addition & 1 deletion tests/integration/test_add_files.py
@@ -549,7 +549,7 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
     tbl.add_files([file_path])
 
     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema
 
     file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet"
     _write_parquet(
9 changes: 5 additions & 4 deletions tests/integration/test_writes/test_writes.py
@@ -357,7 +357,8 @@ def test_python_writes_dictionary_encoded_column_with_spark_reads(
     tbl.overwrite(arrow_table)
     spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
     pyiceberg_df = tbl.scan().to_pandas()
-    assert spark_df.equals(pyiceberg_df)
+    assert spark_df['id'].equals(pyiceberg_df['id'])
+    assert all(spark_df['name'].values == pyiceberg_df['name'].values)
 
 
 @pytest.mark.integration
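Splitting the frame-level equals into per-column checks is presumably needed because DataFrame.equals is dtype-sensitive, and the dictionary-encoded column can round-trip with a different pandas dtype even when the values match. An illustrative sketch of that failure mode (data and dtypes are assumptions, not from this test):

    import pandas as pd

    # equals() compares dtypes as well as values, so identical values in
    # differently-typed columns still compare unequal ...
    a = pd.DataFrame({"name": ["x", "y"]})                  # object dtype
    b = pd.DataFrame({"name": pd.Categorical(["x", "y"])})  # category dtype
    assert not a.equals(b)
    # ... while a value-wise comparison passes.
    assert all(a["name"].values == b["name"].values)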
@@ -401,12 +402,12 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     assert arrow_table_on_read.schema == pa.schema([
         pa.field("foo", pa.large_string()),
         pa.field("id", pa.int32()),
-        pa.field("name", pa.large_string()),
+        pa.field("name", pa.string()),
         pa.field(
             "address",
             pa.struct([
-                pa.field("street", pa.large_string()),
-                pa.field("city", pa.large_string()),
+                pa.field("street", pa.string()),
+                pa.field("city", pa.string()),
                 pa.field("zip", pa.int32()),
                 pa.field("bar", pa.large_string()),
             ]),
