Skip to content

Commit

Permalink
Allow non-string typed values in table properties (#469)
Browse files Browse the repository at this point in the history
* property accept int

https://stackoverflow.com/questions/77304167/using-pydantic-to-change-int-to-string
https://docs.pydantic.dev/latest/concepts/validators/#field-validators

* add tests

* add integration tests

* pr feedback

* make validator reusable

* show key when none
  • Loading branch information
kevinjqliu committed Feb 29, 2024
1 parent 6708a6e commit d56dddd
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 11 deletions.
9 changes: 6 additions & 3 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
Union,
)

from pydantic import Field, ValidationError
from pydantic import Field, ValidationError, field_validator
from requests import HTTPError, Session
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt

Expand Down Expand Up @@ -69,6 +69,7 @@
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel
from pyiceberg.types import transform_dict_value_to_str

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -147,6 +148,8 @@ class CreateTableRequest(IcebergBaseModel):
write_order: Optional[SortOrder] = Field(alias="write-order")
stage_create: bool = Field(alias="stage-create", default=False)
properties: Properties = Field(default_factory=dict)
# validators
transform_properties_dict_value_to_str = field_validator('properties', mode='before')(transform_dict_value_to_str)


class RegisterTableRequest(IcebergBaseModel):
Expand Down Expand Up @@ -234,9 +237,9 @@ def _create_session(self) -> Session:

# Sets the client side and server side SSL cert verification, if provided as properties.
if ssl_config := self.properties.get(SSL):
if ssl_ca_bundle := ssl_config.get(CA_BUNDLE): # type: ignore
if ssl_ca_bundle := ssl_config.get(CA_BUNDLE):
session.verify = ssl_ca_bundle
if ssl_client := ssl_config.get(CLIENT): # type: ignore
if ssl_client := ssl_config.get(CLIENT):
if all(k in ssl_client for k in (CERT, KEY)):
session.cert = (ssl_client[CERT], ssl_client[KEY])
elif ssl_client_cert := ssl_client.get(CERT):
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
Union,
)

from pydantic import Field, model_validator
from pydantic import Field, field_validator, model_validator
from pydantic import ValidationError as PydanticValidationError
from typing_extensions import Annotated

Expand All @@ -49,6 +49,7 @@
IcebergRootModel,
Properties,
)
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.datetime import datetime_to_millis

CURRENT_SNAPSHOT_ID = "current-snapshot-id"
Expand Down Expand Up @@ -218,6 +219,9 @@ class TableMetadataCommonFields(IcebergBaseModel):
There is always a main branch reference pointing to the
current-snapshot-id even if the refs map is null."""

# validators
transform_properties_dict_value_to_str = field_validator('properties', mode='before')(transform_dict_value_to_str)

def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
"""Get the snapshot by snapshot_id."""
return next((snapshot for snapshot in self.snapshots if snapshot.snapshot_id == snapshot_id), None)
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def __missing__(self, key: K) -> V:


Identifier = Tuple[str, ...]
Properties = Dict[str, str]
Properties = Dict[str, Any]
RecursiveDict = Dict[str, Union[str, "RecursiveDict"]]

# Represents the literal value
Expand Down
9 changes: 9 additions & 0 deletions pyiceberg/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from typing import (
Any,
ClassVar,
Dict,
Literal,
Optional,
Tuple,
Expand All @@ -61,6 +62,14 @@
FIXED_PARSER = ParseNumberFromBrackets(FIXED)


def transform_dict_value_to_str(dict: Dict[str, Any]) -> Dict[str, str]:
    """Return a new dictionary with the same keys and every value passed through ``str``.

    Raises:
        ValueError: If any value is None. The message names the first such key
            in iteration order.
    """
    # Look for None values before converting anything, so the error is raised up front.
    none_keys = [name for name, item in dict.items() if item is None]
    if none_keys:
        raise ValueError(f"None type is not a supported value in properties: {none_keys[0]}")
    return {name: str(item) for name, item in dict.items()}


def _parse_decimal_type(decimal: Any) -> Tuple[int, int]:
if isinstance(decimal, str):
matches = DECIMAL_REGEX.search(decimal)
Expand Down
22 changes: 20 additions & 2 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import pyarrow as pa
import pytest
from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture

from pyiceberg.catalog import (
Expand Down Expand Up @@ -255,13 +256,16 @@ def catalog() -> InMemoryCatalog:
NAMESPACE_NOT_EMPTY_ERROR = "Namespace is not empty: \\('com', 'organization', 'department'\\)"


def given_catalog_has_a_table(catalog: InMemoryCatalog) -> Table:
def given_catalog_has_a_table(
catalog: InMemoryCatalog,
properties: Properties = EMPTY_DICT,
) -> Table:
return catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
properties=properties or TEST_TABLE_PROPERTIES,
)


Expand Down Expand Up @@ -661,3 +665,17 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)"


def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
    """An int property value passed at table creation is read back from the table as a str."""
    table = given_catalog_has_a_table(catalog, properties={"property_name": 42})
    stored_value = table.properties["property_name"]
    assert isinstance(stored_value, str)


def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None:
    """Creating a table with a None property value raises ValidationError naming the key."""
    expected_message = "None type is not a supported value in properties: property_name"
    with pytest.raises(ValidationError) as exc_info:
        given_catalog_has_a_table(catalog, properties={"property_name": None})
    # Checked after the context manager exits, once exc_info has been populated.
    assert expected_message in str(exc_info.value)
39 changes: 38 additions & 1 deletion tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import pyarrow as pa
import pytest
from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture
from sqlalchemy.exc import ArgumentError, IntegrityError

Expand Down Expand Up @@ -640,7 +641,7 @@ def test_create_namespace_with_null_properties(catalog: SqlCatalog, database_nam
catalog.create_namespace(namespace=database_name, properties={None: "value"}) # type: ignore

with pytest.raises(IntegrityError):
catalog.create_namespace(namespace=database_name, properties={"key": None}) # type: ignore
catalog.create_namespace(namespace=database_name, properties={"key": None})


@pytest.mark.parametrize(
Expand Down Expand Up @@ -915,3 +916,39 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
with txn.update_snapshot().fast_append() as snapshot_update:
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io):
snapshot_update.append_data_file(data_file)


@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
        lazy_fixture('catalog_sqlite_without_rowcount'),
    ],
)
def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
    """An int property value passed to create_table is read back from the table as a str."""
    # The identifier is unpacked as (namespace, table name); only the namespace is needed here.
    namespace, _ = random_identifier
    catalog.create_namespace(namespace)
    created = catalog.create_table(random_identifier, table_schema_simple, properties={"property_name": 42})
    assert isinstance(created.properties["property_name"], str)


@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
        lazy_fixture('catalog_sqlite_without_rowcount'),
    ],
)
def test_table_properties_raise_for_none_value(
    catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier
) -> None:
    """create_table with a None property value raises ValidationError naming the key."""
    expected_message = "None type is not a supported value in properties: property_name"
    # The identifier is unpacked as (namespace, table name); only the namespace is needed here.
    namespace, _ = random_identifier
    catalog.create_namespace(namespace)
    with pytest.raises(ValidationError) as exc_info:
        catalog.create_table(random_identifier, table_schema_simple, properties={"property_name": None})
    assert expected_message in str(exc_info.value)
39 changes: 37 additions & 2 deletions tests/integration/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import pytest
import pytz
from pyarrow.fs import S3FileSystem
from pydantic_core import ValidationError
from pyspark.sql import SparkSession
from pytest_mock.plugin import MockerFixture

Expand Down Expand Up @@ -403,7 +404,7 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w


@pytest.mark.integration
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize(
"properties, expected_compression_name",
[
Expand All @@ -419,7 +420,7 @@ def test_write_parquet_compression_properties(
spark: SparkSession,
session_catalog: Catalog,
arrow_table_with_null: pa.Table,
format_version: str,
format_version: int,
properties: Dict[str, Any],
expected_compression_name: str,
) -> None:
Expand Down Expand Up @@ -654,3 +655,37 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None
with txn.update_snapshot().fast_append() as snapshot_update:
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io):
snapshot_update.append_data_file(data_file)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_properties_int_value(
    session_catalog: Catalog,
    arrow_table_with_null: pa.Table,
    format_version: int,
) -> None:
    """An int property value passed to _create_table is read back from the table as a str."""
    # Both the format version and the custom property are supplied as ints.
    table_properties = {"format-version": format_version, "property_name": 42}
    created = _create_table(
        session_catalog,
        "default.test_table_properties_int_value",
        table_properties,
        [arrow_table_with_null],
    )
    assert isinstance(created.properties["property_name"], str)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_properties_raise_for_none_value(
    session_catalog: Catalog,
    arrow_table_with_null: pa.Table,
    format_version: int,
) -> None:
    """_create_table with a None property value raises ValidationError naming the key."""
    expected_message = "None type is not a supported value in properties: property_name"
    table_properties = {"format-version": format_version, "property_name": None}
    with pytest.raises(ValidationError) as exc_info:
        _create_table(
            session_catalog,
            "default.test_table_properties_raise_for_none_value",
            table_properties,
            [arrow_table_with_null],
        )
    assert expected_message in str(exc_info.value)
29 changes: 28 additions & 1 deletion tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
# pylint:disable=redefined-outer-name
import uuid
from copy import copy
from typing import Dict
from typing import Any, Dict

import pyarrow as pa
import pytest
from pydantic import ValidationError
from sortedcontainers import SortedList

from pyiceberg.catalog.noop import NoopCatalog
Expand Down Expand Up @@ -1081,3 +1082,29 @@ def test_schema_mismatch_additional_field(table_schema_simple: Schema) -> None:

with pytest.raises(ValueError, match=expected):
_check_schema(table_schema_simple, other_schema)


def test_table_properties(example_table_metadata_v2: Dict[str, Any]) -> None:
    """Property keys and values are str in the fixture and in the parsed metadata, even for int input."""

    def _assert_str_items(mapping: Dict[Any, Any]) -> None:
        # Every key and every value must be a str.
        for name, item in mapping.items():
            assert isinstance(name, str)
            assert isinstance(item, str)

    # The raw fixture and the parsed model both carry string-only properties.
    _assert_str_items(example_table_metadata_v2["properties"])
    parsed = TableMetadataV2(**example_table_metadata_v2)
    _assert_str_items(parsed.properties)

    # Replace the properties with an int-valued entry; the input stays an int,
    # while the parsed model holds a str.
    overridden = dict(example_table_metadata_v2)
    overridden["properties"] = {"property_name": 42}
    assert isinstance(overridden["properties"]["property_name"], int)
    reparsed = TableMetadataV2(**overridden)
    assert isinstance(reparsed.properties["property_name"], str)


def test_table_properties_raise_for_none_value(example_table_metadata_v2: Dict[str, Any]) -> None:
    """Building TableMetadataV2 with a None property value raises ValidationError naming the key."""
    expected_message = "None type is not a supported value in properties: property_name"
    # Work on a shallow copy so the fixture dictionary itself is not modified.
    metadata_with_none = dict(example_table_metadata_v2)
    metadata_with_none["properties"] = {"property_name": None}
    with pytest.raises(ValidationError) as exc_info:
        TableMetadataV2(**metadata_with_none)
    assert expected_message in str(exc_info.value)

0 comments on commit d56dddd

Please sign in to comment.