Skip to content

Commit

Permalink
[CORE-388] Log group metadata info for result logs
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke committed Sep 25, 2024
1 parent 3ac20ce commit 1d9ad18
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 326 deletions.
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
"owner": self.owner.to_dict(omit_none=True),
}


Expand Down
18 changes: 12 additions & 6 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,7 @@ message LogTestResult {
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
Group group = 8;
}

message LogTestResultMsg {
Expand All @@ -1304,6 +1305,12 @@ message LogStartLineMsg {
LogStartLine data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Q012
message LogModelResult {
NodeInfo node_info = 1;
Expand All @@ -1312,6 +1319,7 @@ message LogModelResult {
int32 index = 4;
int32 total = 5;
float execution_time = 6;
Group group = 7;
}

message LogModelResultMsg {
Expand All @@ -1331,6 +1339,7 @@ message LogSnapshotResult {
float execution_time = 6;
map<string, string> cfg = 7;
string result_message = 8;
Group group = 9;
}

message LogSnapshotResultMsg {
Expand All @@ -1348,6 +1357,7 @@ message LogSeedResult {
float execution_time = 6;
string schema = 7;
string relation = 8;
Group group = 9;
}

message LogSeedResultMsg {
Expand All @@ -1366,6 +1376,7 @@ message LogFreshnessResult {
float execution_time = 5;
string source_name = 6;
string table_name = 7;
Group group = 8;
}

message LogFreshnessResultMsg {
Expand All @@ -1381,6 +1392,7 @@ message LogNodeNoOpResult {
int32 index = 4;
int32 total = 5;
float execution_time = 6;
Group group = 7;
}

message LogNodeNoOpResultMsg {
Expand Down Expand Up @@ -1820,12 +1832,6 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Z021
message RunResultWarning {
string resource_type = 1;
Expand Down
572 changes: 286 additions & 286 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt_common.events.functions import fire_event

from . import group_lookup
from .run import ModelRunner as run_model_runner
from .run import RunTask
from .seed import SeedRunner as seed_runner
Expand All @@ -33,12 +34,14 @@ def compile(self, manifest: Manifest):
return self.node

def after_execute(self, result) -> None:
group = group_lookup.get(result.node.unique_id)
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
group=group,
)
)

Expand Down
3 changes: 3 additions & 0 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dbt_common.events.types import Note
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError

from . import group_lookup
from .base import BaseRunner
from .printer import print_run_result_error
from .run import RunTask
Expand Down Expand Up @@ -64,6 +65,7 @@ def after_execute(self, result) -> None:
source_name = result.source_name
table_name = result.table_name
level = LogFreshnessResult.status_to_level(str(result.status))
group = group_lookup.get(result.node.unique_id)
fire_event(
LogFreshnessResult(
status=result.status,
Expand All @@ -73,6 +75,7 @@ def after_execute(self, result) -> None:
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand Down
40 changes: 40 additions & 0 deletions core/dbt/task/group_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import AbstractSet, Dict, Optional, Union

from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import Group

_node_id_to_group_name_map: Dict[str, str] = {}
_group_name_to_group_map: Dict[str, Group] = {}


def init(manifest: Optional[Manifest], selected_ids: AbstractSet[str]) -> None:
if not manifest:
return

Check warning on line 12 in core/dbt/task/group_lookup.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/group_lookup.py#L12

Added line #L12 was not covered by tests

_every_group_name_to_group_map = {v.name: v for v in manifest.groups.values()}

for group_name, node_ids in manifest.group_map.items():
for node_id in node_ids:
# only add node to lookup if it's selected
if node_id in selected_ids:
_node_id_to_group_name_map[node_id] = group_name

# only add group to lookup if it's not already there and if node is selected
if group_name not in _group_name_to_group_map:
_group_name_to_group_map[group_name] = _every_group_name_to_group_map[
group_name
]


def get(node_id: str) -> Optional[Dict[str, Union[str, Dict[str, str]]]]:
group_name = _node_id_to_group_name_map.get(node_id)

if group_name is None:
return None

group = _group_name_to_group_map.get(group_name)

if group is None:
return None

Check warning on line 38 in core/dbt/task/group_lookup.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/group_lookup.py#L38

Added line #L38 was not covered by tests

return group.to_logging_dict()
31 changes: 13 additions & 18 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Dict, Optional
from typing import Dict, Optional, Union

from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
Expand All @@ -14,6 +13,7 @@
StatsLine,
)
from dbt.node_types import NodeType
from dbt.task import group_lookup
from dbt_common.events.base_types import EventLevel
from dbt_common.events.format import pluralize
from dbt_common.events.functions import fire_event
Expand Down Expand Up @@ -70,7 +70,10 @@ def print_run_status_line(results) -> None:


def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
result,
newline: bool = True,
is_warning: bool = False,
group: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
) -> None:
# set node_info for logging events
node_info = None
Expand All @@ -80,36 +83,31 @@ def print_run_result_error(
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
group=group,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
group=group,
)
)

if result.message:
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group))
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))

Expand All @@ -129,13 +127,10 @@ def print_run_result_error(
elif result.message is not None:
if newline:
fire_event(Formatting(""))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group))


def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
errors, warnings = [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
Expand All @@ -157,11 +152,11 @@ def print_run_end_messages(
)

for error in errors:
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
group = group_lookup.get(error.node.unique_id) if hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)

for warning in warnings:
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
group = group_lookup.get(warning.node.unique_id) if hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)

print_run_status_line(results)
19 changes: 7 additions & 12 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dbt_common.events.types import Formatting
from dbt_common.exceptions import DbtValidationError

from . import group_lookup
from .compile import CompileRunner, CompileTask
from .printer import get_counts, print_run_end_messages

Expand Down Expand Up @@ -216,6 +217,7 @@ def print_start_line(self):

def print_result_line(self, result):
description = self.describe_node()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
Expand All @@ -230,6 +232,7 @@ def print_result_line(self, result):
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand All @@ -243,6 +246,7 @@ def print_batch_result_line(
exception: Optional[Exception],
):
description = self.describe_batch(batch_start)
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
Expand All @@ -257,6 +261,7 @@ def print_batch_result_line(
total=batch_total,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand Down Expand Up @@ -648,6 +653,7 @@ def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
self.safe_run_hooks(adapter, RunHookType.Start, {})
group_lookup.init(self.manifest, selected_uids)

def after_run(self, adapter, results) -> None:
# in on-run-end hooks, provide the value 'database_schemas', which is a
Expand Down Expand Up @@ -685,17 +691,6 @@ def get_node_selector(self) -> ResourceTypeSelector:
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return ModelRunner

def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}

return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}

def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])

if results:
print_run_end_messages(results, groups=groups)
print_run_end_messages(results)
3 changes: 3 additions & 0 deletions core/dbt/task/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dbt_common.events.types import Formatting
from dbt_common.exceptions import DbtInternalError

from . import group_lookup
from .printer import print_run_end_messages
from .run import ModelRunner, RunTask

Expand Down Expand Up @@ -42,6 +43,7 @@ def compile(self, manifest: Manifest):
def print_result_line(self, result):
model = result.node
level = EventLevel.ERROR if result.status == NodeStatus.Error else EventLevel.INFO
group = group_lookup.get(model.unique_id)
fire_event(
LogSeedResult(
status=result.status,
Expand All @@ -52,6 +54,7 @@ def print_result_line(self, result):
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
group=group,
),
level=level,
)
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/task/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt_common.exceptions import DbtInternalError
from dbt_common.utils import cast_dict_to_dict_of_strings

from . import group_lookup
from .run import ModelRunner, RunTask


Expand All @@ -21,6 +22,7 @@ def print_result_line(self, result):
model = result.node
cfg = model.config.to_dict(omit_none=True)
level = EventLevel.ERROR if result.status == NodeStatus.Error else EventLevel.INFO
group = group_lookup.get(model.unique_id)
fire_event(
LogSnapshotResult(
status=result.status,
Expand All @@ -31,6 +33,7 @@ def print_result_line(self, result):
execution_time=result.execution_time,
node_info=model.node_info,
result_message=result.message,
group=group,
),
level=level,
)
Expand Down
Loading

0 comments on commit 1d9ad18

Please sign in to comment.