Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Factor out common code for persisting fetched auth events #10896

46 changes: 32 additions & 14 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ async def _get_events_and_persist(

room_version = await self._store.get_room_version(room_id)

event_map: Dict[str, EventBase] = {}
events: List[EventBase] = []

async def get_event(event_id: str) -> None:
with nested_logging_context(event_id):
Expand All @@ -1125,8 +1125,7 @@ async def get_event(event_id: str) -> None:
event_id,
)
return

event_map[event.event_id] = event
events.append(event)

except Exception as e:
logger.warning(
Expand All @@ -1137,7 +1136,30 @@ async def get_event(event_id: str) -> None:
)

await concurrently_execute(get_event, event_ids, 5)
logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids))
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_fetched_events(destination, room_id, events)

async def _auth_and_persist_fetched_events(
self, origin: str, room_id: str, events: Iterable[EventBase]
) -> None:
"""Persist the events fetched by _get_events_and_persist.

The events should not depend on one another, e.g. this should be used to persist
richvdh marked this conversation as resolved.
Show resolved Hide resolved
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.

We first sort the events so that they do not depend on each other, then
persist them.
richvdh marked this conversation as resolved.
Show resolved Hide resolved

Notifies about the events where appropriate.

Params:
origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
events: the events that have been fetched
"""
event_map = {event.event_id: event for event in events}

# we now need to auth the events in an order which ensures that each event's
# auth_events are authed before the event itself.
Expand Down Expand Up @@ -1168,30 +1190,26 @@ async def get_event(event_id: str) -> None:
"Persisting %i of %i remaining events", len(roots), len(event_map)
)

await self._auth_and_persist_fetched_events(destination, room_id, roots)
await self._auth_and_persist_fetched_events_inner(origin, room_id, roots)
richvdh marked this conversation as resolved.
Show resolved Hide resolved

for ev in roots:
del event_map[ev.event_id]

async def _auth_and_persist_fetched_events(
async def _auth_and_persist_fetched_events_inner(
self, origin: str, room_id: str, fetched_events: Collection[EventBase]
) -> None:
"""Persist the events fetched by _get_events_and_persist.

The events should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""Helper for _auth_and_persist_fetched_events

We also assume that all of the auth events for all of the events have already
been persisted.
Persists a batch of events where we have (theoretically) already persisted all
of their auth events.

Notifies about the events where appropriate.

Params:
origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
event_id: map from event_id -> event for the fetched events
fetched_events: the events to persist
"""
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
Expand Down