Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster room joins: Try other destinations when resyncing the state of a partial-state room #12812

Merged
Merged
1 change: 1 addition & 0 deletions changelog.d/12812.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Try other homeservers when re-syncing state for rooms with partial state.
5 changes: 4 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ async def get_room_state_ids(

Returns:
a tuple of (state event_ids, auth event_ids)

Raises:
InvalidResponseError: if fields in the response have the wrong type.
Comment on lines +409 to +410
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably needs chasing up the call stack (through _get_state_ids_after_missing_prev_event, _resolve_state_at_missing_prevs, etc) too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll add a note to _get_state_ids_after_missing_prev_event.
_resolve_state_at_missing_prevs replaces it with a FederationError, which is already noted, so I'll leave that one alone.

"""
result = await self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id
Expand All @@ -416,7 +419,7 @@ async def get_room_state_ids(
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
raise Exception("invalid response from /state_ids")
raise InvalidResponseError("invalid response from /state_ids")
richvdh marked this conversation as resolved.
Show resolved Hide resolved

return state_event_ids, auth_event_ids

Expand Down
86 changes: 78 additions & 8 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
import logging
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)

import attr
from signedjson.key import decode_verify_key_bytes
Expand All @@ -34,6 +43,7 @@
CodeMessageException,
Codes,
FederationDeniedError,
FederationError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
Expand Down Expand Up @@ -545,7 +555,8 @@ async def do_invite_join(
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
destination=origin,
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
)

Expand Down Expand Up @@ -1450,13 +1461,16 @@ async def get_room_complexity(

async def _sync_partial_state_room(
self,
destination: str,
initial_destination: Optional[str],
other_destinations: Collection[str],
room_id: str,
) -> None:
"""Background process to resync the state of a partial-state room

Args:
destination: homeserver to pull the state from
initial_destination: the initial homeserver to pull the state from
other_destinations: other homeservers to try to pull the state from, if
`initial_destination` is unavailable
room_id: room to be resynced
"""

Expand All @@ -1468,8 +1482,29 @@ async def _sync_partial_state_room(
# really leave, that might mean we have difficulty getting the room state over
# federation.
#
# TODO(faster_joins): try other destinations if the one we have fails
# TODO(faster_joins): we need some way of prioritising which homeservers in
# `other_destinations` to try first, otherwise we'll spend ages trying dead
# homeservers for large rooms.

if initial_destination is None and len(other_destinations) == 0:
raise ValueError(
f"Cannot resync state of {room_id}: no destinations provided"
)

# Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes.
if initial_destination is not None:
# Move `initial_destination` to the front of the list.
destinations = list(other_destinations)
if initial_destination in destinations:
destinations.remove(initial_destination)
destinations = [initial_destination] + destinations
destination_iter = itertools.cycle(destinations)
else:
destination_iter = itertools.cycle(other_destinations)

# `destination` is the current remote homeserver we're pulling from.
destination = next(destination_iter)
logger.info("Syncing state for room %s via %s", room_id, destination)

# we work through the queue in order of increasing stream ordering.
Expand Down Expand Up @@ -1505,6 +1540,41 @@ async def _sync_partial_state_room(
allow_rejected=True,
)
for event in events:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
for attempt in itertools.count():
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
break
except FederationError as e:
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
# if there's a temporary network outage. retrying
# indefinitely is also not the right thing to do if we can
# reach all homeservers and they all claim they don't have
# the state we want.
logger.error(
"Failed to get state for %s at %s from %s because %s, "
"giving up!",
room_id,
event,
destination,
e,
)
raise

# Try the next remote server.
logger.info(
"Failed to get state for %s at %s from %s because %s",
room_id,
event,
destination,
e,
)
destination = next(destination_iter)
logger.info(
"Syncing state for room %s via %s instead",
room_id,
destination,
)
11 changes: 11 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ async def update_state_for_partial_state_event(
Args:
destination: server to request full state from
event: partial-state event to be de-partial-stated

Raises:
FederationError if we fail to request state from the remote server.
"""
logger.info("Updating state for %s", event.event_id)
with nested_logging_context(suffix=event.event_id):
Expand Down Expand Up @@ -813,6 +816,10 @@ async def _resolve_state_at_missing_prevs(
Returns:
if we already had all the prev events, `None`. Otherwise, returns
the event ids of the state at `event`.

Raises:
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
room_id = event.room_id
event_id = event.event_id
Expand Down Expand Up @@ -897,6 +904,10 @@ async def _get_state_ids_after_missing_prev_event(

Returns:
The event ids of the state *after* the given event.

Raises:
InvalidResponseError: if the remote homeserver's response contains fields
of the wrong type.
"""
(
state_event_ids,
Expand Down