Create and assign CaDeT domains (#138)
* Added a new source which creates domains from the CaDeT manifest
* Added a transformer which builds upon the dbt ingestion and assigns those domains to CaDeT models
* The transformer is adapted from DataHub's existing PatternAddDatasetDomain transformer.
* A URN is formed for every dataset in the manifest that maps to a domain (see the sketch below).
* Domains in CaDeT are assigned only to models, not sources, so of the 505 datasets currently ingested only ~310 have domains (the rest are sources).
* Changed dependency management to Poetry for alignment with our other projects
* Adjusted dbt ingestion workflows to use the new source and transformer
* The CaDeT ingestion source (dbt) can't create domains, so domains are created in a separate step before the dbt ingestion runs
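As an illustration of that mapping step, here is a minimal sketch (not the committed code) of how a dbt manifest node could be turned into a dataset URN and domain URN pair with DataHub's builder helpers. The fqn-based domain lookup and the PROD environment are assumptions for illustration:

```python
# Minimal sketch, assuming the first fqn segment of a CaDeT model names its
# domain; the real transformer reads the manifest from S3 and builds a mapping.
from datahub.emitter.mce_builder import make_dataset_urn, make_domain_urn


def node_to_domain_mapping(node: dict) -> tuple[str, str] | None:
    # Domains are assigned to models only, never to sources.
    if node.get("resource_type") != "model":
        return None
    domain = node["fqn"][0]  # assumed convention, for illustration only
    dataset_urn = make_dataset_urn(
        platform="dbt",
        name=f"{node['database']}.{node['schema']}.{node['name']}",
        env="PROD",
    )
    return dataset_urn, make_domain_urn(domain)
```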
murdo-moj committed Jun 6, 2024
1 parent 430bbf0 commit 1cbc0bd
Showing 21 changed files with 3,016 additions and 72 deletions.
34 changes: 31 additions & 3 deletions .github/workflows/ingest-cadet-metadata.yml
@@ -42,14 +42,42 @@ jobs:
        with:
          role-to-assume: ${{ secrets.CADET_METADATA_ROLE_TO_ASSUME }}
          aws-region: ${{ inputs.ECR_REGION }}
-      - name: install reqs
-        run: pip install acryl-datahub[dbt]

+      - name: cache poetry install
+        uses: actions/cache@v4
+        with:
+          path: ~/.local
+          key: poetry-1.7.1-0
+
+      - uses: snok/install-poetry@v1
+        with:
+          version: 1.7.1
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+
+      - name: cache deps
+        id: cache-deps
+        uses: actions/cache@v4
+        with:
+          path: .venv
+          key: pydeps-${{ hashFiles('**/poetry.lock') }}
+      - run: poetry install --no-interaction --no-root
+        if: steps.cache-deps.outputs.cache-hit != 'true'
+      - run: poetry install --no-interaction
+
+      - name: create cadet domains
+        env:
+          DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
+          DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
+          DATAHUB_TELEMETRY_ENABLED: false
+        run: poetry run datahub ingest -c ingestion/create_derived_table_domains.yaml

      - name: push metadata to datahub
        env:
          DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
          DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
          DATAHUB_TELEMETRY_ENABLED: false
-        run: datahub ingest -c ingestion/cadet.yaml
+        run: poetry run datahub ingest -c ingestion/cadet.yaml

      - name: Notify on failure
        uses: slackapi/slack-github-action@v1.26.0
35 changes: 25 additions & 10 deletions .github/workflows/python-tests.yml
@@ -12,7 +12,7 @@ on:
    paths:
      - "scripts/**.py"
      - "tests/**.py"

jobs:
  python-unit-tests:
    runs-on: ubuntu-latest
@@ -23,13 +23,28 @@ jobs:
        with:
          python-version: "3.11"

-      - name: Install Python dependencies
-        run: |
-          pip install --upgrade pip
-          pip install --no-cache-dir -r requirements.txt
-          pip install --no-cache-dir -r requirements-dev.txt
+      - name: cache poetry install
+        uses: actions/cache@v4
+        with:
+          path: ~/.local
+          key: poetry-1.7.1-0
+
+      - uses: snok/install-poetry@v1
+        with:
+          version: 1.7.1
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+
+      - name: cache deps
+        id: cache-deps
+        uses: actions/cache@v4
+        with:
+          path: .venv
+          key: pydeps-${{ hashFiles('**/poetry.lock') }}
+      - run: poetry install --no-interaction --no-root
+        if: steps.cache-deps.outputs.cache-hit != 'true'
+      - run: poetry install --no-interaction

-      - name: run unit tests
-        id: python-tests
-        run: pytest tests/
+      - name: run pytest
+        id: pytest
+        run: poetry run pytest
1 change: 1 addition & 0 deletions .gitignore
@@ -60,3 +60,4 @@ env/
*.code-workspace
*.sha256
terraform.tfstate
+__pycache__
@@ -1,11 +1,12 @@
-import pandas as pd
import logging
-import boto3
-import yaml
import os
from pathlib import Path
from typing import Dict
+
+import boto3
+import pandas as pd
+import yaml

logging.basicConfig()

@@ -19,14 +20,11 @@ def get_data(bucket: str, key: str) -> pd.DataFrame:
        logging.info("Data has been collected from {}/{}".format(bucket, key))
        return pd.read_csv(response.get("Body"))
    else:
-        logging.error(
-            "Bucket {} or file {} does not exist".format(bucket, key))
+        logging.error("Bucket {} or file {} does not exist".format(bucket, key))
        raise


-def get_tables(
-    bucket: str, key: str, source_data: str
-) -> Dict[str, list[str]]:
+def get_tables(bucket: str, key: str, source_data: str) -> Dict[str, list[str]]:
    s3_client = boto3.client("s3")
    product_name = Path(key).parts[1]
    yaml_key = os.path.join(
@@ -37,8 +35,9 @@ def get_tables(
    tables_dict = {}
    for key in data_dict:
        tables_dict[key] = [
-            table for table in data_dict[key]['tables']
-            if data_dict[key]['tables'][table]['source_data'] == source_data
+            table
+            for table in data_dict[key]["tables"]
+            if data_dict[key]["tables"][table]["source_data"] == source_data
        ]

    return tables_dict
@@ -54,9 +53,7 @@ def generate_report(bucket: str, key: str) -> Dict[str, pd.DataFrame]:
        for table in tables:
            # e.g. One loop of this might evaluate to `table_1(bucket, key, data)`
            # and execute one of the functions below with the correct arguments
-            results_dict[database][table] = eval(
-                table + "(bucket, key, raw_data)"
-            )
+            results_dict[database][table] = eval(table + "(bucket, key, raw_data)")

    return results_dict

@@ -65,48 +62,39 @@ def generate_report(bucket: str, key: str) -> Dict[str, pd.DataFrame]:
# The names of each function need to match up with the tables
# as defined in the metadata data-dictionary.yml file


# function to create table 1
-def adj_example_1(
-    bucket: str,
-    key: str,
-    raw_data: pd.DataFrame
-) -> pd.DataFrame:
+def adj_example_1(bucket: str, key: str, raw_data: pd.DataFrame) -> pd.DataFrame:
    # group by establishment, religion, offence and get count offence
    transformed_data = raw_data.value_counts(
-        subset=["Establishment", "Religion", "Offence"], sort=False).reset_index()
-    transformed_data.columns = [
-        "Establishment", "Religion", "Offence", "Count"]
+        subset=["Establishment", "Religion", "Offence"], sort=False
+    ).reset_index()
+    transformed_data.columns = ["Establishment", "Religion", "Offence", "Count"]

    logging.info("Data is transformed")
    return transformed_data


# function to create table 2
-def adj_example_2(
-    bucket: str,
-    key: str,
-    raw_data: pd.DataFrame
-) -> pd.DataFrame:
+def adj_example_2(bucket: str, key: str, raw_data: pd.DataFrame) -> pd.DataFrame:

    transformed_data = raw_data.value_counts(
-        subset=["Establishment"], sort=False).reset_index()
-    transformed_data.columns = [
-        "Establishment", "Count"]
+        subset=["Establishment"], sort=False
+    ).reset_index()
+    transformed_data.columns = ["Establishment", "Count"]

    logging.info("Data is transformed")
    return transformed_data


def punishments_example_1(
-    bucket: str,
-    key: str,
-    raw_data: pd.DataFrame
+    bucket: str, key: str, raw_data: pd.DataFrame
) -> pd.DataFrame:

    transformed_data = raw_data.value_counts(
-        subset=["Establishment"], sort=False).reset_index()
-    transformed_data.columns = [
-        "Establishment", "Count"]
+        subset=["Establishment"], sort=False
+    ).reset_index()
+    transformed_data.columns = ["Establishment", "Count"]

    logging.info("Data is transformed")
    return transformed_data
@@ -1,7 +1,7 @@
-import zipfile
-import logging
import glob
+import logging
import os
+import zipfile

logging.getLogger().setLevel(logging.INFO)
file_directory = os.path.dirname(os.path.abspath(__file__))
@@ -1,7 +1,7 @@
-import boto3
import os
-import pytest
+
+import boto3
+import pytest
from moto import mock_s3

aws_region = "eu-west-1"
@@ -11,13 +11,13 @@
def tests_env_setup_and_teardown():

    TEMP_ENV_VARS = {
-        "AWS_ACCESS_KEY_ID": 'testing',
-        "AWS_SECRET_ACCESS_KEY": 'testing',
-        "AWS_SECURITY_TOKEN": 'testing',
-        "AWS_SESSION_TOKEN": 'testing',
+        "AWS_ACCESS_KEY_ID": "testing",
+        "AWS_SECRET_ACCESS_KEY": "testing",
+        "AWS_SECURITY_TOKEN": "testing",
+        "AWS_SESSION_TOKEN": "testing",
        "AWS_DEFAULT_REGION": aws_region,
        "IAM_ROLE": "test_iam",
-        "BOTO_CONFIG": "/dev/null"
+        "BOTO_CONFIG": "/dev/null",
    }

    # Will be executed before the first test
@@ -1,5 +1,8 @@
-from pathlib import Path
import os
+from pathlib import Path

-print(os.path.join(Path(__file__).parent.absolute(),
-      "test_metadata", "02-data-dictionary.yaml"))
+print(
+    os.path.join(
+        Path(__file__).parent.absolute(), "test_metadata", "02-data-dictionary.yaml"
+    )
+)
@@ -1,9 +1,10 @@
-from application.transform import generate_report, get_tables
+import os
+from pathlib import Path
+from unittest.mock import patch
+
import pandas as pd
import pytest
-from pathlib import Path
-import os
+from application.transform import generate_report, get_tables


@pytest.fixture
@@ -24,8 +25,7 @@ def s3_test_bucket(s3_mock_client, bucket_name):
    os.path.dirname(os.path.abspath(__file__)), "..", "data"
)

-sample_input = pd.read_csv(os.path.join(
-    data_product_directory, "sample_input.csv"))
+sample_input = pd.read_csv(os.path.join(data_product_directory, "sample_input.csv"))


table_dict = {"product_for_test": ["adj_example_1"]}
@@ -53,8 +53,7 @@ def test_generate_report(mocked_data, mocked_tables):
def test_get_tables(s3_mock_client, s3_test_bucket):
    s3_mock_client.upload_file(
        os.path.join(
-            Path(__file__).parent.absolute(
-            ), "test_metadata", "02-data-dictionary.yaml"
+            Path(__file__).parent.absolute(), "test_metadata", "02-data-dictionary.yaml"
        ),
        "product-test-bucket",
        "code/product_for_test/extracted/metadata/02-data-dictionary.yml",
5 changes: 5 additions & 0 deletions ingestion/cadet.yaml
@@ -24,7 +24,12 @@ source:
    test_definitions: "YES"
    stateful_ingestion:
      remove_stale_metadata: true

transformers:
+  - type: "ingestion.transformers.assign_cadet_domains.AssignDerivedTableDomains"
+    config:
+      manifest_s3_uri: "s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
+      replace_existing: true
  - type: "add_dataset_tags"
    config:
      get_tags_to_add: "ingestion.cadet_display_in_catalogue_tagger.add_display_in_catalogue_tag"
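The first transformer entry above is the new one. DataHub resolves the dotted type path to a class that exposes a create(config_dict, ctx) factory. Below is a hedged skeleton of such a transformer, in the style of DataHub's PatternAddDatasetDomain (which the commit message says the real transformer adapts); the base class and internals here are assumed, not taken from the commit:

```python
# Hedged skeleton only; the committed transformer's details may differ.
from typing import Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
from datahub.metadata.schema_classes import DomainsClass


class AssignDerivedTableDomainsConfig(ConfigModel):
    manifest_s3_uri: str
    replace_existing: bool = False


class AssignDerivedTableDomains(DatasetDomainTransformer):
    def __init__(self, config: AssignDerivedTableDomainsConfig, ctx: PipelineContext):
        super().__init__()
        self.config = config
        # Assumed: a dataset-urn -> domain-urn mapping built from the manifest.
        self.domain_mapping: dict[str, str] = {}

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AssignDerivedTableDomains":
        return cls(AssignDerivedTableDomainsConfig.parse_obj(config_dict), ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[DomainsClass]
    ) -> Optional[DomainsClass]:
        domain_urn = self.domain_mapping.get(entity_urn)
        if domain_urn is None:
            return aspect  # no domain for this dataset (e.g. it is a source)
        if aspect is not None and not self.config.replace_existing:
            return aspect  # keep existing domains unless replace_existing is set
        return DomainsClass(domains=[domain_urn])
```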
4 changes: 4 additions & 0 deletions ingestion/create_derived_table_domains.yaml
@@ -0,0 +1,4 @@
+source:
+  type: "ingestion.create_derived_table_domains_source.source.CreateDerivedTableDomains"
+  config:
+    manifest_s3_uri: "s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
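The recipe above points DataHub at the custom source. For context, here is a hedged sketch of the kind of emission such a source must ultimately perform, one DomainProperties aspect per domain found in the manifest, using DataHub's standard emitter API. The GMS address and domain name are placeholders, not values from this commit:

```python
# Sketch only: emit a single domain aspect to DataHub. In the real source the
# domain names come from the CaDeT manifest and credentials from the workflow env.
from datahub.emitter.mce_builder import make_domain_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DomainPropertiesClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # placeholder
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=make_domain_urn("example_domain"),  # placeholder name
        aspect=DomainPropertiesClass(name="example_domain"),
    )
)
```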
8 changes: 8 additions & 0 deletions ingestion/create_derived_table_domains_source/config.py
@@ -0,0 +1,8 @@
+from datahub.configuration.common import ConfigModel
+from pydantic import Field
+
+
+class CreateDerivedTableDomainsConfig(ConfigModel):
+    manifest_s3_uri: str = Field(
+        description="s3 path to dbt manifest json", default=None
+    )
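Since ConfigModel is pydantic-based, the recipe's config block is validated into this model. A hypothetical parse, with the class above in scope and a placeholder path:

```python
# Sketch: how the recipe's `config` block resolves into typed attributes.
config = CreateDerivedTableDomainsConfig.parse_obj(
    {"manifest_s3_uri": "s3://example-bucket/target/manifest.json"}  # placeholder
)
assert config.manifest_s3_uri.startswith("s3://")
```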
(Remaining changed files not shown.)