Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add OpenTracing for database activity. #10113

Merged
merged 6 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10113.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Report OpenTracing spans for database activity.
6 changes: 6 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ class SynapseTags:
# HTTP request tag (used to distinguish full vs incremental syncs, etc)
REQUEST_TAG = "request_tag"

# Text description of a database transaction
DB_TXN_DESC = "db.txn_desc"

# Uniqueish ID of a database transaction
DB_TXN_ID = "db.txn_id"


# Block everything by default
# A regex which matches the server_names to expose traces for.
Expand Down
86 changes: 53 additions & 33 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
from synapse.logging import opentracing
from synapse.logging.context import (
LoggingContext,
current_context,
Expand Down Expand Up @@ -313,7 +314,14 @@ def _do_execute(self, func: Callable[..., R], sql: str, *args: Any) -> R:
start = time.time()

try:
return func(sql, *args)
with opentracing.start_active_span(
"db.query",
tags={
opentracing.tags.DATABASE_TYPE: "sql",
opentracing.tags.DATABASE_STATEMENT: sql,
},
):
return func(sql, *args)
except Exception as e:
sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
Expand Down Expand Up @@ -525,9 +533,16 @@ def new_transaction(
exception_callbacks=exception_callbacks,
)
try:
r = func(cursor, *args, **kwargs)
conn.commit()
return r
with opentracing.start_active_span(
"db.txn",
tags={
opentracing.SynapseTags.DB_TXN_DESC: desc,
opentracing.SynapseTags.DB_TXN_ID: name,
},
):
r = func(cursor, *args, **kwargs)
conn.commit()
return r
except self.engine.module.OperationalError as e:
# This can happen if the database disappears mid
# transaction.
Expand Down Expand Up @@ -653,16 +668,17 @@ async def runInteraction(
logger.warning("Starting db txn '%s' from sentinel context", desc)

try:
result = await self.runWithConnection(
self.new_transaction,
desc,
after_callbacks,
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
**kwargs,
)
with opentracing.start_active_span(f"db.{desc}"):
result = await self.runWithConnection(
self.new_transaction,
desc,
after_callbacks,
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
**kwargs,
)

for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
Expand Down Expand Up @@ -718,25 +734,29 @@ def inner_func(conn, *args, **kwargs):
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)

if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)
with opentracing.start_active_span(
operation_name="db.connection",
):
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)

if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
opentracing.log_kv({"message": "reconnected"})

try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down