Cannot append to REPEATED field when using client.load_table_from_file with parquet file #2008

Open
henryharbeck opened this issue Sep 2, 2024 · 1 comment

Environment details

  • OS type and version: Ubuntu 20.04 (WSL)
  • Python version: 3.11.8
  • pip version: 24.2
  • google-cloud-bigquery version: 3.25.0

Steps to reproduce

  1. Create a table that has a single field with mode=REPEATED
  2. Attempt to append to the table using client.load_table_from_file with a parquet file written from memory to a BytesIO buffer
    - If no schema is provided to bigquery.LoadJobConfig, the operation fails
    - If the table schema is provided to bigquery.LoadJobConfig, the operation does not raise, but instead incorrectly inserts empty arrays into the table

Issue details

I am unable to use client.load_table_from_file with a parquet file to append to an existing table with a REPEATED field.

This issue is somewhat similar to #1981, except related to REPEATED fields rather than REQUIRED fields.

Code example

Apologies in advance that the example is a bit long.

It demonstrates that Parquet files written to BytesIO buffers by both Polars and PyArrow cannot be loaded into a BigQuery table that has a mode=REPEATED field.

from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery

import polars as pl

PROJECT = "<project>"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)

    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})

    with BytesIO() as stream:
        df.write_parquet(stream)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )

        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )

    job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )

    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )

    job.result()


def main():
    client = bigquery.Client()
    table = create_and_return_table("test_pl", client)
    polars_way(table, client)
    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)

    # Both "ways" raise the below error

    # google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table
    # project:dataset.table. Field foo has changed type from INTEGER to RECORD; reason:
    # invalid, message: Provided Schema does not match Table project:dataset.table. Field
    # foo has changed type from INTEGER to RECORD

    # Unless the table schema is provided, in which case the operation succeeds, but the
    # data is inserted as empty arrays


if __name__ == "__main__":
    main()

Stack trace

Both polars_way and pyarrow_way raise the same error. The two tracebacks follow.

# polars_way
Traceback (most recent call last):
  File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
    main()
  File "/home/henry/development/polars_bq/combined.py", line 77, in main
    polars_way(table, client)
  File "/home/henry/development/polars_bq/combined.py", line 42, in polars_way
    job.result()
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD

# pyarrow_way
Traceback (most recent call last):
  File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
    main()
  File "/home/henry/development/polars_bq/combined.py", line 79, in main
    pyarrow_way(table, client)
  File "/home/henry/development/polars_bq/combined.py", line 71, in pyarrow_way
    job.result()
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD

tswast commented Sep 6, 2024

Since you are using load_table_from_file directly, there are a couple of things to watch out for:

My teammate, Micah, commented on a similar Arrow + BigQuery issue with what might be going on.

For (1), he mentions a BigQuery feature called "enable_list_inference". We do this automatically in the "load pandas DataFrame into BigQuery" code path:

parquet_options.enable_list_inference = True

We also enable compliant nested types in that code path:

{"use_compliant_nested_type": True}

For Parquet files from Polars, it'd be good to double-check that the Parquet schema produced is compatible with BigQuery via "compliant nested types".
