Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the InMemory Catalog Implementation #289

Merged
merged 24 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None:
Console().print(output_table)

def text(self, response: str) -> None:
Console().print(response)
Console(soft_wrap=True).print(response)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some test_console.py outputs are too long and end up with an extra `\n` in the middle of the string, causing the tests to fail.


def schema(self, schema: Schema) -> None:
output_table = self._table
Expand Down
104 changes: 70 additions & 34 deletions tests/catalog/test_base.py
kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
# pylint:disable=redefined-outer-name


import uuid
from pathlib import PosixPath
from typing import (
Dict,
List,
Expand All @@ -42,7 +45,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.io import WAREHOUSE, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Expand All @@ -55,15 +58,21 @@
TableIdentifier,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import IntegerType, LongType, NestedField

DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"


class InMemoryCatalog(Catalog):
"""An in-memory catalog implementation for testing purposes."""
"""
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.

This is useful for test, demo, and playground but not in production as data is not persisted.
"""

__tables: Dict[Identifier, Table]
__namespaces: Dict[Identifier, Properties]
Expand All @@ -72,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None:
super().__init__(name, **properties)
self.__tables = {}
self.__namespaces = {}
self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION)

def create_table(
self,
Expand All @@ -81,6 +91,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
table_uuid: Optional[uuid.UUID] = None,
) -> Table:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

Expand All @@ -93,24 +104,26 @@ def create_table(
if namespace not in self.__namespaces:
self.__namespaces[namespace] = {}

new_location = location or f's3://warehouse/{"/".join(identifier)}/data'
metadata = TableMetadataV1(**{
"format-version": 1,
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
"location": new_location,
"last-updated-ms": 1602638573874,
"last-column-id": schema.highest_field_id,
"schema": schema.model_dump(),
"partition-spec": partition_spec.model_dump()["fields"],
"properties": properties,
"current-snapshot-id": -1,
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}],
})
if not location:
location = f'{self._warehouse_location}/{"/".join(identifier)}'

metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
location=location,
properties=properties,
table_uuid=table_uuid,
)
io = load_file_io({**self.properties, **properties}, location=location)
self._write_metadata(metadata, io, metadata_location)

table = Table(
identifier=identifier,
metadata=metadata,
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
io=load_file_io(),
metadata_location=metadata_location,
io=io,
catalog=self,
)
self.__tables[identifier] = table
Expand All @@ -120,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,)
table = self.__tables[identifier]
table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates)

return CommitTableResponse(
metadata=table.metadata.model_dump(),
metadata_location=table.location(),
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
current_table = self.load_table(identifier_tuple)
base_metadata = current_table.metadata

for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# update table state
current_table.metadata = updated_metadata

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = self.identifier_to_tuple_without_catalog(identifier)
Expand Down Expand Up @@ -162,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
identifier=to_identifier,
metadata=table.metadata,
metadata_location=table.metadata_location,
io=load_file_io(),
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
catalog=self,
)
return self.__tables[to_identifier]
Expand Down Expand Up @@ -234,8 +262,8 @@ def update_namespace_properties(


@pytest.fixture
def catalog() -> InMemoryCatalog:
return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"})
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the ability to write to temporary files for testing; these files are then automatically cleaned up.

return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"})


TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
Expand All @@ -246,7 +274,6 @@ def catalog() -> InMemoryCatalog:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
)
TEST_TABLE_LOCATION = "protocol://some/location"
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
Expand All @@ -263,7 +290,6 @@ def given_catalog_has_a_table(
return catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=properties or TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -309,13 +335,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table


def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
new_location = f"{catalog._warehouse_location}/new_location"
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=new_location,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
assert table.location() == new_location


@pytest.mark.parametrize(
"schema,expected",
[
Expand All @@ -337,8 +375,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=pyarrow_schema_simple_without_ids,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@syun64 FYI, I realized that the TEST_TABLE_PARTITION_SPEC here breaks this test.

TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))

The partition field's source_id here is 1, but in create_table the schema's field_ids are all -1 due to _convert_schema_if_needed

So assign_fresh_partition_spec_ids fails

original_column_name = old_schema.find_column_name(field.source_id)
if original_column_name is None:

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @kevinjqliu, thank you for flagging this 😄 I think the '-1' ID discrepancy is a symptom of the underlying issue that makes it easy to understand, just as we decided in #305 (comment)

The root cause of the issue, I think, is that we are introducing a way for a schema without field IDs (a PyArrow Schema) to be used as an input to create_table, while not supporting the same for partition_spec and sort_order (PartitionField and SortField both require field IDs as inputs).

So I think we should update both assign_fresh_partition_spec_ids and assign_fresh_sort_order_ids to support field look up by name.

@Fokko - does that sound like a good way to resolve this issue?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #338 to track this issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you @syun64 that having to look up the IDs when creating tables is not ideal. That API will probably have to be extended at some point.

But for the metadata (and also for how Iceberg internally tracks columns — names can change, IDs cannot), we need to track columns by ID. I'm not sure assigning -1 was the best idea, because that gives you a table you cannot work with. Thanks for creating the issue; let's continue the discussion there.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good @Fokko 👍 and thanks again for flagging this @kevinjqliu !

properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
Expand Down Expand Up @@ -664,7 +700,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:

def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)"
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)"


def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
Expand Down
Loading