This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prune inbound federation queues if they get too long #10390

Merged
merged 6 commits, Aug 2, 2021
Changes from 2 commits
34 changes: 28 additions & 6 deletions synapse/storage/databases/main/event_federation.py
@@ -1238,7 +1238,7 @@ async def prune_staged_events_in_room(

    # If the queue is too large, then we want to clear the entire queue,
    # keeping only the forward extremities (i.e. the events not referenced
-   # by other events in the queue). We do this as that we can always
+   # by other events in the queue). We do this so that we can always
    # backpaginate in all the events we have dropped.
    rows = await self.db_pool.simple_select_list(
        table="federation_inbound_events_staging",
@@ -1252,17 +1252,39 @@ async def prune_staged_events_in_room(
    referenced_events: Set[str] = set()
    seen_events: Set[str] = set()
    for row in rows:
-       seen_events.add(row["event_id"])
+       event_id = row["event_id"]
+       seen_events.add(event_id)
        event_d = db_to_json(row["event_json"])

        # We don't bother parsing the dicts into full blown event objects,
        # as that is needlessly expensive.

+       # We haven't checked that the `prev_events` have the right format
+       # yet, so we check as we go.
+       prev_events = event_d.get("prev_events", [])
+       if not isinstance(prev_events, list):
+           logger.info("Invalid prev_events for %s", event_id)
+           continue
Comment on lines +1262 to +1267
Member:
seems like we're going to end up logging this every time we get a new event, potentially for every event in the room. Shouldn't we just drop the offending event? (or better yet, check it before we get this far).

Member Author:

Yeeees, ideally we'd have checked long before here that the Event was valid (given we'd already have parsed the JSON into an event object before persistence). I'm slightly hesitant about dropping the "invalid" events here in case we update the format of events and forget to change it here, which would cause us to sometimes drop those events (as this function does not get called for each inbound event).
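The two `prev_events` shapes being validated above can be sketched roughly as follows. This is a simplified, hypothetical helper (the name `extract_prev_event_ids` and its signature are not Synapse's), not the actual storage-layer code:

```python
from typing import Set


def extract_prev_event_ids(event_d: dict, v1_format: bool) -> Set[str]:
    """Collect prev_event IDs from a raw event dict, tolerating bad data.

    Room version 1 events list prev_events as [event_id, hashes] pairs;
    later room versions list plain event ID strings.
    """
    referenced: Set[str] = set()
    prev_events = event_d.get("prev_events", [])
    if not isinstance(prev_events, list):
        return referenced  # malformed field: contributes nothing

    for entry in prev_events:
        if v1_format:
            # V1: each entry should be a two-item [event_id, hashes] list.
            if not isinstance(entry, list) or len(entry) != 2:
                break
            prev_event_id = entry[0]
        else:
            # V2+: each entry should be the event ID string itself.
            prev_event_id = entry
        if not isinstance(prev_event_id, str):
            break
        referenced.add(prev_event_id)
    return referenced
```

Note that, matching the diff, a malformed entry stops the scan (`break`) rather than raising, so one bad event cannot abort pruning for the whole room.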


        if room_version.event_format == EventFormatVersions.V1:
-           referenced_events.update(
-               event_id for event_id, _ in event_d.get("prev_events", [])
-           )
+           for prev_event_tuple in prev_events:
+               if not isinstance(prev_event_tuple, list) or len(prev_event_tuple) != 2:
+                   logger.info("Invalid prev_events for %s", event_id)
+                   break
+
+               prev_event_id = prev_event_tuple[0]
+               if not isinstance(prev_event_id, str):
+                   logger.info("Invalid prev_events for %s", event_id)
+                   break
+
+               referenced_events.add(prev_event_id)
        else:
-           referenced_events.update(event_d.get("prev_events", []))
+           for prev_event_id in prev_events:
+               if not isinstance(prev_event_id, str):
+                   logger.info("Invalid prev_events for %s", event_id)
+                   break
+
+               referenced_events.add(prev_event_id)

    to_delete = referenced_events & seen_events
    if not to_delete:
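The pruning rule the diff implements can be sketched in isolation. This is a minimal illustration of the idea, assuming a plain dict mapping staged event IDs to their prev_event IDs (the name `prune_staged` and its interface are invented for this sketch, not Synapse's API):

```python
from typing import Dict, List, Set


def prune_staged(events: Dict[str, List[str]]) -> Set[str]:
    """Return the staged event IDs that can safely be dropped.

    An event is droppable if some other staged event references it in
    its prev_events; what survives is the set of forward extremities,
    from which the dropped events can later be backfilled.
    """
    seen_events = set(events)
    referenced_events: Set[str] = set()
    for prev_event_ids in events.values():
        referenced_events.update(prev_event_ids)
    # Only delete events that are actually in the queue.
    return referenced_events & seen_events
```

For a chain `$a <- $b <- $c`, `$a` and `$b` are referenced and get dropped, while `$c`, the forward extremity, survives; references to events not in the queue are ignored.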