Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Structured Logging #4055

Merged
merged 31 commits
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
57ae918
init
Oct 13, 2021
f9ef9da
Initial structured logging work with `fire_event` (#4137)
Oct 29, 2021
69aa6bf
context call sites (#4164)
emmyoop Nov 8, 2021
44a9da6
Handle exec info (#4168)
Oct 29, 2021
5b2562a
Client call sites (#4163)
Oct 29, 2021
51d8440
Change Graph logger call sites (#4165)
emmyoop Nov 8, 2021
b141620
contracts call sites (#4166)
emmyoop Nov 1, 2021
d8868c5
Dataclass compatibility (#4180)
iknox-fa Nov 2, 2021
6b36b18
config call sites (#4169)
emmyoop Nov 2, 2021
9857e1d
parser call sites (#4177)
emmyoop Nov 2, 2021
281d249
task call sites part 1 (#4183)
emmyoop Nov 2, 2021
d513491
Show Exception should trigger a stack trace (#4190)
Nov 3, 2021
13f31ae
scrub the secrets (#4203)
Nov 4, 2021
3cafc9e
task callsites: part 2 (#4188)
emmyoop Nov 4, 2021
e0b0eda
deps call sites (#4199)
emmyoop Nov 4, 2021
d8b97c1
call sites in core/dbt (excluding main.py) (#4202)
emmyoop Nov 8, 2021
6334365
trivial logger removal (#4216)
emmyoop Nov 5, 2021
bd3e623
test/integration call sites (#4209)
emmyoop Nov 5, 2021
b5c6f09
remove unused import (#4217)
emmyoop Nov 5, 2021
25c974a
lazy logging in event module (#4210)
Nov 5, 2021
f95e9ef
use event types in main even before the logger is set up. (#4219)
Nov 5, 2021
5cc8626
updates associated with merging main
emmyoop Nov 8, 2021
43b39fd
removed redundant timestamp (#4239)
emmyoop Nov 8, 2021
b2aea11
Struct log for adapter call sites (#4189)
iknox-fa Nov 8, 2021
a40550b
std logger for structured logging (#4231)
Nov 8, 2021
2ca6ce6
whitespace change
Nov 9, 2021
ebb84c4
postgres adapter to use new logger
Nov 9, 2021
683190b
fixes
Nov 9, 2021
31acb95
rebased on main and added new partial parsing event
Nov 9, 2021
5e6be16
configure event logger for integration tests (#4257)
Nov 9, 2021
6dd9c2c
Env var shim to enable legacy logger (#4255)
jtcohen6 Nov 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@
from dbt.adapters.base.query_headers import (
MacroQueryStringSetter,
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import (
NewConnection,
ConnectionReused,
ConnectionLeftOpen,
ConnectionLeftOpen2,
ConnectionClosed,
ConnectionClosed2,
Rollback,
RollbackFailed
)
from dbt import flags


Expand Down Expand Up @@ -136,14 +146,10 @@ def set_connection_name(self, name: Optional[str] = None) -> Connection:
if conn.name == conn_name and conn.state == 'open':
return conn

logger.debug(
'Acquiring new {} connection "{}".'.format(self.TYPE, conn_name))
fire_event(NewConnection(conn_name=conn_name, conn_type=self.TYPE))

if conn.state == 'open':
logger.debug(
'Re-using an available connection from the pool (formerly {}).'
.format(conn.name)
)
fire_event(ConnectionReused(conn_name=conn_name))
else:
conn.handle = LazyHandle(self.open)

Expand Down Expand Up @@ -190,11 +196,9 @@ def cleanup_all(self) -> None:
with self.lock:
for connection in self.thread_connections.values():
if connection.state not in {'closed', 'init'}:
logger.debug("Connection '{}' was left open."
.format(connection.name))
fire_event(ConnectionLeftOpen(conn_name=connection.name))
else:
logger.debug("Connection '{}' was properly closed."
.format(connection.name))
fire_event(ConnectionClosed(conn_name=connection.name))
self.close(connection)

# garbage collect these connections
Expand All @@ -220,20 +224,17 @@ def _rollback_handle(cls, connection: Connection) -> None:
try:
connection.handle.rollback()
except Exception:
logger.debug(
'Failed to rollback {}'.format(connection.name),
exc_info=True
)
fire_event(RollbackFailed(conn_name=connection.name))

@classmethod
def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
logger.debug(f'On {connection.name}: Close')
fire_event(ConnectionClosed2(conn_name=connection.name))
connection.handle.close()
else:
logger.debug(f'On {connection.name}: No close available on handle')
fire_event(ConnectionLeftOpen2(conn_name=connection.name))

@classmethod
def _rollback(cls, connection: Connection) -> None:
Expand All @@ -244,7 +245,7 @@ def _rollback(cls, connection: Connection) -> None:
f'"{connection.name}", but it does not have one open!'
)

logger.debug(f'On {connection.name}: ROLLBACK')
fire_event(Rollback(conn_name=connection.name))
cls._rollback_handle(connection)

connection.transaction_open = False
Expand All @@ -256,7 +257,7 @@ def close(cls, connection: Connection) -> Connection:
return connection

if connection.transaction_open and connection.handle:
logger.debug('On {}: ROLLBACK'.format(connection.name))
fire_event(Rollback(conn_name=connection.name))
cls._rollback_handle(connection)
connection.transaction_open = False

Expand Down
15 changes: 9 additions & 6 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.parsed import ParsedSeedNode
from dbt.exceptions import warn_or_error
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import CacheMiss, ListRelations
from dbt.utils import filter_null_values, executor

from dbt.adapters.base.connections import Connection, AdapterResponse
Expand Down Expand Up @@ -288,9 +289,12 @@ def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
"""Check if the schema is cached, and by default logs if it is not."""

if (database, schema) not in self.cache:
logger.debug(
'On "{}": cache miss for schema "{}.{}", this is inefficient'
.format(self.nice_connection_name(), database, schema)
fire_event(
CacheMiss(
conn_name=self.nice_connection_name,
database=database,
schema=schema
)
)
return False
else:
Expand Down Expand Up @@ -672,9 +676,8 @@ def list_relations(
relations = self.list_relations_without_caching(
schema_relation
)
fire_event(ListRelations(database=database, schema=schema, relations=relations))

logger.debug('with database={}, schema={}, relations={}'
.format(database, schema, relations))
return relations

def _make_match_kwargs(
Expand Down
75 changes: 34 additions & 41 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import threading
from collections import namedtuple
from copy import deepcopy
from typing import List, Iterable, Optional, Dict, Set, Tuple, Any
import threading
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt.logger import CACHE_LOGGER as logger
from dbt.utils import lowercase
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.types import (
AddLink,
AddRelation,
DropCascade,
DropMissingRelation,
DropRelation,
DumpAfterAddGraph,
DumpAfterRenameSchema,
DumpBeforeAddGraph,
DumpBeforeRenameSchema,
RenameSchema,
TemporaryRelation,
UncachedRelation,
UpdateReference
)
from dbt.utils import lowercase

_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')

Expand Down Expand Up @@ -157,12 +172,6 @@ def dump_graph_entry(self):
return [dot_separated(r) for r in self.referenced_by]


def lazy_log(msg, func):
if logger.disabled:
return
logger.debug(msg.format(func()))


class RelationsCache:
"""A cache of the relations known to dbt. Keeps track of relationships
declared between tables and handles renames/drops as a real database would.
Expand Down Expand Up @@ -278,6 +287,7 @@ def _add_link(self, referenced_key, dependent_key):

referenced.add_reference(dependent)

# TODO: Is this dead code? I can't find any callers when grepping the codebase.
def add_link(self, referenced, dependent):
"""Add a link between two relations to the database. If either relation
does not exist, it will be added as an "external" relation.
Expand All @@ -297,11 +307,7 @@ def add_link(self, referenced, dependent):
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
logger.debug(
'{dep!s} references {ref!s} but {ref.database}.{ref.schema} '
'is not in the cache, skipping assumed external relation'
.format(dep=dependent, ref=ref_key)
)
fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key))
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
Expand All @@ -317,9 +323,7 @@ def add_link(self, referenced, dependent):
type=referenced.External
)
self.add(dependent)
logger.debug(
'adding link, {!s} references {!s}'.format(dep_key, ref_key)
)
fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
with self.lock:
self._add_link(ref_key, dep_key)

Expand All @@ -330,14 +334,12 @@ def add(self, relation):
:param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
logger.debug('Adding relation: {!s}'.format(cached))

lazy_log('before adding: {!s}', self.dump_graph)
fire_event(AddRelation(relation=cached))
fire_event(DumpBeforeAddGraph(graph_func=self.dump_graph))

with self.lock:
self._setdefault(cached)

lazy_log('after adding: {!s}', self.dump_graph)
fire_event(DumpAfterAddGraph(graph_func=self.dump_graph))

def _remove_refs(self, keys):
"""Removes all references to all entries in keys. This does not
Expand All @@ -359,13 +361,10 @@ def _drop_cascade_relation(self, dropped):
:param _CachedRelation dropped: An existing _CachedRelation to drop.
"""
if dropped not in self.relations:
logger.debug('dropped a nonexistent relationship: {!s}'
.format(dropped))
fire_event(DropMissingRelation(relation=dropped))
return
consequences = self.relations[dropped].collect_consequences()
logger.debug(
'drop {} is cascading to {}'.format(dropped, consequences)
)
fire_event(DropCascade(dropped=dropped, consequences=consequences))
self._remove_refs(consequences)

def drop(self, relation):
Expand All @@ -380,7 +379,7 @@ def drop(self, relation):
:param str identifier: The identifier of the relation to drop.
"""
dropped = _make_key(relation)
logger.debug('Dropping relation: {!s}'.format(dropped))
fire_event(DropRelation(dropped=dropped))
with self.lock:
self._drop_cascade_relation(dropped)

Expand All @@ -403,9 +402,8 @@ def _rename_relation(self, old_key, new_relation):
# update all the relations that refer to it
for cached in self.relations.values():
if cached.is_referenced_by(old_key):
logger.debug(
'updated reference from {0} -> {2} to {1} -> {2}'
.format(old_key, new_key, cached.key())
fire_event(
UpdateReference(old_key=old_key, new_key=new_key, cached_key=cached.key())
)
cached.rename_key(old_key, new_key)

Expand Down Expand Up @@ -435,10 +433,7 @@ def _check_rename_constraints(self, old_key, new_key):
)

if old_key not in self.relations:
logger.debug(
'old key {} not found in self.relations, assuming temporary'
.format(old_key)
)
fire_event(TemporaryRelation(key=old_key))
return False
return True

Expand All @@ -456,19 +451,17 @@ def rename(self, old, new):
"""
old_key = _make_key(old)
new_key = _make_key(new)
logger.debug('Renaming relation {!s} to {!s}'.format(
old_key, new_key
))
fire_event(RenameSchema(old_key=old_key, new_key=new_key))

lazy_log('before rename: {!s}', self.dump_graph)
fire_event(DumpBeforeRenameSchema(graph_func=self.dump_graph))

with self.lock:
if self._check_rename_constraints(old_key, new_key):
self._rename_relation(old_key, _CachedRelation(new))
else:
self._setdefault(_CachedRelation(new))

lazy_log('after rename: {!s}', self.dump_graph)
fire_event(DumpAfterRenameSchema(graph_func=self.dump_graph))

def get_relations(
self, database: Optional[str], schema: Optional[str]
Expand Down
10 changes: 5 additions & 5 deletions core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
PACKAGE_PATH as GLOBAL_PROJECT_PATH,
PROJECT_NAME as GLOBAL_PROJECT_NAME,
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import AdapterImportError, PluginLoadError
from dbt.contracts.connection import Credentials, AdapterRequiredConfig


from dbt.adapters.protocol import (
AdapterProtocol,
AdapterConfig,
Expand Down Expand Up @@ -67,11 +66,12 @@ def load_plugin(self, name: str) -> Type[Credentials]:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == 'dbt.adapters.' + name:
fire_event(AdapterImportError(exc=exc))
raise RuntimeException(f'Could not find adapter type {name}!')
logger.info(f'Error importing adapter: {exc}')
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.
logger.debug('', exc_info=True)

fire_event(PluginLoadError())
raise
plugin: AdapterPlugin = mod.Plugin
plugin_type = plugin.adapter.type()
Expand Down
24 changes: 10 additions & 14 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from dbt.contracts.connection import (
Connection, ConnectionState, AdapterResponse
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus


class SQLConnectionManager(BaseConnectionManager):
Expand Down Expand Up @@ -58,29 +59,24 @@ def add_query(
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()

logger.debug('Using {} connection "{}".'
.format(self.TYPE, connection.name))
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name))

with self.exception_handler(sql):
if abridge_sql_log:
log_sql = '{}...'.format(sql[:512])
else:
log_sql = sql

logger.debug(
'On {connection_name}: {sql}',
connection_name=connection.name,
sql=log_sql,
)
fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
pre = time.time()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
logger.debug(
"SQL status: {status} in {elapsed:0.2f} seconds",
status=self.get_response(cursor),
elapsed=(time.time() - pre)

fire_event(
SQLQueryStatus(
status=self.get_response(cursor), elapsed=round((time.time() - pre), 2)
)
)

return connection, cursor
Expand Down Expand Up @@ -160,7 +156,7 @@ def commit(self):
'Tried to commit transaction on connection "{}", but '
'it does not have one open!'.format(connection.name))

logger.debug('On {}: COMMIT'.format(connection.name))
fire_event(SQLCommit(conn_name=connection.name))
self.add_commit_query()

connection.transaction_open = False
Expand Down
Loading