Skip to content

Commit

Permalink
[#4504] Use mashumaro for serializing logging events (#4505)
Browse files (browse the repository at this point in the history)
automatic commit by git-black, original commits:
  efb890d
  • Loading branch information
gshank authored and iknox-fa committed Feb 8, 2022
1 parent ad4b8e1 commit f970955
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 70 deletions.
21 changes: 9 additions & 12 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,20 @@ def identifier(self):


@dataclass
class NodeInfoMixin():
class NodeInfoMixin:
_event_status: Dict[str, Any] = field(default_factory=dict)

@property
def node_info(self):
node_info = {
"node_path": getattr(self, 'path', None),
"node_name": getattr(self, 'name', None),
"unique_id": getattr(self, 'unique_id', None),
"resource_type": str(getattr(self, 'resource_type', '')),
"materialized": self.config.get('materialized'),
"node_status": str(self._event_status.get('node_status')),
"node_path": getattr(self, "path", None),
"node_name": getattr(self, "name", None),
"unique_id": getattr(self, "unique_id", None),
"resource_type": str(getattr(self, "resource_type", "")),
"materialized": self.config.get("materialized"),
"node_status": str(self._event_status.get("node_status")),
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at")
"node_finished_at": self._event_status.get("finished_at"),
}
return node_info

Expand Down Expand Up @@ -628,10 +628,7 @@ class ParsedSourceMandatory(


@dataclass
class ParsedSourceDefinition(
NodeInfoMixin,
ParsedSourceMandatory
):
class ParsedSourceDefinition(NodeInfoMixin, ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/events/base_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def code() -> str:
# It should be in all subclasses that are to record actual events.
@abstractmethod
def to_dict(self):
raise Exception('to_dict not implemented for Event')
raise Exception("to_dict not implemented for Event")

# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
Expand Down Expand Up @@ -116,5 +116,5 @@ class NoStdOut():
# This class represents the node_info which is generated
# by the NodeInfoMixin class in dbt.contracts.graph.parsed
@dataclass
class NodeInfo():
class NodeInfo:
node_info: Dict[str, Any]
14 changes: 7 additions & 7 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,17 @@ def event_to_serializable_dict(
)

# We get the code from the event object, so we don't need it in the data
if 'code' in log_line:
del log_line['code']
if "code" in log_line:
del log_line["code"]

event_dict = {
'type': 'log_line',
'log_version': LOG_VERSION,
'ts': get_ts_rfc3339(),
"log_version": LOG_VERSION,
"ts": get_ts_rfc3339(),
'pid': e.get_pid(),
'msg': e.message(),
'level': e.level_tag(),
'data': log_line,
"data": log_line,
'invocation_id': e.get_invocation_id(),
'thread_name': e.get_thread_name(),
'code': e.code
Expand All @@ -202,7 +202,7 @@ def create_debug_text_log_line(e: T_Event) -> str:
# Create a separator if this is the beginning of an invocation
if type(e) == MainReportVersion:
separator = 30 * '='
log_line = f'\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n'
log_line = f"\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n"
color_tag: str = '' if this.format_color else Style.RESET_ALL
ts: str = get_ts().strftime("%H:%M:%S.%f")
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
Expand Down Expand Up @@ -390,5 +390,5 @@ def get_ts() -> datetime:
# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = get_ts()
ts_rfc3339 = ts.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339
9 changes: 4 additions & 5 deletions core/dbt/events/serialization.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from dbt.helper_types import Lazy
from mashumaro import DataClassDictMixin
from mashumaro.config import (
BaseConfig as MashBaseConfig
)
from mashumaro.config import BaseConfig as MashBaseConfig
from mashumaro.types import SerializationStrategy
from typing import Dict, List

Expand All @@ -11,21 +9,22 @@
# class. If a datetime ends up in an event class, we could use a similar class
# here to serialize it in our preferred format.


class ExceptionSerialization(SerializationStrategy):
def serialize(self, value):
out = str(value)
return out

def deserialize(self, value):
return (Exception(value))
return Exception(value)


class BaseExceptionSerialization(SerializationStrategy):
def serialize(self, value):
return str(value)

def deserialize(self, value):
return (BaseException(value))
return BaseException(value)


# This is an explicit deserializer for the type Lazy[Dict[str, List[str]]]
Expand Down
29 changes: 15 additions & 14 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,9 @@ def message(self) -> str:
index=self.index,
total=self.total,
execution_time=self.execution_time,
truncate=True)
execution_time=self.execution_time,
truncate=True,
)


@dataclass
Expand Down Expand Up @@ -2355,11 +2357,11 @@ def message(self) -> str:
#
# TODO remove these lines once we run mypy everywhere.
if 1 == 0:
MainReportVersion(v='')
MainReportVersion(v="")
MainKeyboardInterrupt()
MainEncounteredError(e=BaseException(''))
MainStackTrace(stack_trace='')
MainTrackingUserState(user_state='')
MainEncounteredError(e=BaseException(""))
MainStackTrace(stack_trace="")
MainTrackingUserState(user_state="")
ParsingStart()
ParsingCompiling()
ParsingWritingManifest()
Expand Down Expand Up @@ -2402,8 +2404,7 @@ def message(self) -> str:
SQLQueryStatus(status="", elapsed=0.1)
SQLCommit(conn_name="")
ColTypeChange(
orig_type="", new_type="",
table=_ReferenceKey(database="", schema="", identifier="")
orig_type="", new_type="", table=_ReferenceKey(database="", schema="", identifier="")
)
SchemaCreation(relation=_ReferenceKey(database="", schema="", identifier=""))
SchemaDrop(relation=_ReferenceKey(database="", schema="", identifier=""))
Expand Down Expand Up @@ -2457,7 +2458,7 @@ def message(self) -> str:
PartialParsingFailedBecauseProfileChange()
PartialParsingFailedBecauseNewProjectDependency()
PartialParsingFailedBecauseHashChanged()
PartialParsingDeletedMetric(id='')
PartialParsingDeletedMetric(id="")
ParsedFileLoadFailed(path='', exc=Exception(''))
PartialParseSaveFileNotFound()
StaticParserCausedJinjaRendering(path='')
Expand Down Expand Up @@ -2493,7 +2494,7 @@ def message(self) -> str:
ProfileHelpMessage()
CatchableExceptionOnRun(exc=Exception(''))
InternalExceptionOnRun(build_path='', exc=Exception(''))
GenericExceptionOnRun(build_path='', unique_id='', exc=Exception(''))
GenericExceptionOnRun(build_path="", unique_id="", exc=Exception(""))
NodeConnectionReleaseError(node_name='', exc=Exception(''))
CheckCleanPath(path='')
ConfirmCleanPath(path='')
Expand Down Expand Up @@ -2531,7 +2532,7 @@ def message(self) -> str:
FirstRunResultError(msg='')
AfterFirstRunResultError(msg='')
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False)
PrintStartLine(description='', index=0, total=0, node_info={})
PrintStartLine(description="", index=0, total=0, node_info={})
PrintHookStartLine(
statement='',
index=0,
Expand Down Expand Up @@ -2671,12 +2672,12 @@ def message(self) -> str:
)
PrintCancelLine(conn_name='')
DefaultSelector(name='')
NodeStart(node_info={}, unique_id='')
NodeFinished(node_info={}, unique_id='', run_result={})
NodeStart(node_info={}, unique_id="")
NodeFinished(node_info={}, unique_id="", run_result={})
QueryCancelationUnsupported(type='')
ConcurrencyLine(num_threads=0, target_name='')
NodeCompiling(node_info={}, unique_id='')
NodeExecuting(node_info={}, unique_id='')
NodeCompiling(node_info={}, unique_id="")
NodeExecuting(node_info={}, unique_id="")
StarterProjectPath(dir='')
ConfigFolderDirectory(dir='')
NoSampleProfileFound(adapter='')
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_nodes_from_criteria(
except InvalidSelectorException:
valid_selectors = ", ".join(self.SELECTOR_METHODS)
fire_event(SelectorReportInvalidSelector(
valid_selectors=valid_selectors,
SelectorReportInvalidSelector(
spec_method=spec.method,
raw_spec=spec.raw
))
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def on_skip(self):
node_name=node_name,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)

Expand Down
10 changes: 5 additions & 5 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def before_execute(self):
description=description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)

Expand All @@ -58,7 +58,7 @@ def after_execute(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)
elif result.status == FreshnessStatus.Error:
Expand All @@ -69,7 +69,7 @@ def after_execute(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)
elif result.status == FreshnessStatus.Warn:
Expand All @@ -80,7 +80,7 @@ def after_execute(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)
else:
Expand All @@ -91,7 +91,7 @@ def after_execute(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)

Expand Down
10 changes: 5 additions & 5 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def print_start_line(self):
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)

Expand All @@ -188,7 +188,7 @@ def print_result_line(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)
else:
Expand All @@ -199,7 +199,7 @@ def print_result_line(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)

Expand Down Expand Up @@ -347,7 +347,7 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
statement=hook_text,
index=idx,
total=num_hooks,
node_info=hook.node_info
node_info=hook.node_info,
)
)

Expand All @@ -369,7 +369,7 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
index=idx,
total=num_hooks,
execution_time=timer.elapsed,
node_info=hook.node_info
node_info=hook.node_info,
)
)
# `_event_status` dict is only used for logging. Make sure
Expand Down
8 changes: 4 additions & 4 deletions core/dbt/task/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dbt.logger import TextOnly
from dbt.events.functions import fire_event
from dbt.events.types import (
SeedHeader, SeedHeaderSeparator, EmptyLine, PrintSeedErrorResultLine,
SeedHeader,
PrintSeedResultLine, PrintStartLine
)
from dbt.node_types import NodeType
Expand All @@ -28,7 +28,7 @@ def before_execute(self):
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info
node_info=self.node.node_info,
)
)

Expand All @@ -52,7 +52,7 @@ def print_result_line(self, result):
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info
node_info=model.node_info,
)
)
else:
Expand All @@ -64,7 +64,7 @@ def print_result_line(self, result):
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info
node_info=model.node_info,
)
)

Expand Down
4 changes: 2 additions & 2 deletions core/dbt/task/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def print_result_line(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info
node_info=model.node_info,
)
)
else:
Expand All @@ -36,7 +36,7 @@ def print_result_line(self, result):
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info
node_info=model.node_info,
)
)

Expand Down
Loading

0 comments on commit f970955

Please sign in to comment.