Add Data Files from Parquet Files to UnPartitioned Table #506

Merged · 12 commits · Mar 16, 2024
33 changes: 33 additions & 0 deletions mkdocs/docs/api.md
@@ -292,6 +292,39 @@ The nested lists indicate the different Arrow buffers, where the first write res

<!-- prettier-ignore-end -->

### Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
Contributor: Shall we mention in the doc that this procedure currently only works for unpartitioned tables?

Collaborator (Author): Maybe! We've already discussed the different approaches for supporting adds to partitioned tables extensively, so I'm optimistic we'll get it in before the next release. I'll put it up shortly after this is merged.

Contributor: Sounds great! Thanks!


```python
# Given that these parquet files have a schema consistent with the Iceberg table

file_paths = [
"s3a://warehouse/default/existing-1.parquet",
"s3a://warehouse/default/existing-2.parquet",
]

# They can be added to the table without rewriting them

tbl.add_files(file_paths=file_paths)

# A new snapshot is committed to the table with manifests pointing to the existing parquet files
```
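
After the commit, the new snapshot can be inspected like any other. A minimal sketch (assuming the `tbl` from above; the exact summary keys shown are illustrative):

```python
# Hypothetical follow-up: the fast-append commit becomes the table's
# current snapshot, and its summary reports what was added.
snapshot = tbl.current_snapshot()
assert snapshot is not None

print(snapshot.snapshot_id)
print(snapshot.summary)  # e.g. counts such as 'added-data-files'
```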

<!-- prettier-ignore-start -->

!!! note "Name Mapping"
    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg table's schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet files' metadata, and it creates a new Name Mapping based on the table's current schema if the table doesn't already have one.

<!-- prettier-ignore-end -->
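
As a rough illustration of that behaviour (a sketch, not part of the original walkthrough; `schema.name-mapping.default` is the table property PyIceberg uses for the default name mapping):

```python
# A minimal sketch: check whether the table already carries a name mapping.
# If it does not, add_files derives one from the current table schema.
if tbl.name_mapping() is None:
    print("No name mapping yet; add_files will create one from the table schema")
else:
    print(tbl.properties["schema.name-mapping.default"])
```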

<!-- prettier-ignore-start -->

!!! warning "Maintenance Operations"
    Because `add_files` commits the existing parquet files to the Iceberg table like any other data file, destructive maintenance operations such as expiring snapshots will remove them.

<!-- prettier-ignore-end -->
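
Once added, the files take part in scans like any other data file. A small sketch, assuming the table fits comfortably in memory:

```python
# Rows from the added parquet files are read back exactly like rows from
# files written via tbl.append() or tbl.overwrite().
arrow_table = tbl.scan().to_arrow()
print(arrow_table.num_rows)
```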

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field IDs and ensures that only non-breaking changes are made (this can be overridden).
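
For example, a new column can be added through an `update_schema` context manager (a minimal sketch; the column name is illustrative):

```python
from pyiceberg.types import StringType

# Adds an optional string column; PyIceberg assigns the new field ID.
with tbl.update_schema() as update:
    update.add_column("notes", StringType(), doc="Free-form notes")
```
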
35 changes: 34 additions & 1 deletion pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@
    visit,
    visit_with_partner,
)
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
@@ -1772,6 +1772,39 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
    return iter([data_file])


def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
    for task in tasks:
        input_file = io.new_input(task.file_path)
        with input_file.open() as input_stream:
            parquet_metadata = pq.read_metadata(input_stream)

        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
            raise NotImplementedError(
                f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
            )

        schema = table_metadata.schema()
        data_file = DataFile(
            content=DataFileContent.DATA,
            file_path=task.file_path,
            file_format=FileFormat.PARQUET,
            partition=task.partition_field_value,
            record_count=parquet_metadata.num_rows,
            file_size_in_bytes=len(input_file),
            sort_order_id=None,
            spec_id=table_metadata.default_spec_id,
            equality_ids=None,
            key_metadata=None,
        )
        fill_parquet_file_metadata(
            data_file=data_file,
            parquet_metadata=parquet_metadata,
            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
            parquet_column_mapping=parquet_path_to_id_mapping(schema),
        )
        yield data_file


ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
PYARROW_UNCOMPRESSED_CODEC = "none"

52 changes: 52 additions & 0 deletions pyiceberg/table/__init__.py
@@ -33,6 +33,7 @@
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Literal,
    Optional,
@@ -115,6 +116,7 @@
    Identifier,
    KeyDefaultDict,
    Properties,
    Record,
)
from pyiceberg.types import (
    IcebergType,
@@ -1147,6 +1149,27 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
for data_file in data_files:
update_snapshot.append_data_file(data_file)

    def add_files(self, file_paths: List[str]) -> None:
        """
        Shorthand API for adding files as data files to the table.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        if len(self.spec().fields) > 0:
            raise ValueError("Cannot add files to partitioned tables")

        with self.transaction() as tx:
            if self.name_mapping() is None:
                tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
            with tx.update_snapshot().fast_append() as update_snapshot:
                data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io)
                for data_file in data_files:
                    update_snapshot.append_data_file(data_file)

    def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
        return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

@@ -2444,6 +2467,12 @@ def generate_data_file_filename(self, extension: str) -> str:
return f"00000-{self.task_id}-{self.write_uuid}.{extension}"


@dataclass(frozen=True)
class AddFileTask:
    file_path: str
    partition_field_value: Record


def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
    return f'{location}/metadata/{commit_uuid}-m{num}.avro'

@@ -2475,6 +2504,29 @@ def _dataframe_to_data_files(
    yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))


def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
        raise ValueError("Cannot add files to partitioned tables")

    for file_path in file_paths:
        yield AddFileTask(
            file_path=file_path,
            partition_field_value=Record(),
        )


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.

Returns:
An iterable that supplies DataFiles that describe the parquet files.
"""
from pyiceberg.io.pyarrow import parquet_files_to_data_files

tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)


class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
    commit_uuid: uuid.UUID
    _operation: Operation