Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix the inbound PDU metric #10279

Merged
merged 2 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10279.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the Prometheus `synapse_federation_server_pdu_process_time` metric, which has been broken since v1.37.1.
37 changes: 20 additions & 17 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,22 +368,21 @@ async def process_pdus_for_room(room_id: str):

async def process_pdu(pdu: EventBase) -> JsonDict:
event_id = pdu.event_id
with pdu_process_time.time():
with nested_logging_context(event_id):
try:
await self._handle_received_pdu(origin, pdu)
return {}
except FederationError as e:
logger.warning("Error handling PDU %s: %s", event_id, e)
return {"error": str(e)}
except Exception as e:
f = failure.Failure()
logger.error(
"Failed to handle PDU %s",
event_id,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
return {"error": str(e)}
with nested_logging_context(event_id):
try:
await self._handle_received_pdu(origin, pdu)
return {}
except FederationError as e:
logger.warning("Error handling PDU %s: %s", event_id, e)
return {"error": str(e)}
except Exception as e:
f = failure.Failure()
logger.error(
"Failed to handle PDU %s",
event_id,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
return {"error": str(e)}

await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
Expand Down Expand Up @@ -909,9 +908,13 @@ async def _process_incoming_pdus_in_room_inner(
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)

await self.store.remove_received_event_from_staging(
received_ts = await self.store.remove_received_event_from_staging(
origin, event.event_id
)
if received_ts is not None:
pdu_process_time.observe(
(self._clock.time_msec() - received_ts) / 1000
)

# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
Expand Down
66 changes: 56 additions & 10 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,16 +1075,62 @@ async def remove_received_event_from_staging(
self,
origin: str,
event_id: str,
) -> None:
"""Remove the given event from the staging area"""
await self.db_pool.simple_delete(
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
desc="remove_received_event_from_staging",
)
) -> Optional[int]:
"""Remove the given event from the staging area.

Returns:
The received_ts of the row that was deleted, if any.
"""
if self.db_pool.engine.supports_returning:

def _remove_received_event_from_staging_txn(txn):
sql = """
DELETE FROM federation_inbound_events_staging
WHERE origin = ? AND event_id = ?
RETURNING received_ts
"""

txn.execute(sql, (origin, event_id))
return txn.fetchone()

row = await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
db_autocommit=True,
)
if row is None:
return None

return row[0]

else:

def _remove_received_event_from_staging_txn(txn):
received_ts = self.db_pool.simple_select_one_onecol_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
retcol="received_ts",
allow_none=True,
)
self.db_pool.simple_delete_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
)

return received_ts

return await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
)

async def get_next_staged_event_id_for_room(
self,
Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ def supports_using_any_list(self) -> bool:
"""
...

@property
@abc.abstractmethod
def supports_returning(self) -> bool:
    """Whether `RETURNING` may be appended to INSERT/UPDATE/DELETE statements.

    Abstract: each concrete engine supplies its own answer.
    """
    ...

@abc.abstractmethod
def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ def supports_using_any_list(self):
"""Do we support using `a = ANY(?)` and passing a list"""
return True

@property
def supports_returning(self) -> bool:
    """Whether `RETURNING` may be appended to INSERT/UPDATE/DELETE statements.

    Unconditionally true for this engine.
    """
    return True

def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def supports_using_any_list(self):
"""Do we support using `a = ANY(?)` and passing a list"""
return False

@property
def supports_returning(self) -> bool:
    """Whether `RETURNING` may be appended to INSERT/UPDATE/DELETE statements.

    Decided at runtime from the driver module's `sqlite_version_info`:
    true only when the SQLite library is at least version 3.35.0.
    """
    minimum_version = (3, 35, 0)
    return self.module.sqlite_version_info >= minimum_version
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version:
version = self.module.sqlite_version_info
Expand Down