
Pyarrow IO property for configuring large v small types on read #986

Merged — 11 commits merged into apache:main on Aug 7, 2024

Conversation

sungwy
Collaborator

@sungwy sungwy commented Jul 31, 2024

This addresses the issue raised in the formal proposal in the Google Doc.

The current behavior of always casting to large types causes an explosion in RSS memory usage, as highlighted in the benchmark discussed in the documentation.

@sungwy sungwy requested a review from Fokko July 31, 2024 18:43
@sungwy
Collaborator Author

sungwy commented Jul 31, 2024

Once approved/merged, I'd like to bring this up on the discussion thread to add this item to the 0.7.1 patch release as well. It's a small feature, and it would help alleviate the memory issues we are running into (I expect it would for other users as well).

Contributor

@kevinjqliu kevinjqliu left a comment


Thanks for working on this, I left a few comments!

@@ -80,6 +80,7 @@
GCS_ENDPOINT = "gcs.endpoint"
GCS_DEFAULT_LOCATION = "gcs.default-bucket-location"
GCS_VERSION_AWARE = "gcs.version-aware"
PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"
Contributor


nit: if this is a PyArrow-specific setting, let's move it to the pyarrow file

Collaborator Author


I thought of that, but decided to leave it here because I liked having the FileIO properties together in one place. WDYT?

Contributor


Pyarrow is one of the FileIO implementations, and this setting is specifically for Pyarrow. In the future, when we add more FileIO implementations, such as the Rust one, it'll be good to have a clear separation between the FileIO settings.

Contributor


I think it also makes more sense to move this inside of the Arrow file.

Collaborator Author

@sungwy sungwy Aug 7, 2024


Thanks @Fokko and @kevinjqliu - I'll keep this in mind the next time I touch these files 🙂

@@ -1146,6 +1152,31 @@ def primitive(self, primitive: pa.DataType) -> pa.DataType:
return primitive


class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema:
Contributor


nit: looks like this is the same function definition as the one in _ConvertToLargeTypes, as with other functions here.
Perhaps abstract into a common class and extend/override specific functions.

Collaborator Author


I thought of that, but I didn't like naming one as _ConvertToLargeTypes and then having an arg like reverse: bool.

Contributor


I was thinking something with inheritance like:
_ConvertToArrowTypes
_ConvertToLargeTypes(_ConvertToArrowTypes)
_ConvertToSmallTypes(_ConvertToArrowTypes)

@@ -596,6 +597,11 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa
assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema


def test_pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
schema_with_large_types = _pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids)
Contributor


nit: name is large_type, function is small_type

what is this function testing for?

Collaborator Author


This was for testing the roundtrip conversion - fixed it to use the correct function

Collaborator Author

@sungwy sungwy left a comment


Thank you for the review feedback @kevinjqliu ! Adopted most of the feedback and left some comments for the others.


Contributor

@kevinjqliu kevinjqliu left a comment


LGTM!


@sungwy
Collaborator Author

sungwy commented Aug 2, 2024

Thanks for the review @kevinjqliu ! Just updated it to make use of @ndrluis's cleaned-up function property_as_bool.
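For context, here is a rough sketch of the semantics a helper like property_as_bool provides (this is a hypothetical re-implementation for illustration only; pyiceberg's actual function may parse values differently):

```python
from typing import Dict

def property_as_bool(properties: Dict[str, str], key: str, default: bool) -> bool:
    """Read a string-valued IO property as a boolean, falling back to a default.

    Hypothetical sketch: treats (case-insensitive) "true" as True and any
    other present value as False.
    """
    value = properties.get(key)
    if value is None:
        return default
    return value.lower() == "true"

props = {"pyarrow.use-large-types-on-read": "False"}
print(property_as_bool(props, "pyarrow.use-large-types-on-read", True))  # False
print(property_as_bool({}, "pyarrow.use-large-types-on-read", True))     # True
```
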

Contributor

@kevinjqliu kevinjqliu left a comment


LGTM!

Contributor

@kevinjqliu kevinjqliu left a comment


LGTM!

@HonahX
Contributor

HonahX commented Aug 5, 2024

@sungwy Thanks for working on this!

It seems we also need to update schema_to_pyarrow/_cast_if_needed to honor the new property. Otherwise

def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
file_field = self._file_schema.find_field(field.field_id)
if field.field_type.is_primitive:
if field.field_type != file_field.field_type:
return values.cast(
schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids)
)

If we have a type promotion from string to binary, schema_to_pyarrow will convert BinaryType() to pa.large_binary.

Example to reproduce:

@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
    identifier = "default.test_table_scan_override_with_small_types"
    arrow_table = pa.Table.from_arrays(
        [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])],
        names=["string", "binary", "list"],
    )
    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass

    tbl = catalog.create_table(
        identifier,
        schema=arrow_table.schema,
    )

    tbl.append(arrow_table)

    with tbl.update_schema() as update_schema:
        update_schema.update_column("string", BinaryType())

    tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
    result_table = tbl.scan().to_arrow()

    expected_schema = pa.schema([
        pa.field("string", pa.large_binary()), # should be pa.binary()
        pa.field("binary", pa.binary()),
        pa.field("list", pa.list_(pa.string())),
    ])
    assert result_table.schema.equals(expected_schema)

##### result_table.schema #####
string: large_binary
binary: binary
list: list<element: string>
  child 0, element: string

@sungwy
Collaborator Author

sungwy commented Aug 5, 2024

@sungwy Thanks for working on this!

It seems we also need to update schema_to_pyarrow/_cast_if_needed to honor the new property.

Thanks @HonahX ! I've updated the code in order to accommodate this edge case.

@sungwy sungwy added this to the PyIceberg 0.8.0 release milestone Aug 6, 2024

@@ -1303,6 +1345,8 @@ def project_table(
# When FsSpec is not installed
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
Contributor


This is the only part I don't like: we now force the table to use large or normal types. When we read record batches I agree that we need to force the schema, but for the table, we have to read all the footers anyway.

Once #929 goes in, I think we still need to change that, but let's defer that question for now.

@Fokko Fokko merged commit 8aeab49 into apache:main Aug 7, 2024
7 checks passed
@sungwy sungwy deleted the small-types-option branch August 7, 2024 12:39
@fusion2222

Does anyone know when this will be released?

@sungwy
Collaborator Author

sungwy commented Aug 12, 2024

Hi @fusion2222 - This will be released with 0.8.0, which will be a few months away (roughly 1~3 months)
