Skip to content

Commit

Permalink
add event type modeling and fire_event calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel May committed Oct 26, 2021
1 parent 5c9fd07 commit 8655220
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 58 deletions.
9 changes: 9 additions & 0 deletions core/dbt/events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Events Module

The Events module is the implmentation for structured logging. These events represent both a programatic interface to dbt processes as well as human-readable messaging in one centralized place. The centralization allows for leveraging mypy to enforce interface invariants across all dbt events, and the distinct type layer allows for decoupling events and libraries such as loggers.

# Using the Events Module
The event module provides types that represent what is happening in dbt in `events.types`. These types are intended to represent an exhaustive list of all things happening within dbt that will need to be logged, streamed, or printed. To fire an event, `events.functions::fire_event` is the entry point to the module from everywhere in dbt.

# Adding a New Event
In `events.types` add a new class that represents the new event. This may be a simple class with no values, or it may require some values to construct downstream messaging. Only include the data necessary to construct this message within this class. If it fits into one of the existing hierarchies, add it as a subclass of the base class, and add it as a member of the union type so that all of the mypy checks will include it. Finally, add the type to the body of the functions that compose the final messages.
46 changes: 0 additions & 46 deletions core/dbt/events/events.py

This file was deleted.

61 changes: 61 additions & 0 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

import dbt.logger as logger # type: ignore # TODO eventually remove dependency on this logger
from dbt.events.history import EVENT_HISTORY
from dbt.events.types import *
from dbt.events.types import ParsingProgressBase, ManifestProgressBase
from typing import NoReturn


# common trick for getting mypy to do exhaustiveness checks
# will come up with something like `"assert_never" has incompatible type`
# if something is missing.
def assert_never(x: NoReturn) -> NoReturn:
raise AssertionError("Unhandled type: {}".format(type(x).__name__))


# TODO is there a type-level way to do this in mypy? `isinstance(e, CliEvent)`
# triggers `Parameterized generics cannot be used with class or instance checks`
def is_cli_event(e: Event) -> bool:
return isinstance(e, ParsingProgressBase) or isinstance(e, ManifestProgressBase)


# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: Event) -> None:
EVENT_HISTORY.append(e)
if is_cli_event(e):
# TODO handle log levels
logger.GLOBAL_LOGGER.info(cli_msg(e))


# These functions translate any instance of the above event types
# into various message types to later be sent to their final destination.
#
# These could instead be implemented as methods on an ABC for all the
# above classes, but this way we can enforce exhaustiveness with mypy


# returns the string to be printed to the CLI
def cli_msg(e: CliEvent) -> str:
if isinstance(e, ParsingStart):
return logger.timestamped_line("Start parsing.")
elif isinstance(e, ParsingCompiling):
return logger.timestamped_line("Compiling.")
elif isinstance(e, ParsingWritingManifest):
return logger.timestamped_line("Writing manifest.")
elif isinstance(e, ParsingDone):
return logger.timestamped_line("Done.")
elif isinstance(e, ManifestDependenciesLoaded):
return logger.timestamped_line("Dependencies loaded")
elif isinstance(e, ManifestLoaderCreated):
return logger.timestamped_line("ManifestLoader created")
elif isinstance(e, ManifestLoaded):
return logger.timestamped_line("Manifest loaded")
elif isinstance(e, ManifestChecked):
return logger.timestamped_line("Manifest checked")
elif isinstance(e, ManifestFlatGraphBuilt):
return logger.timestamped_line("Flat graph built")
else:
assert_never(e)
5 changes: 5 additions & 0 deletions core/dbt/events/history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dbt.events.types import Event
from typing import List

# the global history of events for this session
EVENT_HISTORY: List[Event] = []
92 changes: 92 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@

from typing import NamedTuple, Union


# The following classes represent the data necessary to describe a
# particular event to both human readable logs, and machine reliable
# event streams. The transformation to these forms will live in outside
# functions.
#
# Until we drop support for Python 3.6 we must use NamedTuples over
# frozen dataclasses.


# base class used for type-level membership checking only
class ParsingProgressBase(NamedTuple):
pass


class ParsingStart(ParsingProgressBase):
pass


class ParsingCompiling(ParsingProgressBase):
pass


class ParsingWritingManifest(ParsingProgressBase):
pass


class ParsingDone(ParsingProgressBase):
pass


# using a union instead of inheritance means that this set cannot
# be extended outside this file, and thus mypy can do exhaustiveness
# checks for us.

# type for parsing progress events
ParsingProgress = Union[
ParsingStart,
ParsingCompiling,
ParsingWritingManifest,
ParsingDone
]


# base class used for type-level membership checking only
class ManifestProgressBase(NamedTuple):
pass


class ManifestDependenciesLoaded(ManifestProgressBase):
pass


class ManifestLoaderCreated(ManifestProgressBase):
pass


class ManifestLoaded(ManifestProgressBase):
pass


class ManifestChecked(ManifestProgressBase):
pass


class ManifestFlatGraphBuilt(ManifestProgressBase):
pass


# type for manifest loading progress events
ManifestProgress = Union[
ManifestDependenciesLoaded,
ManifestLoaderCreated,
ManifestLoaded,
ManifestChecked,
ManifestFlatGraphBuilt
]

# top-level event type for all events that go to the CLI
CliEvent = Union[
ParsingProgress,
ManifestProgress
]

# top-level event type for all events
Event = Union[
ParsingProgress,
ManifestProgress
]
4 changes: 3 additions & 1 deletion core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,11 @@ def list_handler(
def get_timestamp():
return time.strftime("%H:%M:%S")

def timestamped_line(msg: str) -> str:
return "{} | {}".format(get_timestamp(), msg)

def print_timestamped_line(msg: str, use_color: Optional[str] = None):
if use_color is not None:
msg = dbt.ui.color(msg, use_color)

GLOBAL_LOGGER.info("{} | {}".format(get_timestamp(), msg))
GLOBAL_LOGGER.info(timestamped_line(msg))
23 changes: 12 additions & 11 deletions core/dbt/task/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
)
from dbt.logger import DbtProcessState, print_timestamped_line
from dbt.clients.system import write_file
from dbt.events.types import *
from dbt.events.functions import fire_event
from dbt.graph import Graph
import time
from typing import Optional
Expand Down Expand Up @@ -58,38 +60,37 @@ def get_full_manifest(self):
with PARSING_STATE:
start_load_all = time.perf_counter()
projects = root_config.load_dependencies()
print_timestamped_line("Dependencies loaded")
fire_event(ManifestDependenciesLoaded())
loader = ManifestLoader(root_config, projects, macro_hook)
print_timestamped_line("ManifestLoader created")
fire_event(ManifestLoaderCreated())
manifest = loader.load()
print_timestamped_line("Manifest loaded")
fire_event(ManifestLoaded())
_check_manifest(manifest, root_config)
print_timestamped_line("Manifest checked")
fire_event(ManifestChecked())
manifest.build_flat_graph()
print_timestamped_line("Flat graph built")
fire_event(ManifestFlatGraphBuilt())
loader._perf_info.load_all_elapsed = (
time.perf_counter() - start_load_all
)

self.loader = loader
self.manifest = manifest
print_timestamped_line("Manifest loaded")
fire_event(ManifestLoaded())

def compile_manifest(self):
adapter = get_adapter(self.config)
compiler = adapter.get_compiler()
self.graph = compiler.compile(self.manifest)

def run(self):
events.register(Progress(ParseStart))
print_timestamped_line('Start parsing.')
fire_event(ParsingStart())
self.get_full_manifest()
if self.args.compile:
print_timestamped_line('Compiling.')
fire_event(ParsingCompiling())
self.compile_manifest()
if self.args.write_manifest:
print_timestamped_line('Writing manifest.')
fire_event(ParsingWritingManifest())
self.write_manifest()

self.write_perf_info()
print_timestamped_line('Done.')
fire_event(ParsingDone())

0 comments on commit 8655220

Please sign in to comment.