
Add Data Files from Parquet Files to UnPartitioned Table #506

Merged: 12 commits merged into apache:main on Mar 16, 2024

Conversation

sungwy (Collaborator) commented Mar 8, 2024

PyIceberg's version of the add_files Spark migration procedure.

Some early ideas on its implementation:

  • Instead of keeping the input interface of Spark's procedure, we could let users pass a list of full file_paths (see the sketch after this list). This approach lets users submit explicit file_paths rather than leaving Iceberg to make assumptions based on the paths, which has led to issues in the Java implementation.
  • The current implementation infers the partition values from the path and doesn't validate that the files themselves contain those partition values. We could instead use statistics from the parquet metadata to check the min and max values of the partition columns (min and max should be the same for a partition column) and derive the partition record value from them. If the statistics are present, this is more accurate than inferring the value through a string match on the partition path. Without these checks, files could be added to the table with partition values in the manifest that disagree with the values in the parquet file.
  • Only Identity Transforms are currently supported. To construct the manifest entries for the data files from the partition path, we need to convert human-readable string values to their respective internal partition representations that get encoded as the partition values in the avro files. This is challenging for Transform partitions, since we would need a reverse transformation from the human-readable string to the partition representation for every supported IcebergType+Transform pair. Related issue in Java.
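For illustration, a minimal sketch of what the file-path based interface could look like from the user's side (catalog name, table identifier, and paths below are only examples):

```python
from pyiceberg.catalog import load_catalog

# load an existing, already-created Iceberg table (names are illustrative)
catalog = load_catalog("default")
tbl = catalog.load_table("db.transactions")

# explicit, fully qualified paths -- no directory listing or path inference
file_paths = [
    "s3://warehouse/db/transactions/data/file-1.parquet",
    "s3://warehouse/db/transactions/data/file-2.parquet",
]

# register the existing parquet files as data files without rewriting them
tbl.add_files(file_paths=file_paths)
```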

EDIT: Supporting addition of parquet files as data files to partitioned tables will be introduced in a separate PR. Options have been discussed in the comments on this PR, and we are breaking it up to make code reviews easier

@sungwy sungwy marked this pull request as draft March 8, 2024 03:29
sungwy (Collaborator Author) commented Mar 12, 2024

Updates from offline discussions:

  1. The task of creating the correct Iceberg Table Schema with the desired Partition Spec from an external table (like Hive) is out of scope for this PR. Atomically creating a table and adding files will be supported through the combination of this PR and CreateTableTransaction (WIP).
  2. We will replace file_path-based partition inference with parquet metadata footer-based partition inference. Currently we only support IdentityPartitions, and we can infer the partition values from the metadata footer's statistics (upper and lower bounds should be equal). This will also allow us to extend partition inference to numeric Transforms (YearTransform, etc.) by applying the transforms to the lower and upper bounds.
  3. Overwrites are acknowledged as a valid mode of adding files. This is out of scope for this PR, and it can be supported atomically by deleting rows that match an Expression + adding files, all within the same transaction block.
  4. There are a lot of gotchas in ensuring transactional guarantees - we'll think through possible race conditions and try to put all updates into the same transaction block if possible

sungwy (Collaborator Author) commented Mar 13, 2024

We will replace file_path-based partition inference with parquet metadata footer-based partition inference. Currently we only support IdentityPartitions, and we can infer the partition values from the metadata footer's statistics (upper and lower bounds should be equal). This will also allow us to extend partition inference to numeric Transforms (YearTransform, etc.) by applying the transforms to the lower and upper bounds.

I just realized that this approach won't work if we want to add files from Hive tables, because Hive-style partitioning results in parquet files that do not actually contain the partition data. The partition columns are inferred from the directory structure. But I think the suggested approach should be favored over file-path inference whenever it is possible.

@Fokko , I'd love to get your opinion on the following:

  1. We will introduce two modes of add_files: Hive path partition inference versus parquet metadata min/max-based partition inference. The former mode doesn't care if the fields are missing from the data file and takes the partition values from the file_path by casting the String values to their respective data types. The metadata-based approach requires that the partition data be present in the parquet file, and complies with the Iceberg spec.
  2. To support the Hive path partition inference mode, we will suppress the schema parity checks in the fill_parquet_file_metadata function when using path-based partition inference.

These two modes cover some of the options that were discussed in the initial discussion of the add_files migration procedure.

Fokko (Contributor) commented Mar 13, 2024

Both of the approaches have pros and cons. One thing I would like to avoid is having to rely on Hive directly; this will make sure that we can generalize it to also import generic Parquet files.

One problematic thing is that with Iceberg hidden partitioning we actually have the source-id that points to the field where the data is being kept. If the Hive partitioning is just arbitrary, eg:

INSERT INTO transactions PARTITION (year = '2023') AS SELECT name, amount FROM some_other_table

In this case there is no relation between the partition and any column in the table. In Iceberg you would expect something like:

INSERT INTO transactions PARTITION (year = '2023') AS SELECT name, amount, created_at FROM some_other_table

Where the partitioning is year(created_at). If this column is not in there, I don't think we can import it into Iceberg because we cannot set the source-id of the partition spec.

I would also expect the user to pre-create the partition spec prior to the import, because inferring is tricky.

sungwy (Collaborator Author) commented Mar 13, 2024

Both of the approaches have pros and cons. One thing I would like to avoid is having to rely on Hive directly; this will make sure that we can generalize it to also import generic Parquet files.

One problematic thing is that with Iceberg hidden partitioning we actually have the source-id that points to the field where the data is being kept. If the Hive partitioning is just arbitrary, eg:

INSERT INTO transactions PARTITION (year = '2023') AS SELECT name, amount FROM some_other_table

In this case there is no relation between the partition and any column in the table. In Iceberg you would expect something like:

INSERT INTO transactions PARTITION (year = '2023') AS SELECT name, amount, created_at FROM some_other_table

Where the partitioning is year(created_at). If this column is not in there, I don't think we can import it into Iceberg because we cannot set the source-id of the partition spec.

I would also expect the user to pre-create the partition spec prior to the import, because inferring is tricky.

Thank you for the context @Fokko. What I meant by partition inference is the act of inferring the partition values, not the Partition Spec itself. So this function only runs after the Iceberg Table has been created with its expected PartitionSpec.

But because Hive tables have the partition values in the file paths instead of in the actual data files, I'm proposing that we support two modes of partition value inference: one from the file paths, and the other based on the upper and lower bound values from the parquet metadata.
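A rough sketch of the metadata-based mode, assuming pyarrow, a flat schema, and an identity partition on a single column (the function name is illustrative, not part of this PR):

```python
import pyarrow.parquet as pq


def infer_identity_partition_value(file_path: str, partition_col: str):
    """Derive the value of an identity-partitioned column from the parquet
    footer statistics rather than from the file path."""
    metadata = pq.read_metadata(file_path)
    # for a flat schema, the arrow field index matches the parquet column index
    col_idx = metadata.schema.to_arrow_schema().names.index(partition_col)

    values = set()
    for rg in range(metadata.num_row_groups):
        stats = metadata.row_group(rg).column(col_idx).statistics
        if stats is None or not stats.has_min_max:
            raise ValueError(f"No min/max statistics for column {partition_col!r} in {file_path}")
        values.update({stats.min, stats.max})

    # for an identity partition, the lower and upper bounds must all agree
    if len(values) != 1:
        raise ValueError(f"{file_path} spans multiple values for {partition_col!r}: {values}")
    return values.pop()
```

A Hive-style file that doesn't contain the partition column at all would fail this lookup, which is exactly the case discussed above.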

Fokko (Contributor) commented Mar 14, 2024

@syun64 I'm all for it if it works, but I see a lot of issues with inferring it from the Hive path.

@Fokko Fokko (Contributor) left a comment

Looking good @syun64. Could you also update the docs? We could also defer the partitioning into a separate PR, up to you 👍

Comment on lines 1160 to 1161
if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields):
raise NotImplementedError("Cannot add_files to a table with Transform Partitions")
Fokko (Contributor):

We can be more permissive. It isn't a problem if the table's current partitioning has something other than an IdentityTransform; the issue is that we cannot add DataFiles that use this partitioning (until we find a clever way of checking this).

if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields):
raise NotImplementedError("Cannot add_files to a table with Transform Partitions")

if self.name_mapping() is None:
Fokko (Contributor):

Technically you don't have to add a name-mapping if the field-IDs are set

sungwy (Collaborator Author):

@Fokko Yeah I think you are right!

When field IDs are in the files, and the name_mapping is also present, the field_ids take precedence over the name_mapping in schema resolution. So the name_mapping here would essentially be meaningless in that case.

I'm on the fence between moving forward with your suggestion (create a name_mapping if there are no field_ids) and always asserting that the parquet files we want to add have no field IDs. That's because the field_ids we actually use in our Iceberg-generated parquet files are the Iceberg Table's internal notion of field IDs. Whenever a new table gets created, new field IDs are assigned, and Iceberg keeps track of these field IDs internally so that the same field can be tracked consistently across column renames.

When we add_files, we are introducing files produced by an external process that isn't aware of Iceberg's internal field metadata. In that sense, I feel that allowing files that already have field_ids to be added could result in unexpected errors for the user that are difficult to diagnose.
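For reference, a small sketch of how one could detect pre-existing field IDs with pyarrow, which surfaces the parquet field_id through the field metadata key PARQUET:field_id (the path is just an example, and only top-level fields are checked for brevity):

```python
import pyarrow.parquet as pq


def has_field_ids(file_path: str) -> bool:
    """Return True if any top-level field in the parquet file already carries a field ID."""
    schema = pq.read_schema(file_path)
    return any(
        field.metadata is not None and b"PARQUET:field_id" in field.metadata
        for field in schema
    )


# e.g. reject externally produced files that claim field IDs of their own
if has_field_ids("s3://bucket/external/file-1.parquet"):
    raise ValueError("Cannot add parquet files that already contain field IDs")
```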

Fokko (Contributor):

My main concern is that the Parquet file and the mapping don't match. For example, there are more fields in the parquet file than in the mapping. I think it is good to add checks there.
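A hedged sketch of what such a check could look like (the table variable, path, and error message are illustrative, not the PR's actual implementation; only top-level columns are compared):

```python
import pyarrow.parquet as pq


def check_schema_parity(table, file_path: str) -> None:
    """Fail fast if the parquet file contains columns the Iceberg table doesn't know about."""
    parquet_names = set(pq.read_schema(file_path).names)
    table_names = {field.name for field in table.schema().fields}

    extra = parquet_names - table_names
    if extra:
        raise ValueError(
            f"Parquet file {file_path} has fields not present in the table schema: {sorted(extra)}"
        )
```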

sungwy (Collaborator Author):

I've added this check here @Fokko, let me know if that makes sense to you.

sungwy (Collaborator Author) commented Mar 14, 2024

@syun64 I'm all for it if it works, but I see a lot of issues with inferring it from the Hive path.

Yeah. I don't personally need migration procedures to add files from Hive tables, but I am aware of various teams and community members that want this sort of feature to migrate to Iceberg from Hive without having to rewrite all of their files.

I do think that partition inference from the partition path is more complicated and has more gotchas that need to be discussed at length than the more accurate approach based on the parquet metadata. I will pull that feature out and put together a follow-up PR that only introduces file addition to partitioned tables using the lower and upper bounds of the partition column in the parquet metadata.

@sungwy sungwy changed the title [WIP] Add Data Files from Parquet Files Add Data Files from Parquet Files to UnPartitioned Table Mar 14, 2024
@sungwy sungwy marked this pull request as ready for review March 14, 2024 17:53
@sungwy sungwy requested a review from Fokko March 14, 2024 17:53
Makefile Outdated
@@ -42,7 +42,7 @@ test-integration:
docker-compose -f dev/docker-compose-integration.yml up -d
sleep 10
docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}
poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
Contributor:
This was committed by accident?

Suggested change
poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}

sungwy (Collaborator Author):

I always do 😅

df = spark.table(identifier)
assert df.count() == 6, "Expected 6 rows"
assert len(df.columns) == 4, "Expected 4 columns"
df.show()
Contributor:

I think this was for testing, can we remove this one? .show() is a spark action, meaning it will run the pipeline.

Fokko (Contributor) commented Mar 14, 2024

@syun64 Can you add this also to the docs? :)

@HonahX HonahX (Contributor) left a comment

Thanks @syun64! Adding 2 quick comments.

@@ -292,6 +292,39 @@ The nested lists indicate the different Arrow buffers, where the first write res

<!-- prettier-ignore-end -->

### Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
HonahX (Contributor): Shall we mention in the doc that this procedure currently only works for unpartitioned tables?

sungwy (Collaborator Author):

Maybe! We've already discussed the different approaches for supporting adds to partitioned tables extensively, so I'm optimistic we'll get it in before the next release. I'll put it up shortly after this is merged.

HonahX (Contributor):

Sounds great! Thanks!

Co-authored-by: Honah J. <undefined.newdb.newtable@gmail.com>
sungwy (Collaborator Author) commented Mar 15, 2024

Thank you for the reviews @Fokko and @HonahX. Could either of you help me merge it in as well?

@HonahX HonahX (Contributor) left a comment

Thanks @syun64 for the great work and @Fokko for reviewing!

@HonahX HonahX merged commit 7f712fd into apache:main Mar 16, 2024
7 checks passed