feat/fmd-206 add ingestion timing #213

Merged: 5 commits, Aug 8, 2024
6 changes: 3 additions & 3 deletions .github/workflows/ingest-cadet-metadata.yml
@@ -71,14 +71,14 @@ jobs:
DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
DATAHUB_TELEMETRY_ENABLED: false
- run: poetry run datahub ingest -c ingestion/create_cadet_databases.yaml
+ run: time poetry run datahub ingest -c ingestion/create_cadet_databases.yaml

- name: push metadata to datahub
env:
DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
DATAHUB_TELEMETRY_ENABLED: false
- run: poetry run datahub ingest -c ingestion/cadet.yaml
+ run: time poetry run datahub ingest -c ingestion/cadet.yaml

- name: Notify on failure
uses: slackapi/slack-github-action@v1.26.0
@@ -87,7 +87,7 @@ jobs:
channel-id: "C071VNHPUHZ"
payload: |
{
"text": ":warning: Unable to ingest CaDeT metadata on ${{inputs.env}}!",
"text": ":warning: Unable to ingest CaDeT metadata on ${{inputs.ENVIRONMENT}}!",
"blocks": [
{
"type": "section",
4 changes: 2 additions & 2 deletions .github/workflows/ingest-justice-data.yml
@@ -63,7 +63,7 @@ jobs:
DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
DATAHUB_TELEMETRY_ENABLED: false
- run: poetry run datahub ingest -c ingestion/justice_data_ingest.yaml
+ run: time poetry run datahub ingest -c ingestion/justice_data_ingest.yaml

- name: Notify on failure
uses: slackapi/slack-github-action@v1.26.0
@@ -72,7 +72,7 @@ jobs:
channel-id: "C071VNHPUHZ"
payload: |
{
"text": ":warning: Unable to ingest Justice Data metadata on ${{inputs.env}}!",
"text": ":warning: Unable to ingest Justice Data metadata on ${{inputs.ENVIRONMENT}}!",
"blocks": [
{
"type": "section",
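A note on the time prefix added in both workflow steps above: wrapping the ingest command in the shell's time keyword surfaces each step's duration in the Actions log without touching the ingestion code itself. Bash prints its timing summary to stderr once the command exits, along these lines (the figures here are invented for illustration):

real    4m32.081s
user    2m11.794s
sys     0m14.313s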
8 changes: 7 additions & 1 deletion ingestion/create_cadet_databases_source/source.py
@@ -19,13 +19,17 @@
get_cadet_manifest,
validate_fqn,
)
+ from ingestion.utils import report_generator_time, report_time

+ logging.basicConfig(level=logging.DEBUG)


@config_class(CreateCadetDatabasesConfig)
class CreateCadetDatabases(Source):
source_config: CreateCadetDatabasesConfig
report: SourceReport = SourceReport()

+ @report_time
def __init__(self, config: CreateCadetDatabasesConfig, ctx: PipelineContext):
super().__init__(ctx)
self.source_config = config
@@ -35,6 +39,7 @@ def create(cls, config_dict, ctx):
config = CreateCadetDatabasesConfig.parse_obj(config_dict)
return cls(config, ctx)

+ @report_generator_time
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
manifest = get_cadet_manifest(self.source_config.manifest_s3_uri)

@@ -51,7 +56,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
databases_with_domains, display_tags = (
self._get_databases_with_domains_and_display_tags(manifest)
)
- sub_types = [DatasetContainerSubTypes.DATABASE]
+ sub_types: list[str] = [DatasetContainerSubTypes.DATABASE]
last_modified = int(datetime.now().timestamp())
for database, domain in databases_with_domains:
database_container_key = mcp_builder.DatabaseKey(
@@ -89,6 +94,7 @@ def _get_domains(self, manifest) -> set[str]:
if manifest["nodes"][node]["resource_type"] == "model"
)

+ @report_time
def _get_databases_with_domains_and_display_tags(
self, manifest
) -> tuple[set[tuple[str, str]], dict]:
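Worth noting why get_workunits takes @report_generator_time rather than @report_time: a plain timing wrapper around a generator function returns as soon as the generator object is constructed and would report near-zero elapsed time, while report_generator_time yields from the wrapped generator and only stops the clock once it is exhausted. A minimal sketch of the pitfall (naive_report_time and slow_workunits are invented names, not from this repo):

import time


def naive_report_time(func):
    # Times only the call itself -- for a generator function that is just
    # the cost of constructing the generator, not of consuming it.
    def wrapped(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.time() - start:.3f}s")
        return result

    return wrapped


@naive_report_time
def slow_workunits():
    for i in range(3):
        time.sleep(0.1)  # stand-in for emitting a metadata workunit
        yield i


list(slow_workunits())  # prints ~0.000s; the 0.3s of real work goes unmeasured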
6 changes: 5 additions & 1 deletion ingestion/ingestion_utils.py
@@ -10,8 +10,12 @@
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

from ingestion.config import ENV, INSTANCE, PLATFORM
+ from ingestion.utils import report_time

+ logging.basicConfig(level=logging.DEBUG)


+ @report_time
def get_cadet_manifest(manifest_s3_uri: str) -> Dict:
try:
s3 = boto3.client("s3")
@@ -49,7 +53,7 @@ def validate_fqn(fqn: list[str]) -> bool:
f"{table_name=} has multiple double underscores which will confuse parsing"
)

- match = re.match(r"\w+__\w+", table_name)
+ match: re.Match[str] | None = re.match(r"\w+__\w+", table_name)
if match:
return True
if not match:
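For context on the validate_fqn hunk: CaDeT table names are expected to follow a database__table convention, with a single double underscore separating the two parts. A rough standalone sketch of that rule (this mirrors the repo's logic rather than reproducing it):

import re


def looks_like_cadet_fqn(table_name: str) -> bool:
    # More than one "__" makes the database/table split ambiguous.
    if table_name.count("__") > 1:
        return False
    return re.match(r"\w+__\w+", table_name) is not None


print(looks_like_cadet_fqn("prison__population"))   # True
print(looks_like_cadet_fqn("prison__pop__v2"))      # False: ambiguous split
print(looks_like_cadet_fqn("population"))           # False: no separator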
6 changes: 5 additions & 1 deletion ingestion/justice_data_source/source.py
@@ -1,3 +1,4 @@
+ import logging
from io import BufferedReader
from typing import Iterable, Optional

@@ -25,7 +26,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
- BrowsePathsV2Class,
ChangeTypeClass,
ChartInfoClass,
CorpGroupInfoClass,
@@ -38,10 +38,13 @@
)

from ingestion.ingestion_utils import list_datahub_domains
+ from ingestion.utils import report_generator_time

from .api_client import JusticeDataAPIClient
from .config import JusticeDataAPIConfig

+ logging.basicConfig(level=logging.DEBUG)


@platform_name("File")
@config_class(JusticeDataAPIConfig)
@@ -72,6 +75,7 @@ def create(cls, config_dict, ctx):
config = JusticeDataAPIConfig.parse_obj(config_dict)
return cls(ctx, config)

+ @report_generator_time
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
all_chart_data = self.client.list_all(self.config.exclude_id_list)

2 changes: 2 additions & 0 deletions ingestion/taggers/display_in_catalogue_tagger.py
@@ -4,6 +4,8 @@
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import TagAssociationClass

+ logging.basicConfig(level=logging.DEBUG)


def add_display_in_catalogue_tag(entity_urn: str) -> List[TagAssociationClass]:
"""
5 changes: 5 additions & 0 deletions ingestion/transformers/assign_cadet_databases.py
@@ -17,6 +17,9 @@
parse_database_and_table_names,
validate_fqn,
)
+ from ingestion.utils import report_time

+ logging.basicConfig(level=logging.DEBUG)


class AssignCadetDatabasesConfig(ConfigModel):
@@ -49,6 +52,7 @@ def transform_aspect(
) -> Optional[Aspect]:
return None

+ @report_time
def handle_end_of_stream(
self,
) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
@@ -77,6 +81,7 @@ def handle_end_of_stream(

return mcps

+ @report_time
def _get_table_database_mappings(self, manifest) -> Dict[str, str]:
mappings = {}
for node in manifest["nodes"]:
47 changes: 29 additions & 18 deletions ingestion/transformers/assign_cadet_domains.py
@@ -1,28 +1,22 @@
- from typing import Callable, Union
+ from typing import Iterable

from datahub.configuration.common import (
KeyValuePattern,
TransformerSemanticsConfigModel,
)
- from datahub.configuration.import_resolver import pydantic_resolve_key
- from datahub.ingestion.api.common import PipelineContext
- from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
+ from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
+ from datahub.ingestion.transformer.dataset_domain import (
+ AddDatasetDomain,
+ AddDatasetDomainSemanticsConfig,
+ )
from datahub.metadata.schema_classes import DomainsClass

from ingestion.ingestion_utils import (
convert_cadet_manifest_table_to_datahub,
get_cadet_manifest,
validate_fqn,
)


- class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
- get_domains_to_add: Union[
- Callable[[str], DomainsClass],
- Callable[[str], DomainsClass],
- ]
-
- _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add")
+ from ingestion.utils import Stopwatch, report_time


class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
@@ -39,26 +33,42 @@ class AssignCadetDomains(AddDatasetDomain):
def __init__(self, config: CadetDatasetDomainSemanticsConfig, ctx: PipelineContext):
AddDatasetDomain.raise_ctx_configuration_error(ctx)
manifest = get_cadet_manifest(config.manifest_s3_uri)
- domain_mappings = self._get_domain_mapping(manifest)
- domain_pattern = domain_mappings.domain_pattern
+ domain_mappings: PatternDatasetDomainSemanticsConfig = self._get_domain_mapping(
+ manifest
+ )
+ domain_pattern: KeyValuePattern = domain_mappings.domain_pattern

def resolve_domain(domain_urn: str) -> DomainsClass:
domains = domain_pattern.value(domain_urn)
return self.get_domain_class(ctx.graph, domains)

generic_config = AddDatasetDomainSemanticsConfig(
- get_domains_to_add=resolve_domain,
semantics=config.semantics,
replace_existing=config.replace_existing,
+ get_domains_to_add=resolve_domain,
)

+ self.transform_timer = Stopwatch(transformer="AssignCadetDomains")

super().__init__(generic_config, ctx)

+ def _should_process(self, record):
+ if not self.transform_timer.running:
+ self.transform_timer.start()
+ return super()._should_process(record)

+ def _handle_end_of_stream(
+ self, envelope: RecordEnvelope
+ ) -> Iterable[RecordEnvelope]:
+ self.transform_timer.stop()
+ self.transform_timer.report()
+ return super()._handle_end_of_stream(envelope)

@classmethod
def create(cls, config_dict, ctx: PipelineContext) -> "AssignCadetDomains":
try:
- manifest_s3_uri = config_dict.get("manifest_s3_uri")
- replace_existing = config_dict.get("replace_existing", False)
+ manifest_s3_uri: str = config_dict.get("manifest_s3_uri", "")
+ replace_existing: bool = config_dict.get("replace_existing", False)
except Exception as e:
print(e)
raise
@@ -69,6 +79,7 @@ def create(cls, config_dict, ctx: PipelineContext) -> "AssignCadetDomains":
)
return cls(config_dict, ctx)

+ @report_time
def _get_domain_mapping(self, manifest) -> PatternDatasetDomainSemanticsConfig:
"""Map regex patterns for tables to domains"""
nodes = manifest.get("nodes")
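The timing approach in this transformer differs from the decorators used elsewhere: the work is spread across many per-record calls, so the stopwatch starts lazily on the first record seen and reports once the end-of-stream marker arrives. A stripped-down sketch of the same pattern (TimedStage and its method names are illustrative, not the DataHub transformer API):

from ingestion.utils import Stopwatch


class TimedStage:
    """Times a record-at-a-time pipeline stage from first record to end of stream."""

    def __init__(self) -> None:
        self.timer = Stopwatch(stage="TimedStage")

    def process(self, record):
        if not self.timer.running:
            self.timer.start()  # first record seen: start the clock exactly once
        return record

    def end_of_stream(self) -> None:
        self.timer.stop()
        self.timer.report()  # one TIMING log line covering the whole stream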
88 changes: 88 additions & 0 deletions ingestion/utils.py
@@ -0,0 +1,88 @@
import logging
import time
from datetime import timedelta

logging.basicConfig(level=logging.DEBUG)


def report_time(func):
"""
Decorator to report the total time of a function call
"""

def wrapped_func(*args, **kwargs):
arg_types = [type(arg) for arg in args]
stopwatch = Stopwatch(
function=func.__name__, arg_types=arg_types, kwargs=kwargs
)

stopwatch.start()

r = func(*args, **kwargs)

stopwatch.stop()
stopwatch.report()

return r

return wrapped_func


def report_generator_time(func):
"""
Decorator to report the total time of an iterable
"""

def wrapped_func(*args, **kwargs):
arg_types = [type(arg) for arg in args]
stopwatch = Stopwatch(
function=func.__name__, arg_types=arg_types, kwargs=kwargs
)

stopwatch.start()

r = func(*args, **kwargs)
yield from r

stopwatch.stop()
stopwatch.report()

return r

return wrapped_func


class Stopwatch:
"""
Wrapper around the time module for timing code execution
"""

def __init__(self, **meta):
self.running = False
self.start_time = None
self.stop_time = None
self.elapsed = 0
joined_meta = ", ".join(f"{k}={v}" for k, v in meta.items())
self.prefix = f"TIMING: {joined_meta}, " if joined_meta else "TIMING: "

def start(self):
self.start_time = time.time()
self.running = True

def stop(self):
self.running = False
if not self.start_time:
return

now = time.time()
elapsed = now - self.start_time
self.stop_time = now
self.elapsed += elapsed

def report(self):
logging.info(
f"{self.prefix}"
f"start_time={time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(self.start_time))}, "
f"end_time={time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(self.stop_time))}, "
f"elapsed_time={str(timedelta(seconds=self.elapsed))}"
)
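For reference, a usage sketch of the new helpers (expensive_step and the task label are invented for illustration):

import time

from ingestion.utils import Stopwatch, report_time


@report_time
def expensive_step():
    time.sleep(1)  # one TIMING log line is emitted when the call returns


expensive_step()

# Or time an arbitrary span by hand:
sw = Stopwatch(task="manual-example")
sw.start()
expensive_step()
sw.stop()
sw.report()  # TIMING: task=manual-example, start_time=..., end_time=..., elapsed_time=...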