Fix tracing existing entries when there are deletes #1046

Merged · 1 commit · Aug 13, 2024
13 changes: 6 additions & 7 deletions pyiceberg/table/__init__.py
@@ -62,7 +62,7 @@
Reference,
)
from pyiceberg.expressions.visitors import (
- ROWS_CANNOT_MATCH,
+ ROWS_MIGHT_NOT_MATCH,
ROWS_MUST_MATCH,
_InclusiveMetricsEvaluator,
_StrictMetricsEvaluator,
@@ -3379,13 +3379,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
existing_entries = []
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
# Based on the metadata, it can be dropped right away
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
self._deleted_data_files.add(entry.data_file)
- elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
- existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
else:
- # Based on the metadata, it is unsure to say if the file can be deleted
- partial_rewrites_needed = True
+ # Based on the metadata, we cannot determine if it can be deleted
+ existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
Collaborator:
I see, so even when we are rewriting the data partially, we still need to add the new manifestentries as "existing" entries in order to track the new data files that are re-written.

Contributor Author:
Yes, these files are unaffected by the delete and should be kept in the manifest as an existing entry. I should have tested more extensively 😱

+ if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
+ partial_rewrites_needed = True

if len(deleted_entries) > 0:
total_deleted_entries += deleted_entries
@@ -3402,8 +3403,6 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
existing_manifests.append(writer.to_manifest_file())
- # else:
- # deleted_manifests.append()
else:
existing_manifests.append(manifest_file)
else:
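Taken together, the hunks above change how data files are classified when a delete rewrites existing manifests. Below is a minimal sketch of the loop as it reads after this change, reassembled from the diff (surrounding context is omitted): files that the strict metrics evaluator proves to be fully covered by the predicate are dropped right away, every other file is kept in the new manifest as an EXISTING entry, and a partial rewrite is flagged only when the inclusive evaluator says some rows might match.

existing_entries = []
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
    if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
        # Every row in the file matches the delete predicate, so the file can be dropped right away
        deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
        self._deleted_data_files.add(entry.data_file)
    else:
        # The metadata cannot prove the file is fully covered by the delete,
        # so keep it in the rewritten manifest as an EXISTING entry
        existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))

        if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
            # Some rows in the file might match the predicate, so a partial rewrite is needed
            partial_rewrites_needed = True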
82 changes: 79 additions & 3 deletions tests/integration/test_writes/test_writes.py
@@ -23,6 +23,7 @@
from typing import Any, Dict
from urllib.parse import urlparse

+ import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
@@ -38,13 +39,20 @@
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
- from pyiceberg.expressions import In
+ from pyiceberg.expressions import GreaterThanOrEqual, In, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
- from pyiceberg.transforms import IdentityTransform
- from pyiceberg.types import IntegerType, LongType, NestedField, StringType
+ from pyiceberg.transforms import DayTransform, IdentityTransform
+ from pyiceberg.types import (
+     DateType,
+     DoubleType,
+     IntegerType,
+     LongType,
+     NestedField,
+     StringType,
+ )
from utils import _create_table


@@ -1333,3 +1341,71 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
tbl.overwrite(data, In("id", ["1", "2", "3"]))

assert len(tbl.scan().to_arrow()) == 3


@pytest.mark.integration
def test_delete_threshold() -> None:
catalog = load_catalog(
"local",
**{
"type": "rest",
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)

schema = Schema(
NestedField(field_id=101, name="id", field_type=LongType(), required=True),
NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
)

partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))

try:
catalog.drop_table(
identifier="default.scores",
)
except NoSuchTableError:
pass

catalog.create_table(
identifier="default.scores",
schema=schema,
partition_spec=partition_spec,
)

# Parameters
num_rows = 100 # Number of rows in the dataframe
id_min, id_max = 1, 10000
date_start, date_end = date(2024, 1, 1), date(2024, 2, 1)

# Generate the 'id' column
id_column = np.random.randint(id_min, id_max, num_rows)

# Generate the 'created_at' column as dates only
date_range = pd.date_range(start=date_start, end=date_end, freq="D") # Daily frequency for dates
created_at_column = np.random.choice(date_range, num_rows) # Randomly pick a date for each row

# Generate the 'relevancy_score' column with a peak around 0.1
relevancy_score_column = np.random.beta(a=2, b=20, size=num_rows) # Adjusting parameters to peak around 0.1

# Create the dataframe
df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})

iceberg_table = catalog.load_table("default.scores")

# Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
arrow_schema = iceberg_table.schema().as_arrow()
docs_table = pa.Table.from_pandas(df, schema=arrow_schema)

# Append the data to the Iceberg table
iceberg_table.append(docs_table)

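# Delete every row at or above the threshold; rows below it should survive,
# including rows in data files that the delete predicate only partially matches.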
delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
lower_before = len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow())
assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
iceberg_table.delete(delete_condition)
assert len(iceberg_table.scan().to_arrow()) == lower_before