Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise calculating device_list changes in /sync. #11974

Merged
merged 10 commits into from
Feb 15, 2022
1 change: 1 addition & 0 deletions changelog.d/11974.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise calculating device_list changes in `/sync`.
68 changes: 53 additions & 15 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1289,23 +1289,54 @@ async def _generate_sync_entry_for_device_list(
# room with by looking at all users that have left a room plus users
# that were in a room we've left.

users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id
)

# Always tell the user about their own devices. We check as the user
# ID is almost certainly already included (unless they're not in any
# rooms) and taking a copy of the set is relatively expensive.
if user_id not in users_who_share_room:
users_who_share_room = set(users_who_share_room)
users_who_share_room.add(user_id)
users_that_have_changed = set()

tracked_users = users_who_share_room
joined_rooms = sync_result_builder.joined_room_ids

# Step 1a, check for changes in devices of users we share a room with
users_that_have_changed = await self.store.get_users_whose_devices_changed(
since_token.device_list_key, tracked_users
# Step 1a, check for changes in devices of users we share a room
# with
#
# We do this in two different ways depending on what we have cached.
# If we already have a list of all the user that have changed since
# the last sync then it's likely more efficient to compare the rooms
# they're in with the rooms the syncing user is in.
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
changed_users = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
if changed_users is not None:
result = await self.store.get_rooms_for_users_with_stream_ordering(
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
changed_users
)

for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
e.room_id in joined_rooms for e in entries
):
users_that_have_changed.add(changed_user_id)
else:
users_who_share_room = (
await self.store.get_users_who_share_room_with_user(user_id)
)

# Always tell the user about their own devices. We check as the user
# ID is almost certainly already included (unless they're not in any
# rooms) and taking a copy of the set is relatively expensive.
if user_id not in users_who_share_room:
users_who_share_room = set(users_who_share_room)
users_who_share_room.add(user_id)

tracked_users = users_who_share_room
users_that_have_changed = (
await self.store.get_users_whose_devices_changed(
since_token.device_list_key, tracked_users
)
)

# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
Expand All @@ -1329,7 +1360,14 @@ async def _generate_sync_entry_for_device_list(
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
newly_left_users -= users_who_share_room
left_users_rooms = (
await self.store.get_rooms_for_users_with_stream_ordering(
newly_left_users
)
)
for user_id, entries in left_users_rooms.items():
if any(e.room_id in joined_rooms for e in entries):
newly_left_users.discard(user_id)

return DeviceLists(changed=users_that_have_changed, left=newly_left_users)
else:
Expand Down
10 changes: 10 additions & 0 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,16 @@ async def get_cached_devices_for_user(self, user_id: str) -> Dict[str, JsonDict]
device["device_id"]: db_to_json(device["content"]) for device in devices
}

def get_cached_device_list_changes(
self,
from_key: int,
) -> Optional[Set[str]]:
"""Get set of users whose devices have changed since `from_key`, or None
if that information is not in our cache.
"""

return self._device_list_stream_cache.get_all_entities_changed(from_key)

async def get_users_whose_devices_changed(
self, from_key: int, user_ids: Iterable[str]
) -> Set[str]:
Expand Down
62 changes: 62 additions & 0 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,68 @@ def _get_rooms_for_user_with_stream_ordering_txn(
for room_id, instance, stream_id in txn
)

@cachedList(
cached_method_name="get_rooms_for_user_with_stream_ordering",
list_name="user_ids",
)
async def get_rooms_for_users_with_stream_ordering(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want a little docstring here, mostly for the return type, which is not very obvious.

It's roughly user → (room, event_pos) to my reading. What does the event_pos mean here?

… seems to be the position of the event that gives the user their membership in the room?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have no need to invalidate this if a user leaves or so on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, missed that this didn't have a docstring.

I think you're missing that this is using a @cachedList, which means that its simply a batch version of get_rooms_for_user_with_stream_ordering and reuses the same cache. Hence why it includes the stream ordering and the like. Also means that invalidation is already taken care of.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooh, I see, very cute. That makes sense!

self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
"""A batched version of `get_rooms_for_user_with_stream_ordering`.

Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users_with_stream_ordering",
self._get_rooms_for_users_with_stream_ordering_txn,
user_ids,
)

def _get_rooms_for_users_with_stream_ordering_txn(
self, txn, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

if self._current_state_events_membership_up_to_date:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we stuck with this thing until the rest of time? I'm guessing it's some kind of optimisation we came up with that got populated in the background?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point I think we can change it from a background update to happen synchronously? Or require that people upgrade from vX -> vY and then vY -> vZ? I think we've done something along these lines before?

sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""
else:
sql = """
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND m.membership = ?
AND {clause}
"""

txn.execute(sql, [Membership.JOIN] + args)

result = {user_id: set() for user_id in user_ids}
for user_id, room_id, instance, stream_id in txn:
result[user_id].add(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
)

return {user_id: frozenset(v) for user_id, v in result.items()}

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
Expand Down