From f9709555477ae42ff6314e646824d2b12ab6953b Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 27 Jan 2022 14:43:26 -0500 Subject: [PATCH] [#4504] Use mashumaro for serializing logging events (#4505) automatic commit by git-black, original commits: efb890db2d4798b6c197c1434313954e655de207 --- core/dbt/contracts/graph/parsed.py | 21 +++++++++------------ core/dbt/events/base_types.py | 4 ++-- core/dbt/events/functions.py | 14 +++++++------- core/dbt/events/serialization.py | 9 ++++----- core/dbt/events/types.py | 29 +++++++++++++++-------------- core/dbt/graph/selector.py | 2 +- core/dbt/task/base.py | 2 +- core/dbt/task/freshness.py | 10 +++++----- core/dbt/task/run.py | 10 +++++----- core/dbt/task/seed.py | 8 ++++---- core/dbt/task/snapshot.py | 4 ++-- core/dbt/task/test.py | 10 +++++----- core/dbt/utils.py | 18 +++++++++++------- 13 files changed, 71 insertions(+), 70 deletions(-) diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 2fbeb806e57..603de631a27 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -176,20 +176,20 @@ def identifier(self): @dataclass -class NodeInfoMixin(): +class NodeInfoMixin: _event_status: Dict[str, Any] = field(default_factory=dict) @property def node_info(self): node_info = { - "node_path": getattr(self, 'path', None), - "node_name": getattr(self, 'name', None), - "unique_id": getattr(self, 'unique_id', None), - "resource_type": str(getattr(self, 'resource_type', '')), - "materialized": self.config.get('materialized'), - "node_status": str(self._event_status.get('node_status')), + "node_path": getattr(self, "path", None), + "node_name": getattr(self, "name", None), + "unique_id": getattr(self, "unique_id", None), + "resource_type": str(getattr(self, "resource_type", "")), + "materialized": self.config.get("materialized"), + "node_status": str(self._event_status.get("node_status")), "node_started_at": self._event_status.get("started_at"), - "node_finished_at": self._event_status.get("finished_at") + "node_finished_at": self._event_status.get("finished_at"), } return node_info @@ -628,10 +628,7 @@ class ParsedSourceMandatory( @dataclass -class ParsedSourceDefinition( - NodeInfoMixin, - ParsedSourceMandatory -): +class ParsedSourceDefinition(NodeInfoMixin, ParsedSourceMandatory): quoting: Quoting = field(default_factory=Quoting) loaded_at_field: Optional[str] = None freshness: Optional[FreshnessThreshold] = None diff --git a/core/dbt/events/base_types.py b/core/dbt/events/base_types.py index 5fe4285ac7c..40cbba0019b 100644 --- a/core/dbt/events/base_types.py +++ b/core/dbt/events/base_types.py @@ -45,7 +45,7 @@ def code() -> str: # It should be in all subclasses that are to record actual events. @abstractmethod def to_dict(self): - raise Exception('to_dict not implemented for Event') + raise Exception("to_dict not implemented for Event") # do not define this yourself. inherit it from one of the above level types. @abstractmethod @@ -116,5 +116,5 @@ class NoStdOut(): # This class represents the node_info which is generated # by the NodeInfoMixin class in dbt.contracts.graph.parsed @dataclass -class NodeInfo(): +class NodeInfo: node_info: Dict[str, Any] diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index fa255a91ac0..25718170685 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -168,17 +168,17 @@ def event_to_serializable_dict( ) # We get the code from the event object, so we don't need it in the data - if 'code' in log_line: - del log_line['code'] + if "code" in log_line: + del log_line["code"] event_dict = { 'type': 'log_line', - 'log_version': LOG_VERSION, - 'ts': get_ts_rfc3339(), + "log_version": LOG_VERSION, + "ts": get_ts_rfc3339(), 'pid': e.get_pid(), 'msg': e.message(), 'level': e.level_tag(), - 'data': log_line, + "data": log_line, 'invocation_id': e.get_invocation_id(), 'thread_name': e.get_thread_name(), 'code': e.code @@ -202,7 +202,7 @@ def create_debug_text_log_line(e: T_Event) -> str: # Create a separator if this is the beginning of an invocation if type(e) == MainReportVersion: separator = 30 * '=' - log_line = f'\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n' + log_line = f"\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n" color_tag: str = '' if this.format_color else Style.RESET_ALL ts: str = get_ts().strftime("%H:%M:%S.%f") scrubbed_msg: str = scrub_secrets(e.message(), env_secrets()) @@ -390,5 +390,5 @@ def get_ts() -> datetime: # preformatted time stamp def get_ts_rfc3339() -> str: ts = get_ts() - ts_rfc3339 = ts.strftime('%Y-%m-%dT%H:%M:%S.%fZ') + ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ") return ts_rfc3339 diff --git a/core/dbt/events/serialization.py b/core/dbt/events/serialization.py index 8ef18c37bfe..20dc54c939f 100644 --- a/core/dbt/events/serialization.py +++ b/core/dbt/events/serialization.py @@ -1,8 +1,6 @@ from dbt.helper_types import Lazy from mashumaro import DataClassDictMixin -from mashumaro.config import ( - BaseConfig as MashBaseConfig -) +from mashumaro.config import BaseConfig as MashBaseConfig from mashumaro.types import SerializationStrategy from typing import Dict, List @@ -11,13 +9,14 @@ # class. If a datetime ends up in an event class, we could use a similar class # here to serialize it in our preferred format. + class ExceptionSerialization(SerializationStrategy): def serialize(self, value): out = str(value) return out def deserialize(self, value): - return (Exception(value)) + return Exception(value) class BaseExceptionSerialization(SerializationStrategy): @@ -25,7 +24,7 @@ def serialize(self, value): return str(value) def deserialize(self, value): - return (BaseException(value)) + return BaseException(value) # This is an explicit deserializer for the type Lazy[Dict[str, List[str]]] diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index 0237fdc7837..88220a8be61 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1708,7 +1708,9 @@ def message(self) -> str: index=self.index, total=self.total, execution_time=self.execution_time, - truncate=True) + execution_time=self.execution_time, + truncate=True, + ) @dataclass @@ -2355,11 +2357,11 @@ def message(self) -> str: # # TODO remove these lines once we run mypy everywhere. if 1 == 0: - MainReportVersion(v='') + MainReportVersion(v="") MainKeyboardInterrupt() - MainEncounteredError(e=BaseException('')) - MainStackTrace(stack_trace='') - MainTrackingUserState(user_state='') + MainEncounteredError(e=BaseException("")) + MainStackTrace(stack_trace="") + MainTrackingUserState(user_state="") ParsingStart() ParsingCompiling() ParsingWritingManifest() @@ -2402,8 +2404,7 @@ def message(self) -> str: SQLQueryStatus(status="", elapsed=0.1) SQLCommit(conn_name="") ColTypeChange( - orig_type="", new_type="", - table=_ReferenceKey(database="", schema="", identifier="") + orig_type="", new_type="", table=_ReferenceKey(database="", schema="", identifier="") ) SchemaCreation(relation=_ReferenceKey(database="", schema="", identifier="")) SchemaDrop(relation=_ReferenceKey(database="", schema="", identifier="")) @@ -2457,7 +2458,7 @@ def message(self) -> str: PartialParsingFailedBecauseProfileChange() PartialParsingFailedBecauseNewProjectDependency() PartialParsingFailedBecauseHashChanged() - PartialParsingDeletedMetric(id='') + PartialParsingDeletedMetric(id="") ParsedFileLoadFailed(path='', exc=Exception('')) PartialParseSaveFileNotFound() StaticParserCausedJinjaRendering(path='') @@ -2493,7 +2494,7 @@ def message(self) -> str: ProfileHelpMessage() CatchableExceptionOnRun(exc=Exception('')) InternalExceptionOnRun(build_path='', exc=Exception('')) - GenericExceptionOnRun(build_path='', unique_id='', exc=Exception('')) + GenericExceptionOnRun(build_path="", unique_id="", exc=Exception("")) NodeConnectionReleaseError(node_name='', exc=Exception('')) CheckCleanPath(path='') ConfirmCleanPath(path='') @@ -2531,7 +2532,7 @@ def message(self) -> str: FirstRunResultError(msg='') AfterFirstRunResultError(msg='') EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False) - PrintStartLine(description='', index=0, total=0, node_info={}) + PrintStartLine(description="", index=0, total=0, node_info={}) PrintHookStartLine( statement='', index=0, @@ -2671,12 +2672,12 @@ def message(self) -> str: ) PrintCancelLine(conn_name='') DefaultSelector(name='') - NodeStart(node_info={}, unique_id='') - NodeFinished(node_info={}, unique_id='', run_result={}) + NodeStart(node_info={}, unique_id="") + NodeFinished(node_info={}, unique_id="", run_result={}) QueryCancelationUnsupported(type='') ConcurrencyLine(num_threads=0, target_name='') - NodeCompiling(node_info={}, unique_id='') - NodeExecuting(node_info={}, unique_id='') + NodeCompiling(node_info={}, unique_id="") + NodeExecuting(node_info={}, unique_id="") StarterProjectPath(dir='') ConfigFolderDirectory(dir='') NoSampleProfileFound(adapter='') diff --git a/core/dbt/graph/selector.py b/core/dbt/graph/selector.py index 2fef6af5339..1757dda689b 100644 --- a/core/dbt/graph/selector.py +++ b/core/dbt/graph/selector.py @@ -88,7 +88,7 @@ def get_nodes_from_criteria( except InvalidSelectorException: valid_selectors = ", ".join(self.SELECTOR_METHODS) fire_event(SelectorReportInvalidSelector( - valid_selectors=valid_selectors, + SelectorReportInvalidSelector( spec_method=spec.method, raw_spec=spec.raw )) diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index 0d69ea26708..da2b736b0e3 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -458,7 +458,7 @@ def on_skip(self): node_name=node_name, index=self.node_index, total=self.num_nodes, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 482152d144c..47d101a9fe9 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -39,7 +39,7 @@ def before_execute(self): description=description, index=self.node_index, total=self.num_nodes, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) @@ -58,7 +58,7 @@ def after_execute(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) elif result.status == FreshnessStatus.Error: @@ -69,7 +69,7 @@ def after_execute(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) elif result.status == FreshnessStatus.Warn: @@ -80,7 +80,7 @@ def after_execute(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) else: @@ -91,7 +91,7 @@ def after_execute(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index b3f9a86c366..3ebb2dc416f 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -174,7 +174,7 @@ def print_start_line(self): description=self.describe_node(), index=self.node_index, total=self.num_nodes, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) @@ -188,7 +188,7 @@ def print_result_line(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) else: @@ -199,7 +199,7 @@ def print_result_line(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) @@ -347,7 +347,7 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context): statement=hook_text, index=idx, total=num_hooks, - node_info=hook.node_info + node_info=hook.node_info, ) ) @@ -369,7 +369,7 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context): index=idx, total=num_hooks, execution_time=timer.elapsed, - node_info=hook.node_info + node_info=hook.node_info, ) ) # `_event_status` dict is only used for logging. Make sure diff --git a/core/dbt/task/seed.py b/core/dbt/task/seed.py index be5a9278a11..30b9f3aa825 100644 --- a/core/dbt/task/seed.py +++ b/core/dbt/task/seed.py @@ -11,7 +11,7 @@ from dbt.logger import TextOnly from dbt.events.functions import fire_event from dbt.events.types import ( - SeedHeader, SeedHeaderSeparator, EmptyLine, PrintSeedErrorResultLine, + SeedHeader, PrintSeedResultLine, PrintStartLine ) from dbt.node_types import NodeType @@ -28,7 +28,7 @@ def before_execute(self): description=self.describe_node(), index=self.node_index, total=self.num_nodes, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) @@ -52,7 +52,7 @@ def print_result_line(self, result): execution_time=result.execution_time, schema=self.node.schema, relation=model.alias, - node_info=model.node_info + node_info=model.node_info, ) ) else: @@ -64,7 +64,7 @@ def print_result_line(self, result): execution_time=result.execution_time, schema=self.node.schema, relation=model.alias, - node_info=model.node_info + node_info=model.node_info, ) ) diff --git a/core/dbt/task/snapshot.py b/core/dbt/task/snapshot.py index 77ca74fe5b5..7bd62ffb55b 100644 --- a/core/dbt/task/snapshot.py +++ b/core/dbt/task/snapshot.py @@ -24,7 +24,7 @@ def print_result_line(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=model.node_info + node_info=model.node_info, ) ) else: @@ -36,7 +36,7 @@ def print_result_line(self, result): index=self.node_index, total=self.num_nodes, execution_time=result.execution_time, - node_info=model.node_info + node_info=model.node_info, ) ) diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index 8930d966bb2..9cc3bf1cb2c 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -75,7 +75,7 @@ def print_result_line(self, result): index=self.node_index, num_models=self.num_nodes, execution_time=result.execution_time, - node_info=model.node_info + node_info=model.node_info, ) ) elif result.status == TestStatus.Pass: @@ -85,7 +85,7 @@ def print_result_line(self, result): index=self.node_index, num_models=self.num_nodes, execution_time=result.execution_time, - node_info=model.node_info + node_info=model.node_info, ) ) elif result.status == TestStatus.Warn: @@ -96,7 +96,7 @@ def print_result_line(self, result): num_models=self.num_nodes, execution_time=result.execution_time, failures=result.failures, - node_info=model.node_info + node_info=model.node_info, ) ) elif result.status == TestStatus.Fail: @@ -107,7 +107,7 @@ def print_result_line(self, result): num_models=self.num_nodes, execution_time=result.execution_time, failures=result.failures, - node_info=model.node_info + node_info=model.node_info, ) ) else: @@ -119,7 +119,7 @@ def print_start_line(self): description=self.describe_node(), index=self.node_index, total=self.num_nodes, - node_info=self.node.node_info + node_info=self.node.node_info, ) ) diff --git a/core/dbt/utils.py b/core/dbt/utils.py index b4b669d00c1..fd78bbb9cc4 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -650,23 +650,27 @@ def args_to_dict(args): dict_args = {} # remove args keys that clutter up the dictionary for key in var_args: - if key == 'cls': + if key == "cls": continue if var_args[key] is None: continue # TODO: add more default_false_keys default_false_keys = ( - 'debug', 'full_refresh', 'fail_fast', 'warn_error', - 'single_threaded', 'log_cache_events', 'store_failures', - 'use_experimental_parser', + "debug", + "full_refresh", + "fail_fast", + "warn_error", + "single_threaded", + "log_cache_events", + "store_failures", + "use_experimental_parser", ) if key in default_false_keys and var_args[key] is False: continue - if key == 'vars' and var_args[key] == '{}': + if key == "vars" and var_args[key] == "{}": continue # this was required for a test case - if (isinstance(var_args[key], PosixPath) or - isinstance(var_args[key], WindowsPath)): + if isinstance(var_args[key], PosixPath) or isinstance(var_args[key], WindowsPath): var_args[key] = str(var_args[key]) dict_args[key] = var_args[key] return dict_args