perf: time ingestion sources and transformers
- prefix the `datahub ingest` runs with the unix `time` command
- add decorators for timing function/iterator runs (see the usage sketch below)
- use the timers for
  - create_cadet_databases_source
  - justice_data_source
  - the `AssignCadetDatabases` transformer

Co-authored-by: Mat Moore <mat.moore@noreply.github.com>
2 people authored and MatMoore committed Aug 7, 2024
1 parent 65979d4 commit 13b9de5
Showing 10 changed files with 199 additions and 5 deletions.
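
For orientation, a minimal sketch of how the timing helpers introduced in ingestion/utils.py are meant to be used; the example functions here are hypothetical, not part of the commit:

from ingestion.utils import Stopwatch, report_generator_time, report_time


@report_time
def load_manifest():
    # a plain function: one TIMING log line with the total call time
    return {"nodes": {}}


@report_generator_time
def emit_workunits():
    # a generator: the TIMING line covers the time taken to exhaust the iterator
    yield from range(3)


# manual timing, the pattern used by the AssignCadetDomains transformer below
watch = Stopwatch(transformer="example")
watch.start()
load_manifest()
list(emit_workunits())
watch.stop()
watch.report()  # logs "TIMING: transformer=example, start_time=..., end_time=..., elapsed_time=..."

In the workflow files, the same information is captured at job level by prefixing each `datahub ingest` command with the shell's `time` builtin.
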
4 changes: 2 additions & 2 deletions .github/workflows/ingest-cadet-metadata.yml
@@ -71,14 +71,14 @@ jobs:
          DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
          DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
          DATAHUB_TELEMETRY_ENABLED: false
-        run: poetry run datahub ingest -c ingestion/create_cadet_databases.yaml
+        run: time poetry run datahub ingest -c ingestion/create_cadet_databases.yaml

      - name: push metadata to datahub
        env:
          DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
          DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
          DATAHUB_TELEMETRY_ENABLED: false
-        run: poetry run datahub ingest -c ingestion/cadet.yaml
+        run: time poetry run datahub ingest -c ingestion/cadet.yaml

      - name: Notify on failure
        uses: slackapi/slack-github-action@v1.26.0
2 changes: 1 addition & 1 deletion .github/workflows/ingest-justice-data.yml
@@ -63,7 +63,7 @@ jobs:
          DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
          DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
          DATAHUB_TELEMETRY_ENABLED: false
-        run: poetry run datahub ingest -c ingestion/justice_data_ingest.yaml
+        run: time poetry run datahub ingest -c ingestion/justice_data_ingest.yaml

      - name: Notify on failure
        uses: slackapi/slack-github-action@v1.26.0
6 changes: 6 additions & 0 deletions ingestion/create_cadet_databases_source/source.py
@@ -19,13 +19,17 @@
    get_cadet_manifest,
    validate_fqn,
)
from ingestion.utils import report_generator_time, report_time

logging.basicConfig(level=logging.DEBUG)


@config_class(CreateCadetDatabasesConfig)
class CreateCadetDatabases(Source):
    source_config: CreateCadetDatabasesConfig
    report: SourceReport = SourceReport()

    @report_time
    def __init__(self, config: CreateCadetDatabasesConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.source_config = config
@@ -35,6 +39,7 @@ def create(cls, config_dict, ctx):
        config = CreateCadetDatabasesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    @report_generator_time
    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        manifest = get_cadet_manifest(self.source_config.manifest_s3_uri)

@@ -89,6 +94,7 @@ def _get_domains(self, manifest) -> set[str]:
            if manifest["nodes"][node]["resource_type"] == "model"
        )

    @report_time
    def _get_databases_with_domains_and_display_tags(
        self, manifest
    ) -> tuple[set[tuple[str, str]], dict]:
5 changes: 5 additions & 0 deletions ingestion/ingestion_utils.py
@@ -10,8 +10,13 @@
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

from ingestion.config import ENV, INSTANCE, PLATFORM
from ingestion.utils import report_time

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


@report_time
def get_cadet_manifest(manifest_s3_uri: str) -> Dict:
    try:
        s3 = boto3.client("s3")
6 changes: 5 additions & 1 deletion ingestion/justice_data_source/source.py
@@ -1,3 +1,4 @@
import logging
from io import BufferedReader
from typing import Iterable, Optional

@@ -25,7 +26,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
    BrowsePathsV2Class,
    ChangeTypeClass,
    ChartInfoClass,
    CorpGroupInfoClass,
@@ -38,10 +38,13 @@
)

from ingestion.ingestion_utils import list_datahub_domains
from ingestion.utils import report_generator_time

from .api_client import JusticeDataAPIClient
from .config import JusticeDataAPIConfig

logging.basicConfig(level=logging.DEBUG)


@platform_name("File")
@config_class(JusticeDataAPIConfig)
@@ -72,6 +75,7 @@ def create(cls, config_dict, ctx):
        config = JusticeDataAPIConfig.parse_obj(config_dict)
        return cls(ctx, config)

    @report_generator_time
    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        all_chart_data = self.client.list_all(self.config.exclude_id_list)
3 changes: 3 additions & 0 deletions ingestion/taggers/display_in_catalogue_tagger.py
@@ -4,6 +4,9 @@
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import TagAssociationClass

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


def add_display_in_catalogue_tag(entity_urn: str) -> List[TagAssociationClass]:
    """
5 changes: 5 additions & 0 deletions ingestion/transformers/assign_cadet_databases.py
@@ -17,6 +17,9 @@
    parse_database_and_table_names,
    validate_fqn,
)
from ingestion.utils import report_time

logging.basicConfig(level=logging.DEBUG)


class AssignCadetDatabasesConfig(ConfigModel):
@@ -49,6 +52,7 @@ def transform_aspect(
    ) -> Optional[Aspect]:
        return None

    @report_time
    def handle_end_of_stream(
        self,
    ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
@@ -77,6 +81,7 @@ def handle_end_of_stream(

        return mcps

    @report_time
    def _get_table_database_mappings(self, manifest) -> Dict[str, str]:
        mappings = {}
        for node in manifest["nodes"]:
20 changes: 19 additions & 1 deletion ingestion/transformers/assign_cadet_domains.py
@@ -1,8 +1,10 @@
from typing import Iterable

from datahub.configuration.common import (
    KeyValuePattern,
    TransformerSemanticsConfigModel,
)
-from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.dataset_domain import (
    AddDatasetDomain,
    AddDatasetDomainSemanticsConfig,
@@ -14,6 +16,7 @@
    get_cadet_manifest,
    validate_fqn,
)
from ingestion.utils import Stopwatch, report_time


class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
@@ -45,8 +48,22 @@ def resolve_domain(domain_urn: str) -> DomainsClass:
            get_domains_to_add=resolve_domain,
        )

        self.transform_timer = Stopwatch(transformer="AssignCadetDomains")

        super().__init__(generic_config, ctx)

    def _should_process(self, record):
        if not self.transform_timer.running:
            self.transform_timer.start()
        return super()._should_process(record)

    def _handle_end_of_stream(
        self, envelope: RecordEnvelope
    ) -> Iterable[RecordEnvelope]:
        self.transform_timer.stop()
        self.transform_timer.report()
        return super()._handle_end_of_stream(envelope)

    @classmethod
    def create(cls, config_dict, ctx: PipelineContext) -> "AssignCadetDomains":
        try:
@@ -62,6 +79,7 @@ def create(cls, config_dict, ctx: PipelineContext) -> "AssignCadetDomains":
            )
        return cls(config_dict, ctx)

    @report_time
    def _get_domain_mapping(self, manifest) -> PatternDatasetDomainSemanticsConfig:
        """Map regex patterns for tables to domains"""
        nodes = manifest.get("nodes")
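
Unlike the sources, this transformer has no single call that spans the whole run, so the commit drives a Stopwatch manually: the first record seen by _should_process starts it, and _handle_end_of_stream stops and reports it. A stripped-down sketch of the same pattern (the class and record handling here are illustrative, not the project's API):

from ingestion.utils import Stopwatch


class TimedTransformer:
    """Illustrative only: times the gap between the first record and the end of the stream."""

    def __init__(self):
        self.timer = Stopwatch(transformer="TimedTransformer")

    def _should_process(self, record) -> bool:
        if not self.timer.running:
            self.timer.start()  # first record starts the clock
        return True

    def _handle_end_of_stream(self, envelope):
        self.timer.stop()    # end of stream stops it...
        self.timer.report()  # ...and logs a single TIMING line
        return []
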
88 changes: 88 additions & 0 deletions ingestion/utils.py
@@ -0,0 +1,88 @@
import logging
import time
from datetime import timedelta

logging.basicConfig(level=logging.DEBUG)


def report_time(func):
    """
    Decorator to report the total time of a function call
    """

    def wrapped_func(*args, **kwargs):
        arg_types = [type(arg) for arg in args]
        stopwatch = Stopwatch(
            function=func.__name__, arg_types=arg_types, kwargs=kwargs
        )

        stopwatch.start()

        r = func(*args, **kwargs)

        stopwatch.stop()
        stopwatch.report()

        return r

    return wrapped_func


def report_generator_time(func):
    """
    Decorator to report the total time of an iterable
    """

    def wrapped_func(*args, **kwargs):
        arg_types = [type(arg) for arg in args]
        stopwatch = Stopwatch(
            function=func.__name__, arg_types=arg_types, kwargs=kwargs
        )

        stopwatch.start()

        r = func(*args, **kwargs)
        yield from r

        stopwatch.stop()
        stopwatch.report()

        return r

    return wrapped_func


class Stopwatch:
    """
    Wrapper around the time module for timing code execution
    """

    def __init__(self, **meta):
        self.running = False
        self.start_time = None
        self.stop_time = None
        self.elapsed = 0
        joined_meta = ", ".join(f"{k}={v}" for k, v in meta.items())
        self.prefix = f"TIMING: {joined_meta}, " if joined_meta else "TIMING: "

    def start(self):
        self.start_time = time.time()
        self.running = True

    def stop(self):
        self.running = False
        if not self.start_time:
            return

        now = time.time()
        elapsed = now - self.start_time
        self.stop_time = now
        self.elapsed += elapsed

    def report(self):
        logging.info(
            f"{self.prefix}"
            f"start_time={time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(self.start_time))}, "
            f"end_time={time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(self.stop_time))}, "
            f"elapsed_time={str(timedelta(seconds=self.elapsed))}"
        )
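
One subtlety of report_generator_time, shown with a throwaway example (not part of the commit): the wrapper is itself a generator, so the TIMING line is only emitted once the wrapped iterator has been fully consumed.

from ingestion.utils import report_generator_time


@report_generator_time
def numbers():
    yield 1
    yield 2


gen = numbers()
next(gen)   # no TIMING log yet - the stopwatch stops only when the iterator is exhausted
list(gen)   # consumes the rest; stop() and report() run and the TIMING line is logged
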
65 changes: 65 additions & 0 deletions tests/test_utils.py
@@ -0,0 +1,65 @@
import logging
import re

from ingestion.utils import Stopwatch, report_generator_time, report_time

REPORT_REGEX = re.compile(
    r"TIMING: .*"
    r"start_time=\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}, "
    r"end_time=\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}, "
    r"elapsed_time=0:00:\d\d",
)


def test_stopwatch_generates_a_report(caplog):
    caplog.set_level(logging.INFO)
    s = Stopwatch()
    s.start()
    s.stop()
    s.report()

    messages = [r.message for r in caplog.records]
    assert len(messages) == 1
    assert re.match(
        REPORT_REGEX,
        messages[0],
    )


def test_report_time_generates_a_report(caplog):
    caplog.set_level(logging.INFO)

    @report_time
    def foo():
        return 1 + 1

    assert foo() == 2

    messages = [r.message for r in caplog.records]
    assert len(messages) == 1
    assert re.match(
        REPORT_REGEX,
        messages[0],
    )
    assert "function=foo, " in messages[0]


def test_report_generate_time(caplog):
    caplog.set_level(logging.INFO)

    @report_generator_time
    def foo():
        yield 1
        yield 2

    generator = foo()
    values = list(generator)
    assert values == [1, 2]

    messages = [r.message for r in caplog.records]
    assert len(messages) == 1
    assert re.match(
        REPORT_REGEX,
        messages[0],
    )
    assert "function=foo, " in messages[0]