Skip to content

Commit

Permalink
Improve the InMemory Catalog Implementation (#289)
Browse files Browse the repository at this point in the history
* extract InMemoryCatalog out of test

* generalize InMemoryCatalog

* make write work

* write to temporary location

* can override table location

* memory.py -> in_memory.py

* fix test_commit_table

* rebase from main

* revert fs changes

* fix tests

* add docs and comments

* comma

* comment

* order

* fix test

* add license

* `create_table` write metadata file

* move InMemoryCatalog back to test_base

* remove unused references

* Update mkdocs/docs/configuration.md

Co-authored-by: Fokko Driesprong <fokko@apache.org>

* Update mkdocs/docs/configuration.md

Co-authored-by: Fokko Driesprong <fokko@apache.org>

* Update tests/catalog/test_base.py

Co-authored-by: Fokko Driesprong <fokko@apache.org>

* remove schema_id

---------

Co-authored-by: Fokko Driesprong <fokko@apache.org>
  • Loading branch information
kevinjqliu and Fokko committed Mar 13, 2024
1 parent 1fd85c8 commit 36a505f
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def describe_properties(self, properties: Properties) -> None:
Console().print(output_table)

def text(self, response: str) -> None:
Console().print(response)
Console(soft_wrap=True).print(response)

def schema(self, schema: Schema) -> None:
output_table = self._table
Expand Down
104 changes: 70 additions & 34 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
# pylint:disable=redefined-outer-name


import uuid
from pathlib import PosixPath
from typing import (
Dict,
List,
Expand All @@ -40,7 +43,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.io import WAREHOUSE, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Expand All @@ -53,15 +56,21 @@
TableIdentifier,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import IntegerType, LongType, NestedField

DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"


class InMemoryCatalog(Catalog):
"""An in-memory catalog implementation for testing purposes."""
"""
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.
This is useful for test, demo, and playground but not in production as data is not persisted.
"""

__tables: Dict[Identifier, Table]
__namespaces: Dict[Identifier, Properties]
Expand All @@ -70,6 +79,7 @@ def __init__(self, name: str, **properties: str) -> None:
super().__init__(name, **properties)
self.__tables = {}
self.__namespaces = {}
self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION)

def create_table(
self,
Expand All @@ -79,6 +89,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
table_uuid: Optional[uuid.UUID] = None,
) -> Table:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

Expand All @@ -91,24 +102,26 @@ def create_table(
if namespace not in self.__namespaces:
self.__namespaces[namespace] = {}

new_location = location or f's3://warehouse/{"/".join(identifier)}/data'
metadata = TableMetadataV1(**{
"format-version": 1,
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
"location": new_location,
"last-updated-ms": 1602638573874,
"last-column-id": schema.highest_field_id,
"schema": schema.model_dump(),
"partition-spec": partition_spec.model_dump()["fields"],
"properties": properties,
"current-snapshot-id": -1,
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}],
})
if not location:
location = f'{self._warehouse_location}/{"/".join(identifier)}'

metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
location=location,
properties=properties,
table_uuid=table_uuid,
)
io = load_file_io({**self.properties, **properties}, location=location)
self._write_metadata(metadata, io, metadata_location)

table = Table(
identifier=identifier,
metadata=metadata,
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
io=load_file_io(),
metadata_location=metadata_location,
io=io,
catalog=self,
)
self.__tables[identifier] = table
Expand All @@ -118,14 +131,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,)
table = self.__tables[identifier]
table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates)

return CommitTableResponse(
metadata=table.metadata.model_dump(),
metadata_location=table.location(),
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
current_table = self.load_table(identifier_tuple)
base_metadata = current_table.metadata

for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# update table state
current_table.metadata = updated_metadata

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = self.identifier_to_tuple_without_catalog(identifier)
Expand Down Expand Up @@ -160,7 +188,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
identifier=to_identifier,
metadata=table.metadata,
metadata_location=table.metadata_location,
io=load_file_io(),
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
catalog=self,
)
return self.__tables[to_identifier]
Expand Down Expand Up @@ -232,8 +260,8 @@ def update_namespace_properties(


@pytest.fixture
def catalog() -> InMemoryCatalog:
return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"})
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"})


TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
Expand All @@ -244,7 +272,6 @@ def catalog() -> InMemoryCatalog:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
)
TEST_TABLE_LOCATION = "protocol://some/location"
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
Expand All @@ -261,7 +288,6 @@ def given_catalog_has_a_table(
return catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=properties or TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -307,13 +333,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table


def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
new_location = f"{catalog._warehouse_location}/new_location"
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=new_location,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
assert table.location() == new_location


@pytest.mark.parametrize(
"schema,expected",
[
Expand All @@ -335,8 +373,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=pyarrow_schema_simple_without_ids,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
Expand Down Expand Up @@ -662,7 +698,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:

def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)"
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)"


def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
Expand Down
Loading

0 comments on commit 36a505f

Please sign in to comment.