From 8dce317084d76d83707e42d2d4e55e46ff91c5bb Mon Sep 17 00:00:00 2001 From: Randy Date: Mon, 11 Dec 2023 09:40:16 -0500 Subject: [PATCH] Feature/05 configure fileloader and cdc props (#14) * fixed a painful bloated dependencies issue * wired up table creation -- next is simple table property management * STUPID - I didn't have .github in the project root * specified python version better for gh action * made pipenv just a little more flexible * made gh actions less strict * remember to pipenv install dev deps * added envs to gh action workflow * fixed bad var ref for ci * fixed state issues in unit testing * refactor to s3_monitoring_uri syntax, split table creation into loader tables and cdc mirror tables * small tweak to config file * added s3 uri indicator --- .../workflows/pr_check.yml | 15 +- README.md | 8 +- tabular-cdc-bootstrapper/.gitignore | 5 +- tabular-cdc-bootstrapper/Pipfile | 5 +- tabular-cdc-bootstrapper/Pipfile.lock | 126 +++++------ .../cdc_bootstrap_handler.py | 47 ++-- tabular-cdc-bootstrapper/serverless.yml | 15 +- tabular-cdc-bootstrapper/tabular.py | 210 ++++++++++++------ tabular-cdc-bootstrapper/test_tabular.py | 50 +++-- 9 files changed, 303 insertions(+), 178 deletions(-) rename {tabular-cdc-bootstrapper/.github => .github}/workflows/pr_check.yml (53%) diff --git a/tabular-cdc-bootstrapper/.github/workflows/pr_check.yml b/.github/workflows/pr_check.yml similarity index 53% rename from tabular-cdc-bootstrapper/.github/workflows/pr_check.yml rename to .github/workflows/pr_check.yml index ace2cd2..555bd49 100644 --- a/tabular-cdc-bootstrapper/.github/workflows/pr_check.yml +++ b/.github/workflows/pr_check.yml @@ -6,9 +6,20 @@ on: - main jobs: - test: + build_and_test: + name: Build and test cdc bootstrapper locally (no sls deploy) runs-on: ubuntu-latest + + env: + S3_BUCKET_NAME: ${{ vars.S3_BUCKET_NAME }} + S3_BUCKET_PATH: ${{ vars.S3_BUCKET_PATH }} + + TABULAR_CREDENTIAL: ${{ secrets.TABULAR_CREDENTIAL }} + TABULAR_TARGET_WAREHOUSE: ${{ vars.TABULAR_TARGET_WAREHOUSE }} + TABULAR_CATALOG_URI: ${{ vars.TABULAR_CATALOG_URI }} + + steps: - uses: actions/checkout@v2 @@ -21,7 +32,7 @@ jobs: run: | pip install pipenv cd tabular-cdc-bootstrapper - pipenv install + pipenv install --dev - name: Run tests run: | diff --git a/README.md b/README.md index 0728bba..a73ff4d 100644 --- a/README.md +++ b/README.md @@ -32,8 +32,12 @@ pipenv install - [configure serverless for your AWS account.](https://www.serverless.com/framework/docs/providers/aws/guide/credentials) - update serverless.yml with your specific configs, including tabular credentials. You may also provide a `.env` file if you prefer. Place the file alongside the `serverless.yml` file in the same directory 💪 ```.env -S3_BUCKET_NAME=randy-pitcher-workspace--aws -S3_BUCKET_PATH=cdc-bootstrap +S3_BUCKET_TO_MONITOR=randy-pitcher-workspace--aws +S3_PATH_TO_MONITOR=cdc-bootstrap + +TABULAR_TARGET_WAREHOUSE=enterprise_data_warehouse +TABULAR_CREDENTIAL=t-1234:123123123 # needs permission to create database in a warehouse and list all existing objects in a warehouse +TABULAR_CATALOG_URI=https://api.tabular.io/ws ``` - activate the python virtual environment with `pipenv shell` - deploy with `npx sls deploy`. 
NOTE: if you want to just run `sls deploy`, install serverless globally with npm (`npm install -g serverless`) diff --git a/tabular-cdc-bootstrapper/.gitignore b/tabular-cdc-bootstrapper/.gitignore index 7d07c70..ca0035d 100644 --- a/tabular-cdc-bootstrapper/.gitignore +++ b/tabular-cdc-bootstrapper/.gitignore @@ -23,4 +23,7 @@ node_modules .requirements.zip # shhh, secrets. Or maybe just configs, who knows 🤷 -.env \ No newline at end of file +.env + +# ignore sample files +*.parquet \ No newline at end of file diff --git a/tabular-cdc-bootstrapper/Pipfile b/tabular-cdc-bootstrapper/Pipfile index 7f20361..339bd25 100644 --- a/tabular-cdc-bootstrapper/Pipfile +++ b/tabular-cdc-bootstrapper/Pipfile @@ -4,14 +4,11 @@ verify_ssl = true name = "pypi" [packages] -requests = "*" -pytest = "*" pyiceberg = {extras = ["s3fs"], version = "*"} pyarrow = "*" -boto3 = "*" [dev-packages] +pytest = "*" [requires] python_version = "3.9" -python_full_version = "3.9.6" diff --git a/tabular-cdc-bootstrapper/Pipfile.lock b/tabular-cdc-bootstrapper/Pipfile.lock index 4b0e5fb..c58fd8b 100644 --- a/tabular-cdc-bootstrapper/Pipfile.lock +++ b/tabular-cdc-bootstrapper/Pipfile.lock @@ -1,11 +1,10 @@ { "_meta": { "hash": { - "sha256": "da81e648cf1ee436259028d4d88cabe48d35d856398e2d49e5b7dde129bdce9e" + "sha256": "bc3d5723880bca256c2eab834b115545dc21bb7f9316f0a940f02728dcfd2904" }, "pipfile-spec": 6, "requires": { - "python_full_version": "3.9.6", "python_version": "3.9" }, "sources": [ @@ -147,15 +146,6 @@ "markers": "python_version >= '3.7'", "version": "==23.1.0" }, - "boto3": { - "hashes": [ - "sha256:1fe5fa75ff0f0c29a6f55e818d149d33571731e692a7b785ded7a28ac832cae8", - "sha256:fa5aa92d16763cb906fb4a83d6eba887342202a980bea07862af5ba40827aa5a" - ], - "index": "pypi", - "markers": "python_version >= '3.7'", - "version": "==1.33.1" - }, "botocore": { "hashes": [ "sha256:c744b90980786c610dd9ad9c50cf2cdde3f1c4634b954a33613f6f8a1865a1de", @@ -276,14 +266,6 @@ "markers": "python_version >= '3.7'", "version": "==8.1.7" }, - "exceptiongroup": { - "hashes": [ - "sha256:4bfd3996ac73b41e9b9628b04e079f193850720ea5945fc96a08633c66912f14", - "sha256:91f5c769735f051a4290d52edd0858999b57e5876e9f85937691bd4c9fa3ed68" - ], - "markers": "python_version < '3.11'", - "version": "==1.2.0" - }, "frozenlist": { "hashes": [ "sha256:007df07a6e3eb3e33e9a1fe6a9db7af152bbd8a185f9aaa6ece10a3529e3e1c6", @@ -367,14 +349,6 @@ "markers": "python_version >= '3.5'", "version": "==3.6" }, - "iniconfig": { - "hashes": [ - "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3", - "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374" - ], - "markers": "python_version >= '3.7'", - "version": "==2.0.0" - }, "jmespath": { "hashes": [ "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", @@ -560,22 +534,6 @@ "markers": "python_version >= '3.9'", "version": "==1.26.2" }, - "packaging": { - "hashes": [ - "sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5", - "sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7" - ], - "markers": "python_version >= '3.7'", - "version": "==23.2" - }, - "pluggy": { - "hashes": [ - "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12", - "sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7" - ], - "markers": "python_version >= '3.8'", - "version": "==1.3.0" - }, "pyarrow": { "hashes": [ "sha256:0140c7e2b740e08c5a459439d87acd26b747fc408bde0a8806096ee0baaa0c15", @@ -789,21 +747,12 @@ "markers": 
"python_full_version >= '3.6.8'", "version": "==3.1.1" }, - "pytest": { - "hashes": [ - "sha256:0d009c083ea859a71b76adf7c1d502e4bc170b80a8ef002da5806527b9591fac", - "sha256:d989d136982de4e3b29dabcc838ad581c64e8ed52c11fbe86ddebd9da0818cd5" - ], - "index": "pypi", - "markers": "python_version >= '3.7'", - "version": "==7.4.3" - }, "python-dateutil": { "hashes": [ "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86", "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==2.8.2" }, "requests": { @@ -811,7 +760,6 @@ "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f", "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1" ], - "index": "pypi", "markers": "python_version >= '3.7'", "version": "==2.31.0" }, @@ -830,20 +778,12 @@ ], "version": "==2023.12.1" }, - "s3transfer": { - "hashes": [ - "sha256:baa479dc2e63e5c2ed51611b4d46cdf0295e2070d8d0b86b22f335ee5b954986", - "sha256:e8d6bd52ffd99841e3a57b34370a54841f12d3aab072af862cdcc50955288002" - ], - "markers": "python_version >= '3.7'", - "version": "==0.8.0" - }, "six": { "hashes": [ "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.16.0" }, "sortedcontainers": { @@ -861,14 +801,6 @@ "markers": "python_full_version >= '3.7.0'", "version": "==1.7.3" }, - "tomli": { - "hashes": [ - "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc", - "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f" - ], - "markers": "python_version < '3.11'", - "version": "==2.0.1" - }, "typing-extensions": { "hashes": [ "sha256:8f92fc8806f9a6b641eaa5318da32b44d401efaac0f6678c9bc448ba3605faa0", @@ -1058,5 +990,55 @@ "version": "==1.9.4" } }, - "develop": {} + "develop": { + "exceptiongroup": { + "hashes": [ + "sha256:4bfd3996ac73b41e9b9628b04e079f193850720ea5945fc96a08633c66912f14", + "sha256:91f5c769735f051a4290d52edd0858999b57e5876e9f85937691bd4c9fa3ed68" + ], + "markers": "python_version < '3.11'", + "version": "==1.2.0" + }, + "iniconfig": { + "hashes": [ + "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3", + "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374" + ], + "markers": "python_version >= '3.7'", + "version": "==2.0.0" + }, + "packaging": { + "hashes": [ + "sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5", + "sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7" + ], + "markers": "python_version >= '3.7'", + "version": "==23.2" + }, + "pluggy": { + "hashes": [ + "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12", + "sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7" + ], + "markers": "python_version >= '3.8'", + "version": "==1.3.0" + }, + "pytest": { + "hashes": [ + "sha256:0d009c083ea859a71b76adf7c1d502e4bc170b80a8ef002da5806527b9591fac", + "sha256:d989d136982de4e3b29dabcc838ad581c64e8ed52c11fbe86ddebd9da0818cd5" + ], + "index": "pypi", + "markers": "python_version >= '3.7'", + "version": "==7.4.3" + }, + "tomli": { + "hashes": [ + 
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc", + "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f" + ], + "markers": "python_version < '3.11'", + "version": "==2.0.1" + } + } } diff --git a/tabular-cdc-bootstrapper/cdc_bootstrap_handler.py b/tabular-cdc-bootstrapper/cdc_bootstrap_handler.py index d2720e2..d2d92b7 100644 --- a/tabular-cdc-bootstrapper/cdc_bootstrap_handler.py +++ b/tabular-cdc-bootstrapper/cdc_bootstrap_handler.py @@ -10,18 +10,17 @@ except ImportError: pass - import tabular - -# Tabular ENVs +# Tabular connectivity TABULAR_CREDENTIAL = os.environ['TABULAR_CREDENTIAL'] TABULAR_CATALOG_URI = os.environ['TABULAR_CATALOG_URI'] TABULAR_TARGET_WAREHOUSE = os.environ['TABULAR_TARGET_WAREHOUSE'] -# S3 Monitoring ENVs -S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME'] -S3_BUCKET_PATH = os.environ.get('S3_BUCKET_PATH', '') # monitor the whole bucket if no path provided +# S3 Monitoring +S3_BUCKET_TO_MONITOR = os.environ['S3_BUCKET_TO_MONITOR'] +S3_PATH_TO_MONITOR = os.environ['S3_PATH_TO_MONITOR'] +S3_MONITORING_URI = f's3://{S3_BUCKET_TO_MONITOR}/{S3_PATH_TO_MONITOR}' # Set up logging @@ -37,8 +36,7 @@ def handle_new_file(event, context): logger.info(f"""Processing new bootstrap event... TABULAR_CATALOG_URI: {TABULAR_CATALOG_URI} TABULAR_TARGET_WAREHOUSE: {TABULAR_TARGET_WAREHOUSE} - S3_BUCKET_NAME: {S3_BUCKET_NAME} - S3_BUCKET_PATH: {S3_BUCKET_PATH} + S3_MONITORING_URI: {S3_MONITORING_URI} Object Key: {object_key} @@ -55,7 +53,21 @@ def handle_new_file(event, context): 'warehouse': TABULAR_TARGET_WAREHOUSE } - tabular.bootstrap_from_file(object_key, S3_BUCKET_PATH, catalog_properties) + if tabular.bootstrap_from_file(object_key, S3_MONITORING_URI, catalog_properties): + msg = 'Table successfully bootstrapped ✅' + logger.info(msg=msg) + return { + 'statusCode': 200, + 'body': json.dumps(msg) + } + + else: + msg = 'Nothing to do 🌞' + logger.info(msg=msg) + return { + 'statusCode': 200, + 'body': json.dumps(msg) + } except Exception as e: @@ -63,16 +75,13 @@ def handle_new_file(event, context): error_type = type(e).__name__ stack_info = traceback.format_exc() - return { - 'statusCode': 500, - 'errorType': error_type, - 'errorMessage': error_message, - 'stackTrace': stack_info, + resp = { + 'statusCode': 500, + 'errorType': error_type, + 'errorMessage': error_message, + 'stackTrace': stack_info, } + logger.error(f'\nFailed to bootstrap 🔴\n{resp}') - else: - return { - 'statusCode': 200, - 'body': json.dumps('Looks good to me 🌞') - } + return resp diff --git a/tabular-cdc-bootstrapper/serverless.yml b/tabular-cdc-bootstrapper/serverless.yml index 6979371..248c939 100644 --- a/tabular-cdc-bootstrapper/serverless.yml +++ b/tabular-cdc-bootstrapper/serverless.yml @@ -5,14 +5,15 @@ service: tabular-cdc-bootstrapper frameworkVersion: "3" custom: - s3_bucket_name: ${env:S3_BUCKET_NAME, 'you_can_hardcode_here'} - s3_bucket_path: ${env:S3_BUCKET_PATH, 'you_can_hardcode_here'} + s3_bucket_name: ${env:S3_BUCKET_TO_MONITOR, 'you_can_hardcode_here'} + s3_bucket_path: ${env:S3_PATH_TO_MONITOR, 'you_can_hardcode_here'} pythonRequirements: dockerizePip: true zip: true noDeploy: - pytest + - boto3 provider: name: aws @@ -37,9 +38,19 @@ functions: event: s3:ObjectCreated:* rules: - prefix: ${self:custom.s3_bucket_path} + - suffix: .parquet existing: true forceDeploy: true +package: + patterns: + - '!test*.py' + - '!*.parquet' + - '!.requirements.zip' + - '!.pytest_cache' + - '!.github' + - '!node_modules' + plugins: - serverless-python-requirements - 
serverless-dotenv-plugin
diff --git a/tabular-cdc-bootstrapper/tabular.py b/tabular-cdc-bootstrapper/tabular.py
index ce738dd..61cdba4 100644
--- a/tabular-cdc-bootstrapper/tabular.py
+++ b/tabular-cdc-bootstrapper/tabular.py
@@ -9,58 +9,44 @@
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
-
-def extract_database_and_table(s3_key: str, prefix: str = ''):
-  """
-  Extract the database and table name from the s3 key.
-  """
-  relevant_s3_path = s3_key[len(prefix):].strip('/')
-
-  try:
-    relevant_parent_folders = relevant_s3_path.split('/')[:-1] #ignore the actual file by dropping the last element (-1 index)
-    database = relevant_parent_folders[0]
-    table = relevant_parent_folders[1]
-
-    return database, table
-
-  except IndexError:
-    raise ValueError("The s3 key must have at least 2 subdirectory levels.")
-
-
-def bootstrap_from_file(s3_key: str, s3_prefix: str, catalog_properties) -> str:
-  """
-  Connects to an iceberg rest catalog with properties and bootstraps a new table if one doesn't already exist
-
-  Returns:
-    bool: True when a table is created, False when it already exists. Is this a good pattern? Who can say.
-  """
-  catalog = load_catalog('wh', **catalog_properties)
-  target_db_name, target_table_name = extract_database_and_table(s3_key, s3_prefix)
-
-  # see if the table exists
-  try:
-    target_table = catalog.load_table(f'{target_db_name}.{target_table_name}')
-    logger.info(f"""Success - Existing table found in catalog...
-      s3_key: {s3_key}
-      s3_prefix: {s3_prefix}
-      target_db_name: {target_db_name}
-      target_table_name: {target_table_name}
-    """)
-
-    return False # if the table exists, we're done here 😎
-
-  except NoSuchTableError as nste:
-    # get to boot strappin! 💪
-    logger.info(f"""Success - No table found in catalog, yet...
-      s3_key: {s3_key}
-      s3_prefix: {s3_prefix}
-      target_db_name: {target_db_name}
-      target_table_name: {target_table_name}
-    """)
-
-    return True
-
+def parse_s3_monitoring_uri(s3_monitoring_uri: str) -> tuple[str, str]:
+  """
+  Returns (bucket_name, monitoring_path) from the s3 uri to monitor for bootstrapping
+  """
+  if not s3_monitoring_uri or not s3_monitoring_uri.startswith('s3://'):
+    raise ValueError(f'Valid s3_monitoring_uri values must start with "s3://". Instead got "{s3_monitoring_uri}"')
+
+  uri_components = s3_monitoring_uri[len('s3://'):].split('/')
+  if not uri_components[0]:
+    raise ValueError(f'No bucket name found in s3_monitoring_uri - got "{s3_monitoring_uri}"')
+
+  bucket_name = uri_components[0]
+  monitoring_path = '/'.join(uri_components[1:])
+  return (bucket_name, monitoring_path)
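+
+# A quick sketch of this parser's contract. The values are illustrative and
+# mirror the expectations encoded in test_parse_s3_monitoring_uri below:
+#
+#   parse_s3_monitoring_uri('s3://some_bucket')                  -> ('some_bucket', '')
+#   parse_s3_monitoring_uri('s3://some_bucket/some/nested/path') -> ('some_bucket', 'some/nested/path')
+#   parse_s3_monitoring_uri('s3://')                             -> raises ValueError (no bucket name)
+#   parse_s3_monitoring_uri('not-an-s3-uri')                     -> raises ValueError (missing s3:// scheme)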
Instead got "{s3_monitoring_uri}"') + + uri_components = s3_monitoring_uri[len('s3://'):].split('/') + if [''] == uri_components: + raise ValueError(f'No bucket name found in s3_monitoring_uri - got "{s3_monitoring_uri}"') + + bucket_name = uri_components[0] + monitoring_path = '/'.join(uri_components[1:]) + return (bucket_name, monitoring_path) + +def extract_database_and_table(s3_object_path: str, s3_monitoring_path: str = ''): + """ + Extract the database and table name from the s3 object path, relative to + the s3_monitoring_path -> /some-path-to-monitor/{database}/{table_name} + """ + if not s3_object_path.startswith(s3_monitoring_path): + raise ValueError(f""" + s3_object_path must exist within s3_monitoring_path: + - s3_object_path: "{s3_object_path}" + - s3_monitoring_path: "{s3_monitoring_path}" + """) + + relevant_s3_path = s3_object_path[len(s3_monitoring_path):].strip('/') + + try: + relevant_parent_folders = relevant_s3_path.split('/')[:-1] #ignore the actual file by dropping the last element (-1 index) + database = relevant_parent_folders[0] + table = relevant_parent_folders[1] + + return database, table + + except IndexError: + raise ValueError("The s3 key must have at least 2 subdirectory levels.") def get_table_schema_from_parquet(parquet_io_object: BytesIO) -> dict: # read that schema @@ -68,14 +54,90 @@ def get_table_schema_from_parquet(parquet_io_object: BytesIO) -> dict: table = pq.read_table(source=parquet_io_object) return table.schema +def get_tabular_table_properties(file_loader_path: str, cdc_id_field: str = '', cdc_timestamp_field: str = '') -> dict: + """ + generates the appropriate tabular properties dictionary for an iceberg table requiring file loading + and cdc processing. + + Args: + - file_loader_path (str): s3 uri that should be monitored for new files to load. + For example: s3://randy-pitcher-workspace--aws/cdc-bootstrap/alpha/pyiceberg/some_data.parquet + + - cdc_id_field (str): column in the table representing the unique identity of each row in the cdc output. Often an id. + For example: 'customer_id'. This tells tabular whether to update or insert a row. + + - cdc_timestamp_field (str): column in the table representing the timestamp in the current timestamp to use to determine + which records belong to different points in time, specifically which records are the latest. + For example: 'last_updated_at'. + """ + properties = {} + + if file_loader_path: + # https://docs.tabular.io/tables#file-loader-properties + properties['fileloader.enabled'] = 'true' + properties['fileloader.path'] = file_loader_path + properties['fileloader.file-format'] = 'parquet' + properties['fileloader.write-mode'] = 'append' + properties['fileloader.evolve-schema'] = 'true' + + return properties + +def bootstrap_from_file(s3_object_path: str, s3_monitoring_uri:str, catalog_properties) -> str: + """ + Connects to an iceberg rest catalog with catalog_properties and bootstraps + a new table if one doesn't already exist + + Returns: + bool: True when a table is created, False when it already exists. Is this a good pattern? Who can say. + """ + catalog = load_catalog('wh', **catalog_properties) + bucket_name, monitoring_path = parse_s3_monitoring_uri(s3_monitoring_uri) + target_db_name, target_table_name = extract_database_and_table(s3_object_path, monitoring_path) + + # see if the table exists + try: + target_table = catalog.load_table(f'{target_db_name}.{target_table_name}') + logger.info(f""" + Success - Existing table found in catalog... 
+
+def bootstrap_from_file(s3_object_path: str, s3_monitoring_uri: str, catalog_properties) -> bool:
+  """
+  Connects to an iceberg rest catalog with catalog_properties and bootstraps
+  a new table if one doesn't already exist
+
+  Returns:
+    bool: True when a table is created, False when it already exists. Is this a good pattern? Who can say.
+  """
+  catalog = load_catalog('wh', **catalog_properties)
+  bucket_name, monitoring_path = parse_s3_monitoring_uri(s3_monitoring_uri)
+  target_db_name, target_table_name = extract_database_and_table(s3_object_path, monitoring_path)
+
+  # see if the table exists
+  try:
+    target_table = catalog.load_table(f'{target_db_name}.{target_table_name}')
+    logger.info(f"""
+      Success - Existing table found in catalog...
+      s3_object_path: {s3_object_path}
+      s3_monitoring_uri: {s3_monitoring_uri}
+      target_db_name: {target_db_name}
+      target_table_name: {target_table_name}
+    """)
+
+    return False # if the table exists, we're done here 😎
+
+  except NoSuchTableError as nste:
+    # get to bootstrappin'! 💪
+    logger.info(f"""
+      Creating table...
+      s3_object_path: {s3_object_path}
+      s3_monitoring_uri: {s3_monitoring_uri}
+      target_db_name: {target_db_name}
+      target_table_name: {target_table_name}
+    """)
+
+    file_loader_monitoring_path = f'{s3_monitoring_uri}/{target_db_name}/{target_table_name}'
+    create_file_loader_target_table(
+      file_loader_monitoring_path,
+      catalog=catalog,
+      database=target_db_name,
+      table=target_table_name
+    )
+
+    return True # good work, team 💪
 
-def create_table_from_s3_path(s3_key: str, catalog, database: str, table: str):
-  """
-  Creates an empty, columnless iceberg table with the given database and table name
-  in the provided iceberg catalog.
-
-  TODO: actually implement schema inference. Someday...
-  """
+def create_file_loader_target_table(s3_uri_file_loader_directory: str, catalog, database: str, table: str):
+  """
+  Creates an empty, columnless iceberg table with the given
+  database and table name in the provided iceberg catalog.
+  """
+  loader_dir = s3_uri_file_loader_directory.strip('/')
+  if not loader_dir.startswith('s3://'):
+    raise ValueError(f'A valid s3 uri must be provided for file loader target table creation. Got: {s3_uri_file_loader_directory}')
+  if loader_dir.endswith('.parquet'):
+    raise ValueError(f'Expecting an s3 folder path but got: {s3_uri_file_loader_directory}')
 
   # Create the namespace if it doesn't exist
   try:
@@ -84,16 +146,38 @@
     pass
 
   # Create 'db.table'
+  table_props = get_tabular_table_properties(loader_dir)
+  table_props['comment'] = f'created by cdc bootstrapper to monitor {s3_uri_file_loader_directory}'
+  logger.info(f'Creating table {database}.{table} with properties: {table_props}')
   catalog.create_table(
     identifier=f'{database}.{table}',
     schema={},
-    properties={'comment': f'created by cdc bootstrapper from s3 file: {s3_key}'}
+    properties=table_props
   )
-
-def bootstrap_load_table(s3_folder_path: str, warehouse: str, database: str, table: str):
-  """
-  Call tabular API to load data from the folder into the table
-  """
-  # TODO: Call the tabular API to load data
-  raise NotImplementedError
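+
+# Usage sketch, assuming a reachable Iceberg REST catalog. The credential and
+# warehouse values below are placeholders (see the README's .env example):
+#
+#   from pyiceberg.catalog import load_catalog
+#
+#   catalog = load_catalog('wh', **{
+#     'uri': 'https://api.tabular.io/ws',
+#     'credential': 't-1234:123123123',
+#     'warehouse': 'enterprise_data_warehouse',
+#   })
+#   create_file_loader_target_table(
+#     's3://some-bucket/cdc-bootstrap/alpha/pyiceberg',
+#     catalog=catalog,
+#     database='alpha',
+#     table='pyiceberg'
+#   )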
+
+def create_cdc_mirror_target_table(s3_uri_file_loader_directory: str, catalog, database: str, table: str):
+  """
+  Creates an empty, columnless iceberg table with the given
+  database and table name in the provided iceberg catalog.
+  """
+  loader_dir = s3_uri_file_loader_directory.strip('/')
+  if not loader_dir.startswith('s3://'):
+    raise ValueError(f'A valid s3 uri must be provided for cdc mirror target table creation. Got: {s3_uri_file_loader_directory}')
+  if loader_dir.endswith('.parquet'):
+    raise ValueError(f'Expecting an s3 folder path but got: {s3_uri_file_loader_directory}')
+
+  # Create the namespace if it doesn't exist
+  try:
+    catalog.create_namespace(database)
+  except NamespaceAlreadyExistsError as naee:
+    pass
+
+  # Create 'db.table'
+  table_props = get_tabular_table_properties(loader_dir)
+  table_props['comment'] = f'created by cdc bootstrapper to monitor {s3_uri_file_loader_directory}'
+  logger.info(f'Creating table {database}.{table} with properties: {table_props}')
+  catalog.create_table(
+    identifier=f'{database}.{table}',
+    schema={},
+    properties=table_props
+  )
diff --git a/tabular-cdc-bootstrapper/test_tabular.py b/tabular-cdc-bootstrapper/test_tabular.py
index 14583a4..1394a74 100644
--- a/tabular-cdc-bootstrapper/test_tabular.py
+++ b/tabular-cdc-bootstrapper/test_tabular.py
@@ -18,7 +18,6 @@ class TestTabular:
 
   catalog = load_catalog(**CATALOG_PROPERTIES)
 
-
   def test_extract_database_and_table(self):
     s3_key = 'cdc-bootstrap/alpha/gazebo/my-file.json'
 
@@ -33,21 +32,30 @@ def test_extract_database_and_table(self):
 
     with pytest.raises(ValueError):
       tabular.extract_database_and_table(s3_key, 'cdc-bootstrap/alpha')
+
+    with pytest.raises(ValueError):
+      tabular.extract_database_and_table(s3_key, 'non/overlapping/monitoring/path')
 
   def test_bootstrap_from_file(self):
     test_cases = {
-      'table_exists': ('cdc-bootstrap/system/catalog_events/my-file.json', 'cdc-bootstrap', False),
-      'table_missing': ('cdc-bootstrap/fingers-crossed-this-doesnt-exist/missing-table/my-file.json', 'cdc-bootstrap', True),
-      'database_missing': ('cdc-bootstrap/pyiceberg/alpha/my-file.json', '', True)
+      'table_exists': ('cdc-bootstrap/system/catalog_events/my-file.json', 's3://some_bucket/cdc-bootstrap', False),
+      'table_missing': ('cdc-bootstrap/_test_cdc_bootloader/missing-table/my-file.json', 's3://some_bucket/cdc-bootstrap', True),
     }
 
-    for key in test_cases:
-      test_case = test_cases[key]
-      assert tabular.bootstrap_from_file(test_case[0], test_case[1], self.CATALOG_PROPERTIES) == test_case[2]
+    try:
+      for key in test_cases:
+        test_case = test_cases[key]
+        assert tabular.bootstrap_from_file(test_case[0], test_case[1], self.CATALOG_PROPERTIES) == test_case[2]
+
+      # test some junk
+      with pytest.raises(ValueError):
+        tabular.bootstrap_from_file('lkdfj.jdsfskl', 'fdassdf', {})
 
-    # test some junk
-    with pytest.raises(ValueError):
-      tabular.bootstrap_from_file('lkdfj.jdsfskl', 'fdassdf', {})
+    # cleanup the missing table or future tests will always fail
+    finally:
+      target_db_name, target_table_name = test_cases['table_missing'][0].split('/')[1:-1]
+      self.catalog.drop_table(f'{target_db_name}.{target_table_name}')
+      self.catalog.drop_namespace(target_db_name)
 
   def test_get_table_schema_from_parquet(self):
     test_sets = [
@@ -72,13 +80,13 @@
 
     # assert 💪
     assert set(actual_schema.names) == expected_field_names
 
-  def test_create_table_from_s3_path(self):
-    mock_s3_key = 'cdc-bootstrap/pyiceberg/_test_create_table_from_s3_path/my-file.json'
+  def test_create_file_loader_target_table(self):
     target_db_name = '_test_cdc_bootloader'
     target_table_name = '_test_create_table_from_s3_path'
+    mock_s3_target_uri = f's3://some-bucket/cdc-bootstrap/{target_db_name}/{target_table_name}'
 
     try:
-      tabular.create_table_from_s3_path(s3_key=mock_s3_key, catalog=self.catalog, database=target_db_name, table=target_table_name)
+      tabular.create_file_loader_target_table(mock_s3_target_uri, catalog=self.catalog, database=target_db_name, table=target_table_name)
actual_table = self.catalog.load_table(f'{target_db_name}.{target_table_name}') expected_table_name = ('default', target_db_name, target_table_name) @@ -89,3 +97,19 @@ def test_create_table_from_s3_path(self): self.catalog.drop_table(f'{target_db_name}.{target_table_name}') self.catalog.drop_namespace(target_db_name) + def test_parse_s3_monitoring_uri(self): + test_cases = [ + ('s3://some_bucket', ('some_bucket', '')), + ('s3://some_bucket/some_path', ('some_bucket', 'some_path')), + ('s3://some_bucket/some/nested/path', ('some_bucket', 'some/nested/path')) + ] + + for tc in test_cases: + assert tc[1] == tabular.parse_s3_monitoring_uri(tc[0]) + + with pytest.raises(ValueError): + tabular.parse_s3_monitoring_uri('feefasdfasdflkjh') + with pytest.raises(ValueError): + tabular.parse_s3_monitoring_uri('s3://') + with pytest.raises(ValueError): + tabular.parse_s3_monitoring_uri('')
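
For reference, the bootstrap flow introduced above can be exercised locally with a minimal driver like the following sketch. It assumes the Tabular environment variables from the README's `.env` example are exported; the bucket, path, and object key below are illustrative placeholders:

```python
import os

import tabular

# Catalog connectivity, mirroring cdc_bootstrap_handler.py
catalog_properties = {
  'uri': os.environ['TABULAR_CATALOG_URI'],
  'credential': os.environ['TABULAR_CREDENTIAL'],
  'warehouse': os.environ['TABULAR_TARGET_WAREHOUSE'],
}

# Hypothetical monitoring target and incoming object key
s3_monitoring_uri = 's3://some-bucket/cdc-bootstrap'
object_key = 'cdc-bootstrap/alpha/customers/part-00000.parquet'

# Creates alpha.customers with file loader properties if it doesn't exist yet;
# returns True when a table was created, False when it already existed
created = tabular.bootstrap_from_file(object_key, s3_monitoring_uri, catalog_properties)
print('Table successfully bootstrapped ✅' if created else 'Nothing to do 🌞')
```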