Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Increase perf of handling presence when joining large rooms. #9916

Merged
merged 9 commits into from
May 5, 2021
1 change: 1 addition & 0 deletions changelog.d/9910.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance after joining a large room when presence is enabled.
1 change: 1 addition & 0 deletions changelog.d/9916.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of handling presence when joining large rooms.
154 changes: 82 additions & 72 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,16 @@ async def _unsafe_process(self) -> None:
max_pos, deltas = await self.store.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
await self._handle_state_delta(deltas)

# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
deltas_by_room: Dict[str, List[JsonDict]] = {}
for delta in deltas:
deltas_by_room.setdefault(delta["room_id"], []).append(delta)

for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)

self._event_pos = max_pos

Expand All @@ -1192,17 +1201,21 @@ async def _unsafe_process(self) -> None:
max_pos
)

async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
"""Process current state deltas to find new joins that need to be
handled.
async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
# A map of destination to a set of user state that they should receive
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]

# The set of newly joined users. Note that if the local server is
# joining a remote room for the first time we'll see both the joining
# user and all remote users as newly joined.
newly_joined_users = set()

for delta in deltas:
assert room_id == delta["room_id"]

typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

Expand Down Expand Up @@ -1231,72 +1244,55 @@ async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
# Ignore changes to join events.
continue

# Retrieve any user presence state updates that need to be sent as a result,
# and the destinations that need to receive it
destinations, user_presence_states = await self._on_user_joined_room(
room_id, state_key
)

# Insert the destinations and respective updates into our destinations dict
for destination in destinations:
presence_destinations.setdefault(destination, set()).update(
user_presence_states
)

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self._federation_queue.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

async def _on_user_joined_room(
self, room_id: str, user_id: str
) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
delta stream. Returns the destinations that need to be updated and the
presence updates to send to them.

Args:
room_id: The ID of the room that the user has joined.
user_id: The ID of the user that has joined the room.

Returns:
A tuple of destinations and presence updates to send to them.
"""
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user

remote_hosts = await self.state.get_current_hosts_in_room(room_id)
newly_joined_users.add(state_key)

# Filter out ourselves.
filtered_remote_hosts = [
host for host in remote_hosts if host != self.server_name
]

state = await self.current_state_for_user(user_id)
return filtered_remote_hosts, [state]
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.

# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen

# TODO: Check that this is actually a new server joining the
# room.

remote_host = get_domain_from_id(user_id)
if not newly_joined_users:
# If nobody has joined then there's nothing to do.
return

users = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))
# We want to send:
# 1. presence states of all local users in the room to newly joined
# remote servers
# 2. presence states of newly joined users to all remote servers in
# the room.
#
# TODO: Only send presence states to remote hosts that don't already
# have them (because they already share rooms).

# Get all the users who were already in the room, by fetching the
# current users in the room and removing the newly joined users.
users = await self.store.get_users_in_room(room_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rely on all deltas being for the same room ID? And can we rely on there being at least one delta (so that `room_id` is set)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that is a good point. I don't think we can rely on there only being one room per call (though that is the common path).

prev_users = set(users) - newly_joined_users

# Construct sets for all the local users and remote hosts that were
# already in the room
prev_local_users = []
prev_remote_hosts = set()
for user_id in prev_users:
if self.is_mine_id(user_id):
prev_local_users.append(user_id)
else:
prev_remote_hosts.add(get_domain_from_id(user_id))

# Similarly, construct sets for all the local users and remote hosts
# that were *not* already in the room. Care needs to be taken when
# calculating the remote hosts, as a host may have already been in the
# room even if there is a newly joined user from that host.
newly_joined_local_users = []
newly_joined_remote_hosts = set()
for user_id in newly_joined_users:
if self.is_mine_id(user_id):
newly_joined_local_users.append(user_id)
else:
host = get_domain_from_id(user_id)
if host not in prev_remote_hosts:
newly_joined_remote_hosts.add(host)

states_d = await self.current_state_for_users(user_ids)
# Send presence states of all local users in the room to newly joined
# remote servers. (We actually only send states for local users already
# in the room, as we'll send states for newly joined local users below.)
if prev_local_users and newly_joined_remote_hosts:
local_states = await self.current_state_for_users(prev_local_users)

# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
Expand All @@ -1306,13 +1302,27 @@ async def _on_user_joined_room(
now = self.clock.time_msec()
states = [
state
for state in states_d.values()
for state in local_states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]

return [remote_host], states
self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)

# Send presence states of newly joined local users to all remote servers in
# the room
if newly_joined_local_users and (
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)


def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
Expand Down
14 changes: 4 additions & 10 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ def test_remote_joins(self):
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server2"], states={expected_state}
destinations={"server2"}, states=[expected_state]
)

#
Expand All @@ -740,7 +740,7 @@ def test_remote_joins(self):
self._add_new_user(room_id, "@bob:server3")

self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server3"], states={expected_state}
destinations={"server3"}, states=[expected_state]
)

def test_remote_gets_presence_when_local_user_joins(self):
Expand Down Expand Up @@ -788,14 +788,8 @@ def test_remote_gets_presence_when_local_user_joins(self):
self.presence_handler.current_state_for_user("@test2:server")
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.assertEqual(
self.federation_sender.send_presence_to_destinations.call_count, 2
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server3"], states={expected_state}
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server2"], states={expected_state}
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations={"server2", "server3"}, states=[expected_state]
)

def _add_new_user(self, room_id, user_id):
Expand Down