Skip to content

Commit

Permalink
small tweak to config file
Browse files Browse the repository at this point in the history
  • Loading branch information
randypitcherii committed Dec 8, 2023
1 parent ece4aea commit f84c539
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 11 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pipenv install
- [configure serverless for your AWS account.](https://www.serverless.com/framework/docs/providers/aws/guide/credentials)
- update serverless.yml with your specific configs, including tabular credentials. You may also provide a `.env` file if you prefer. Place the file alongside the `serverless.yml` file in the same directory 💪
```.env
S3_MONITORING_URI=s3://randy-pitcher-workspace--aws/cdc-bootstrap
S3_BUCKET_TO_MONITOR=randy-pitcher-workspace--aws
S3_PATH_TO_MONITOR=cdc-bootstrap
TABULAR_TARGET_WAREHOUSE=enterprise_data_warehouse
TABULAR_CREDENTIAL=t-1234:123123123 # needs permission to create database in a warehouse and list all existing objects in a warehouse
Expand Down
8 changes: 5 additions & 3 deletions tabular-cdc-bootstrapper/cdc_bootstrap_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@

import tabular

# Tabular ENVs
# Tabular connectivity
TABULAR_CREDENTIAL = os.environ['TABULAR_CREDENTIAL']
TABULAR_CATALOG_URI = os.environ['TABULAR_CATALOG_URI']
TABULAR_TARGET_WAREHOUSE = os.environ['TABULAR_TARGET_WAREHOUSE']

# S3 Monitoring ENVs
S3_MONITORING_URI = os.environ['S3_MONITORING_URI']
# S3 Monitoring
# Bucket name (no scheme) and key prefix are configured separately so the
# same values can be shared with serverless.yml.
S3_BUCKET_TO_MONITOR = os.environ['S3_BUCKET_TO_MONITOR']
S3_PATH_TO_MONITOR = os.environ['S3_PATH_TO_MONITOR']
# The monitoring location must be a full s3 uri: the target-table creation
# code rejects any value that does not start with 's3://'.
S3_MONITORING_URI = f's3://{S3_BUCKET_TO_MONITOR}/{S3_PATH_TO_MONITOR}'


# Set up logging
Expand Down
4 changes: 2 additions & 2 deletions tabular-cdc-bootstrapper/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ service: tabular-cdc-bootstrapper
frameworkVersion: "3"

custom:
s3_bucket_name: ${env:S3_BUCKET_NAME, 'you_can_hardcode_here'}
s3_bucket_path: ${env:S3_BUCKET_PATH, 'you_can_hardcode_here'}
s3_bucket_name: ${env:S3_BUCKET_TO_MONITOR, 'you_can_hardcode_here'}
s3_bucket_path: ${env:S3_PATH_TO_MONITOR, 'you_can_hardcode_here'}

pythonRequirements:
dockerizePip: true
Expand Down
31 changes: 28 additions & 3 deletions tabular-cdc-bootstrapper/tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def bootstrap_from_file(s3_object_path: str, s3_monitoring_uri:str, catalog_prop
""")

file_loader_monitoring_path = f'{s3_monitoring_uri}/{target_db_name}/{target_table_name}'
create_file_loader_target(
create_file_loader_target_table(
file_loader_monitoring_path,
catalog=catalog,
database=target_db_name,
Expand All @@ -128,7 +128,7 @@ def bootstrap_from_file(s3_object_path: str, s3_monitoring_uri:str, catalog_prop

return True # good work, team 💪

def create_file_loader_target(s3_uri_file_loader_directory: str, catalog, database: str, table: str):
def create_file_loader_target_table(s3_uri_file_loader_directory: str, catalog, database: str, table: str):
"""
Creates an empty, columnless iceberg table with the given
database and table name in the provided iceberg catalog.
Expand All @@ -155,4 +155,29 @@ def create_file_loader_target(s3_uri_file_loader_directory: str, catalog, databa
properties=table_props
)

# todo: create_cdc_mirror_target
def create_cdc_mirror_target_table(s3_uri_file_loader_directory: str, catalog, database: str, table: str):
    """
    Creates an empty iceberg table (declared with an empty schema) named
    `database.table` in the provided iceberg catalog. The namespace
    `database` is created first if it does not already exist.

    Args:
        s3_uri_file_loader_directory: s3 uri of the folder to monitor.
            Leading and trailing '/' characters are ignored for validation.
        catalog: catalog object exposing `create_namespace` and `create_table`.
        database: namespace that will hold the table.
        table: name of the table to create.

    Raises:
        ValueError: if the uri does not start with 's3://', or if it points
            at a '.parquet' file instead of a folder.
    """
    loader_dir = s3_uri_file_loader_directory.strip('/')
    if not loader_dir.startswith('s3://'):
        raise ValueError(f'valid s3 uri must be provided for cdc mirror target table creation. Got: {s3_uri_file_loader_directory}')
    if loader_dir.endswith('.parquet'):
        raise ValueError(f'Expecting an s3 folder path but got: {s3_uri_file_loader_directory}')

    # Create the namespace if it doesn't exist; an existing one is fine.
    try:
        catalog.create_namespace(database)
    except NamespaceAlreadyExistsError:
        pass

    # Create 'db.table'
    table_props = get_tabular_table_properties(loader_dir)
    table_props['comment'] = f'created by cdc bootstrapper to monitor {s3_uri_file_loader_directory}'
    # NOTE(review): looks like a leftover debug print -- confirm whether it
    # should go through the project's logger instead.
    print(table_props)
    catalog.create_table(
        identifier=f'{database}.{table}',
        schema={},
        properties=table_props
    )
4 changes: 2 additions & 2 deletions tabular-cdc-bootstrapper/test_tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ def test_get_table_schema_from_parquet(self):
# assert 💪
assert set(actual_schema.names) == expected_field_names

def test_create_file_loader_target(self):
def test_create_file_loader_target_table(self):
target_db_name = '_test_cdc_bootloader'
target_table_name = '_test_create_table_from_s3_path'
mock_s3_target_uri = f's3://some-bucket/cdc-bootstrap/{target_db_name}/{target_table_name}'

try:
tabular.create_file_loader_target(mock_s3_target_uri, catalog=self.catalog, database=target_db_name, table=target_table_name)
tabular.create_file_loader_target_table(mock_s3_target_uri, catalog=self.catalog, database=target_db_name, table=target_table_name)
actual_table = self.catalog.load_table(f'{target_db_name}.{target_table_name}')

expected_table_name = ('default', target_db_name, target_table_name)
Expand Down

0 comments on commit f84c539

Please sign in to comment.