From 2fba1076c56e76410fd901120f0e8df2ef33d1c4 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Tue, 31 May 2022 15:50:29 +0100 Subject: [PATCH] Faster room joins: Try other destinations when resyncing the state of a partial-state room (#12812) Signed-off-by: Sean Quah --- changelog.d/12812.misc | 1 + synapse/federation/federation_client.py | 5 +- synapse/handlers/federation.py | 86 ++++++++++++++++++++++--- synapse/handlers/federation_event.py | 11 ++++ 4 files changed, 94 insertions(+), 9 deletions(-) create mode 100644 changelog.d/12812.misc diff --git a/changelog.d/12812.misc b/changelog.d/12812.misc new file mode 100644 index 000000000000..53cb936a022b --- /dev/null +++ b/changelog.d/12812.misc @@ -0,0 +1 @@ +Try other homeservers when re-syncing state for rooms with partial state. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 17eff60909a2..b60b8983ea41 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -405,6 +405,9 @@ async def get_room_state_ids( Returns: a tuple of (state event_ids, auth event_ids) + + Raises: + InvalidResponseError: if fields in the response have the wrong type. """ result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id @@ -416,7 +419,7 @@ async def get_room_state_ids( if not isinstance(state_event_ids, list) or not isinstance( auth_event_ids, list ): - raise Exception("invalid response from /state_ids") + raise InvalidResponseError("invalid response from /state_ids") return state_event_ids, auth_event_ids diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 80ee7e7b4e7c..b4b63a342ad5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,7 +20,16 @@ import logging from enum import Enum from http import HTTPStatus -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, +) import attr from signedjson.key import decode_verify_key_bytes @@ -34,6 +43,7 @@ CodeMessageException, Codes, FederationDeniedError, + FederationError, HttpResponseException, NotFoundError, RequestSendFailed, @@ -545,7 +555,8 @@ async def do_invite_join( run_as_background_process( desc="sync_partial_state_room", func=self._sync_partial_state_room, - destination=origin, + initial_destination=origin, + other_destinations=ret.servers_in_room, room_id=room_id, ) @@ -1454,13 +1465,16 @@ async def get_room_complexity( async def _sync_partial_state_room( self, - destination: str, + initial_destination: Optional[str], + other_destinations: Collection[str], room_id: str, ) -> None: """Background process to resync the state of a partial-state room Args: - destination: homeserver to pull the state from + initial_destination: the initial homeserver to pull the state from + other_destinations: other homeservers to try to pull the state from, if + `initial_destination` is unavailable room_id: room to be resynced """ @@ -1472,8 +1486,29 @@ async def _sync_partial_state_room( # really leave, that might mean we have difficulty getting the room state over # federation. # - # TODO(faster_joins): try other destinations if the one we have fails + # TODO(faster_joins): we need some way of prioritising which homeservers in + # `other_destinations` to try first, otherwise we'll spend ages trying dead + # homeservers for large rooms. + + if initial_destination is None and len(other_destinations) == 0: + raise ValueError( + f"Cannot resync state of {room_id}: no destinations provided" + ) + # Make an infinite iterator of destinations to try. Once we find a working + # destination, we'll stick with it until it flakes. + if initial_destination is not None: + # Move `initial_destination` to the front of the list. + destinations = list(other_destinations) + if initial_destination in destinations: + destinations.remove(initial_destination) + destinations = [initial_destination] + destinations + destination_iter = itertools.cycle(destinations) + else: + destination_iter = itertools.cycle(other_destinations) + + # `destination` is the current remote homeserver we're pulling from. + destination = next(destination_iter) logger.info("Syncing state for room %s via %s", room_id, destination) # we work through the queue in order of increasing stream ordering. @@ -1511,6 +1546,41 @@ async def _sync_partial_state_room( allow_rejected=True, ) for event in events: - await self._federation_event_handler.update_state_for_partial_state_event( - destination, event - ) + for attempt in itertools.count(): + try: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) + break + except FederationError as e: + if attempt == len(destinations) - 1: + # We have tried every remote server for this event. Give up. + # TODO(faster_joins) giving up isn't the right thing to do + # if there's a temporary network outage. retrying + # indefinitely is also not the right thing to do if we can + # reach all homeservers and they all claim they don't have + # the state we want. + logger.error( + "Failed to get state for %s at %s from %s because %s, " + "giving up!", + room_id, + event, + destination, + e, + ) + raise + + # Try the next remote server. + logger.info( + "Failed to get state for %s at %s from %s because %s", + room_id, + event, + destination, + e, + ) + destination = next(destination_iter) + logger.info( + "Syncing state for room %s via %s instead", + room_id, + destination, + ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index b9086745298a..549b066dd9dc 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -505,6 +505,9 @@ async def update_state_for_partial_state_event( Args: destination: server to request full state from event: partial-state event to be de-partial-stated + + Raises: + FederationError if we fail to request state from the remote server. """ logger.info("Updating state for %s", event.event_id) with nested_logging_context(suffix=event.event_id): @@ -815,6 +818,10 @@ async def _resolve_state_at_missing_prevs( Returns: if we already had all the prev events, `None`. Otherwise, returns the event ids of the state at `event`. + + Raises: + FederationError if we fail to get the state from the remote server after any + missing `prev_event`s. """ room_id = event.room_id event_id = event.event_id @@ -901,6 +908,10 @@ async def _get_state_ids_after_missing_prev_event( Returns: The event ids of the state *after* the given event. + + Raises: + InvalidResponseError: if the remote homeserver's response contains fields + of the wrong type. """ ( state_event_ids,