Skip to content

Commit

Permalink
CT 1440 Fix code to emit ConnectionReused event (#6605)
Browse files Browse the repository at this point in the history
* Refactor "set_connection_name" to properly handle reused connection

* Update test

* Changie

* Limit test of ConnectionUsed events to non-Windows
  • Loading branch information
gshank authored Jan 17, 2023
1 parent e1b5e68 commit 89d111a
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 27 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230113-132513.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix use of ConnectionReused logging event
time: 2023-01-13T13:25:13.023168-05:00
custom:
Author: gshank
Issue: "6168"
52 changes: 26 additions & 26 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,44 +142,44 @@ def exception_handler(self, sql: str) -> ContextManager:
)

def set_connection_name(self, name: Optional[str] = None) -> Connection:
conn_name: str
if name is None:
# if a name isn't specified, we'll re-use a single handle
# named 'master'
conn_name = "master"
else:
if not isinstance(name, str):
raise dbt.exceptions.CompilerException(
f"For connection name, got {name} - not a string!"
)
assert isinstance(name, str)
conn_name = name
"""Called by 'acquire_connection' in BaseAdapter, which is called by
'connection_named', called by 'connection_for(node)'.
Creates a connection for this thread if one doesn't already
exist, and will rename an existing connection."""

conn_name: str = "master" if name is None else name

# Get a connection for this thread
conn = self.get_if_exists()

if conn and conn.name == conn_name and conn.state == "open":
# Found a connection and nothing to do, so just return it
return conn

if conn is None:
# Create a new connection
conn = Connection(
type=Identifier(self.TYPE),
name=None,
name=conn_name,
state=ConnectionState.INIT,
transaction_open=False,
handle=None,
credentials=self.profile.credentials,
)
self.set_thread_connection(conn)

if conn.name == conn_name and conn.state == "open":
return conn

fire_event(
NewConnection(conn_name=conn_name, conn_type=self.TYPE, node_info=get_node_info())
)

if conn.state == "open":
fire_event(ConnectionReused(conn_name=conn_name))
else:
conn.handle = LazyHandle(self.open)
# Add the connection to thread_connections for this thread
self.set_thread_connection(conn)
fire_event(
NewConnection(conn_name=conn_name, conn_type=self.TYPE, node_info=get_node_info())
)
else: # existing connection either wasn't open or didn't have the right name
if conn.state != "open":
conn.handle = LazyHandle(self.open)
if conn.name != conn_name:
orig_conn_name: str = conn.name or ""
conn.name = conn_name
fire_event(ConnectionReused(orig_conn_name=orig_conn_name, conn_name=conn_name))

conn.name = conn_name
return conn

@classmethod
Expand Down
1 change: 1 addition & 0 deletions core/dbt/events/proto_types.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ message NewConnectionMsg {
// E006
message ConnectionReused {
string conn_name = 1;
string orig_conn_name = 2;
}

message ConnectionReusedMsg {
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def code(self):
return "E006"

def message(self) -> str:
return f"Re-using an available connection from the pool (formerly {self.conn_name})"
return f"Re-using an available connection from the pool (formerly {self.orig_conn_name}, now {self.conn_name})"


@dataclass
Expand Down
13 changes: 13 additions & 0 deletions tests/functional/logging/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from dbt.tests.util import run_dbt, get_manifest, read_file
import json
import os


my_model_sql = """
Expand All @@ -26,6 +27,7 @@ def test_basic(project, logs_dir):
assert log_file
node_start = False
node_finished = False
connection_reused_data = []
for log_line in log_file.split("\n"):
# skip empty lines
if len(log_line) == 0:
Expand All @@ -36,6 +38,8 @@ def test_basic(project, logs_dir):
log_dct = json.loads(log_line)
log_data = log_dct["data"]
log_event = log_dct["info"]["name"]
if log_event == "ConnectionReused":
connection_reused_data.append(log_data)
if log_event == "NodeStart":
node_start = True
if log_event == "NodeFinished":
Expand All @@ -50,3 +54,12 @@ def test_basic(project, logs_dir):
if log_event == "TimingInfoCollected":
assert "node_info" in log_data
assert "timing_info" in log_data

# windows doesn't have the same thread/connection flow so the ConnectionReused
# events don't show up
if os.name != "nt":
# Verify the ConnectionReused event occurs and has the right data
assert connection_reused_data
for data in connection_reused_data:
assert "conn_name" in data and data["conn_name"]
assert "orig_conn_name" in data and data["orig_conn_name"]

0 comments on commit 89d111a

Please sign in to comment.