Skip to content

Commit

Permalink
Feature/03 create bootstrap targets (#13)
Browse files Browse the repository at this point in the history
* added firm parquet check

* implemented pyarrow schema inference for parquet + test

* implemented table creation + testing

* implemented namespace creation as well, plus test cleanup logic

* surprise! CI testing : )
  • Loading branch information
randypitcherii committed Dec 7, 2023
1 parent 6c70b2d commit b4c1966
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 27 deletions.
29 changes: 29 additions & 0 deletions tabular-cdc-bootstrapper/.github/workflows/pr_check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Tabular CDC Bootstrapper PR Unit Testing

on:
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      # v2 of checkout/setup-python run on the deprecated node12 runtime;
      # v4/v5 are the maintained releases
      - uses: actions/checkout@v4

      - name: Set up Python 3.9
        uses: actions/setup-python@v5
        with:
          # quote the version: a bare 3.10 would be parsed by YAML as the float 3.1
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install pipenv
          cd tabular-cdc-bootstrapper
          pipenv install

      - name: Run tests
        run: |
          cd tabular-cdc-bootstrapper
          pipenv run pytest
2 changes: 2 additions & 0 deletions tabular-cdc-bootstrapper/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ name = "pypi"
requests = "*"
pytest = "*"
pyiceberg = {extras = ["s3fs"], version = "*"}
pyarrow = "*"
boto3 = "*"

[dev-packages]

Expand Down
112 changes: 108 additions & 4 deletions tabular-cdc-bootstrapper/Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions tabular-cdc-bootstrapper/cdc_bootstrap_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def handle_new_file(event, context):
""")

try:
# parquet-only gate
if not object_key.endswith('.parquet'):
raise ValueError("Only parquet files are supported right now, sunshine 🌞.")

catalog_properties = {
'uri': TABULAR_CATALOG_URI,
'credential': TABULAR_CREDENTIAL,
Expand Down
40 changes: 27 additions & 13 deletions tabular-cdc-bootstrapper/tabular.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from io import BytesIO
import logging

import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.exceptions import NoSuchTableError, NamespaceAlreadyExistsError

# Set up logging
logger = logging.getLogger()
Expand Down Expand Up @@ -54,28 +57,39 @@ def bootstrap_from_file(s3_key: str, s3_prefix:str, catalog_properties) -> str:
s3_prefix: {s3_prefix}
target_db_name: {target_db_name}
target_table_name: {target_table_name}
""")

return True


def get_table_schema_from_parquet(parquet_io_object: BytesIO) -> "pyarrow.Schema":
    """
    Infer the pyarrow schema of a parquet file held in an in-memory buffer.

    Args:
      parquet_io_object: seekable buffer containing a complete parquet file.
        The buffer is rewound before reading, so the caller may pass it
        straight after writing.

    Returns:
      The pyarrow Schema of the parquet data. (The previous ``-> dict``
      annotation was incorrect — a pyarrow Schema object is returned.)
    """
    # rewind first — the caller may have just finished writing the buffer
    parquet_io_object.seek(0)

    # read_schema parses only the parquet footer metadata, avoiding a full
    # load of the data pages that pq.read_table would perform
    return pq.read_schema(parquet_io_object)


def create_table_from_s3_path(s3_key: str, catalog, database: str, table: str):
    """
    Creates an empty, columnless iceberg table with the given database and table name
    in the provided iceberg catalog.

    Args:
      s3_key: s3 object key of the file that triggered bootstrapping; recorded
        in the table's 'comment' property for traceability.
      catalog: a loaded pyiceberg catalog to create the namespace and table in.
      database: target namespace name.
      table: target table name.

    TODO: actually implement schema inference from the file at s3_key. Someday...
    """
    # Create the namespace if it doesn't exist; an already-existing namespace
    # is fine (e.g. a concurrent bootstrap beat us to it)
    try:
        catalog.create_namespace(database)
    except NamespaceAlreadyExistsError:
        pass

    # Create 'db.table' with an empty schema; columns will be added once
    # schema inference is implemented
    catalog.create_table(
        identifier=f'{database}.{table}',
        schema={},
        properties={'comment': f'created by cdc bootstrapper from s3 file: {s3_key}'}
    )

def bootstrap_load_table(s3_folder_path: str, warehouse: str, database: str, table: str):
"""
Expand Down
68 changes: 58 additions & 10 deletions tabular-cdc-bootstrapper/test_tabular.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
from io import BytesIO
import os

import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
import pytest

import tabular

class TestTabular:
CATALOG_PROPERTIES = {
'type': 'rest',
'uri': os.environ['TABULAR_CATALOG_URI'],
'credential': os.environ['TABULAR_CREDENTIAL'],
'warehouse': os.environ['TABULAR_TARGET_WAREHOUSE'],
}

catalog = load_catalog(**CATALOG_PROPERTIES)


def test_extract_database_and_table(self):
s3_key = 'cdc-bootstrap/alpha/gazebo/my-file.json'

Expand All @@ -23,21 +37,55 @@ def test_extract_database_and_table(self):
def test_bootstrap_from_file(self):
test_cases = {
'table_exists': ('cdc-bootstrap/system/catalog_events/my-file.json', 'cdc-bootstrap', False),
'table_missing': ('cdc-bootstrap/pyiceberg/alpha/my-file.json', 'cdc-bootstrap', True),
'table_missing': ('cdc-bootstrap/fingers-crossed-this-doesnt-exist/missing-table/my-file.json', 'cdc-bootstrap', True),
'database_missing': ('cdc-bootstrap/pyiceberg/alpha/my-file.json', '', True)
}

catalog_properties = {
'type': 'rest',
'uri': os.environ['TABULAR_CATALOG_URI'],
'credential': os.environ['TABULAR_CREDENTIAL'],
'warehouse': os.environ['TABULAR_TARGET_WAREHOUSE'],
}

for key in test_cases:
test_case = test_cases[key]
assert tabular.bootstrap_from_file(test_case[0], test_case[1], catalog_properties) == test_case[2]
assert tabular.bootstrap_from_file(test_case[0], test_case[1], self.CATALOG_PROPERTIES) == test_case[2]

# test some junk
with pytest.raises(ValueError):
tabular.bootstrap_from_file('lkdfj.jdsfskl', 'fdassdf', {})
tabular.bootstrap_from_file('lkdfj.jdsfskl', 'fdassdf', {})

def test_get_table_schema_from_parquet(self):
    """Schema inferred from an in-memory parquet file exposes the source columns."""
    cases = (
        {
            'Name': ['John', 'Anna', 'Peter', 'Linda'],
            'Age': [30, 20, 40, 50]
        },
    )

    for columns in cases:
        # serialize the column data to a parquet file entirely in memory
        buffer = BytesIO()
        pq.write_table(pa.table(columns), buffer)

        schema = tabular.get_table_schema_from_parquet(buffer)

        # every source column should appear as a schema field 💪
        assert set(schema.names) == set(columns.keys())

def test_create_table_from_s3_path(self):
    """
    create_table_from_s3_path should create the target namespace and an
    (empty) table that is subsequently loadable from the catalog.
    """
    from contextlib import suppress

    mock_s3_key = 'cdc-bootstrap/pyiceberg/_test_create_table_from_s3_path/my-file.json'
    target_db_name = '_test_cdc_bootloader'
    target_table_name = '_test_create_table_from_s3_path'

    try:
        tabular.create_table_from_s3_path(s3_key=mock_s3_key, catalog=self.catalog, database=target_db_name, table=target_table_name)
        actual_table = self.catalog.load_table(f'{target_db_name}.{target_table_name}')

        expected_table_name = ('default', target_db_name, target_table_name)

        assert actual_table.name() == expected_table_name

    finally:
        # best-effort cleanup: if creation failed partway, dropping a missing
        # table/namespace must not raise and mask the original failure
        with suppress(Exception):
            self.catalog.drop_table(f'{target_db_name}.{target_table_name}')
        with suppress(Exception):
            self.catalog.drop_namespace(target_db_name)

0 comments on commit b4c1966

Please sign in to comment.