Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #5752 from matrix-org/erikj/forgotten_user
Browse files Browse the repository at this point in the history
Remove some more joins on room_memberships
  • Loading branch information
erikjohnston authored Jul 29, 2019
2 parents 2a12d76 + df3a5db commit 3e013b7
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 35 deletions.
1 change: 1 addition & 0 deletions changelog.d/5752.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database IO usage by optimising queries for current membership.
140 changes: 105 additions & 35 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,27 @@ def _get_room_summary_txn(txn):

# we order by membership and then fairly arbitrarily by event_id so
# heroes are consistent
sql = """
SELECT m.user_id, m.membership, m.event_id
FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
ORDER BY
CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
m.event_id ASC
LIMIT ?
"""
if self._current_state_events_membership_up_to_date:
sql = """
SELECT state_key, membership, event_id
FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
ORDER BY
CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
event_id ASC
LIMIT ?
"""
else:
sql = """
SELECT c.state_key, m.membership, c.event_id
FROM room_memberships as m
INNER JOIN current_state_events as c USING (room_id, event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ?
ORDER BY
CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
c.event_id ASC
LIMIT ?
"""

# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
Expand Down Expand Up @@ -256,28 +264,35 @@ def get_invite_for_user_in_room(self, user_id, room_id):
return invite
return None

@defer.inlineCallbacks
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.
Filters out forgotten rooms.
Args:
user_id (str): The user ID.
membership_list (list): A list of synapse.api.constants.Membership
values which the user must be in.
Returns:
A list of dictionary objects, with room_id, membership and sender
defined.
Deferred[list[RoomsForUser]]
"""
if not membership_list:
return defer.succeed(None)

return self.runInteraction(
rooms = yield self.runInteraction(
"get_rooms_for_user_where_membership_is",
self._get_rooms_for_user_where_membership_is_txn,
user_id,
membership_list,
)

# Now we filter out forgotten rooms
forgotten_rooms = yield self.get_forgotten_rooms_for_user(user_id)
return [room for room in rooms if room.room_id not in forgotten_rooms]

def _get_rooms_for_user_where_membership_is_txn(
self, txn, user_id, membership_list
):
Expand All @@ -287,26 +302,33 @@ def _get_rooms_for_user_where_membership_is_txn(

results = []
if membership_list:
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
" OR ".join(["m.membership = ?" for _ in membership_list]),
)

args = [user_id]
args.extend(membership_list)
if self._current_state_events_membership_up_to_date:
sql = """
SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND state_key = ?
AND c.membership IN (%s)
""" % (
",".join("?" * len(membership_list))
)
else:
sql = """
SELECT room_id, e.sender, m.membership, event_id, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND state_key = ?
AND m.membership IN (%s)
""" % (
",".join("?" * len(membership_list))
)

sql = (
"SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
" FROM current_state_events as c"
" INNER JOIN room_memberships as m"
" ON m.event_id = c.event_id"
" INNER JOIN events as e"
" ON e.event_id = c.event_id"
" AND m.room_id = c.room_id"
" AND m.user_id = c.state_key"
" WHERE c.type = 'm.room.member' AND %s"
) % (where_clause,)

txn.execute(sql, args)
txn.execute(sql, (user_id, *membership_list))
results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]

if do_invite:
Expand Down Expand Up @@ -637,6 +659,44 @@ def f(txn):
count = yield self.runInteraction("did_forget_membership", f)
return count == 0

@cached()
def get_forgotten_rooms_for_user(self, user_id):
"""Gets all rooms the user has forgotten.
Args:
user_id (str)
Returns:
Deferred[set[str]]
"""

def _get_forgotten_rooms_for_user_txn(txn):
# This is a slightly convoluted query that first looks up all rooms
# that the user has forgotten in the past, then rechecks that list
# to see if any have subsequently been updated. This is done so that
# we can use a partial index on `forgotten = 1` on the assumption
# that few users will actually forget many rooms.
#
# Note that a room is considered "forgotten" if *all* membership
# events for that user and room have the forgotten field set (as
# when a user forgets a room we update all rows for that user and
# room, not just the current one).
sql = """
SELECT room_id, (
SELECT count(*) FROM room_memberships
WHERE room_id = m.room_id AND user_id = m.user_id AND forgotten = 0
) AS count
FROM room_memberships AS m
WHERE user_id = ? AND forgotten = 1
GROUP BY room_id, user_id;
"""
txn.execute(sql, (user_id,))
return set(row[0] for row in txn if row[1] == 0)

return self.runInteraction(
"get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn
)

@defer.inlineCallbacks
def get_rooms_user_has_been_in(self, user_id):
"""Get all rooms that the user has ever been in.
Expand Down Expand Up @@ -668,6 +728,13 @@ def __init__(self, db_conn, hs):
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
self._background_current_state_membership,
)
self.register_background_index_update(
"room_membership_forgotten_idx",
index_name="room_memberships_user_room_forgotten",
table="room_memberships",
columns=["user_id", "room_id"],
where_clause="forgotten = 1",
)

def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
Expand Down Expand Up @@ -769,6 +836,9 @@ def f(txn):
txn.execute(sql, (user_id, room_id))

self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
self._invalidate_cache_and_stream(
txn, self.get_forgotten_rooms_for_user, (user_id,)
)

return self.runInteraction("forget_membership", f)

Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/schema/delta/56/room_membership_idx.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Adds an index on room_memberships for fetching all forgotten rooms for a user
INSERT INTO background_updates (update_name, progress_json) VALUES
('room_membership_forgotten_idx', '{}');

0 comments on commit 3e013b7

Please sign in to comment.