Skip to content

Commit

Permalink
Sliding sync: various fixups to the background update (#17652)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Sep 11, 2024
1 parent 5562a89 commit b732d13
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 180 deletions.
1 change: 1 addition & 0 deletions changelog.d/17652.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
45 changes: 41 additions & 4 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,12 @@ def _get_sliding_sync_insert_values_from_state_map(
if state_key == (EventTypes.Create, ""):
room_type = event.content.get(EventContentFields.ROOM_TYPE)
# Scrutinize JSON values
if room_type is None or isinstance(room_type, str):
if room_type is None or (
isinstance(room_type, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in room_type
):
sliding_sync_insert_map["room_type"] = room_type
elif state_key == (EventTypes.RoomEncryption, ""):
encryption_algorithm = event.content.get(
Expand All @@ -1990,15 +1995,26 @@ def _get_sliding_sync_insert_values_from_state_map(
sliding_sync_insert_map["is_encrypted"] = is_encrypted
elif state_key == (EventTypes.Name, ""):
room_name = event.content.get(EventContentFields.ROOM_NAME)
# Scrutinize JSON values
if room_name is None or isinstance(room_name, str):
# Scrutinize JSON values. We ignore values with nulls as
# postgres doesn't allow null bytes in text columns.
if room_name is None or (
isinstance(room_name, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in room_name
):
sliding_sync_insert_map["room_name"] = room_name
elif state_key == (EventTypes.Tombstone, ""):
successor_room_id = event.content.get(
EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
)
# Scrutinize JSON values
if successor_room_id is None or isinstance(successor_room_id, str):
if successor_room_id is None or (
isinstance(successor_room_id, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in successor_room_id
):
sliding_sync_insert_map["tombstone_successor_room_id"] = (
successor_room_id
)
Expand Down Expand Up @@ -2081,6 +2097,21 @@ def _get_sliding_sync_insert_values_from_stripped_state(
else None
)

# Check for null bytes in the room name and type. We have to
# ignore values with null bytes as Postgres doesn't allow them
# in text columns.
if (
sliding_sync_insert_map["room_name"] is not None
and "\0" in sliding_sync_insert_map["room_name"]
):
sliding_sync_insert_map.pop("room_name")

if (
sliding_sync_insert_map["room_type"] is not None
and "\0" in sliding_sync_insert_map["room_type"]
):
sliding_sync_insert_map.pop("room_type")

# Find the tombstone_successor_room_id
# Note: This isn't one of the stripped state events according to the spec
# but seems like there is no reason not to support this kind of thing.
Expand All @@ -2095,6 +2126,12 @@ def _get_sliding_sync_insert_values_from_stripped_state(
else None
)

if (
sliding_sync_insert_map["tombstone_successor_room_id"] is not None
and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"]
):
sliding_sync_insert_map.pop("tombstone_successor_room_id")

else:
# No stripped state provided
sliding_sync_insert_map["has_known_state"] = False
Expand Down
190 changes: 144 additions & 46 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
Expand Down Expand Up @@ -1877,9 +1878,29 @@ async def _sliding_sync_membership_snapshots_bg_update(
def _find_memberships_to_update_txn(
txn: LoggingTransaction,
) -> List[
Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
Tuple[
str,
Optional[str],
Optional[str],
str,
str,
str,
str,
int,
Optional[str],
bool,
]
]:
# Fetch the set of event IDs that we want to update
#
# We skip over rows which we've already handled, i.e. have a
# matching row in `sliding_sync_membership_snapshots` with the same
# room, user and event ID.
#
# We also ignore rooms that the user has left themselves (i.e. not
# kicked). This is to avoid having to port lots of old rooms that we
# will never send down sliding sync (as we exclude such rooms from
# initial syncs).

if initial_phase:
# There are some old out-of-band memberships (before
Expand All @@ -1892,6 +1913,7 @@ def _find_memberships_to_update_txn(
SELECT
c.room_id,
r.room_id,
r.room_version,
c.user_id,
e.sender,
c.event_id,
Expand All @@ -1900,9 +1922,11 @@ def _find_memberships_to_update_txn(
e.instance_name,
e.outlier
FROM local_current_membership AS c
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
INNER JOIN events AS e USING (event_id)
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
WHERE (c.room_id, c.user_id) > (?, ?)
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
ORDER BY c.room_id ASC, c.user_id ASC
LIMIT ?
""",
Expand All @@ -1922,7 +1946,8 @@ def _find_memberships_to_update_txn(
"""
SELECT
c.room_id,
c.room_id,
r.room_id,
r.room_version,
c.user_id,
e.sender,
c.event_id,
Expand All @@ -1931,9 +1956,12 @@ def _find_memberships_to_update_txn(
e.instance_name,
e.outlier
FROM local_current_membership AS c
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
INNER JOIN events AS e USING (event_id)
WHERE event_stream_ordering > ?
ORDER BY event_stream_ordering ASC
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
WHERE c.event_stream_ordering > ?
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
ORDER BY c.event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
Expand All @@ -1944,7 +1972,16 @@ def _find_memberships_to_update_txn(
memberships_to_update_rows = cast(
List[
Tuple[
str, Optional[str], str, str, str, str, int, Optional[str], bool
str,
Optional[str],
Optional[str],
str,
str,
str,
str,
int,
Optional[str],
bool,
]
],
txn.fetchall(),
Expand Down Expand Up @@ -1977,7 +2014,7 @@ def _find_memberships_to_update_txn(

def _find_previous_invite_or_knock_membership_txn(
txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
) -> Tuple[str, str]:
) -> Optional[Tuple[str, str]]:
# Find the previous invite/knock event before the leave event
#
# Here are some notes on how we landed on this query:
Expand Down Expand Up @@ -2027,8 +2064,13 @@ def _find_previous_invite_or_knock_membership_txn(
)
row = txn.fetchone()

# We should see a corresponding previous invite/knock event
assert row is not None
if row is None:
# Generally we should have an invite or knock event for leaves
# that are outliers, however this may not always be the case
# (e.g. a local user got kicked but the kick event got pulled in
# as an outlier).
return None

event_id, membership = row

return event_id, membership
Expand All @@ -2043,6 +2085,7 @@ def _find_previous_invite_or_knock_membership_txn(
for (
room_id,
room_id_from_rooms_table,
room_version_id,
user_id,
sender,
membership_event_id,
Expand All @@ -2061,6 +2104,14 @@ def _find_previous_invite_or_knock_membership_txn(
Membership.BAN,
)

if (
room_version_id is not None
and room_version_id not in KNOWN_ROOM_VERSIONS
):
# Ignore rooms with unknown room versions (these were
# experimental rooms, that we no longer support).
continue

# There are some old out-of-band memberships (before
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the
# corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
Expand Down Expand Up @@ -2148,14 +2199,17 @@ def _find_previous_invite_or_knock_membership_txn(
# in the events table though. We'll just say that we don't
# know the state for these rooms and continue on with our
# day.
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
False
)
sliding_sync_membership_snapshots_insert_map = {
"has_known_state": False,
"room_type": None,
"room_name": None,
"is_encrypted": False,
}
elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
):
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
invite_or_knock_event_id = None
invite_or_knock_membership = None

# If the event is an `out_of_band_membership` (special case of
# `outlier`), we never had historical state so we have to pull from
Expand All @@ -2164,35 +2218,55 @@ def _find_previous_invite_or_knock_membership_txn(
# membership (i.e. the room shouldn't disappear if your using the
# `is_encrypted` filter and you leave).
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = await self.db_pool.runInteraction(
previous_membership = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
_find_previous_invite_or_knock_membership_txn,
room_id,
user_id,
membership_event_id,
)
if previous_membership is not None:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = previous_membership
else:
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership

# Pull from the stripped state on the invite/knock event
invite_or_knock_event = await self.get_event(invite_or_knock_event_id)

raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
if (
invite_or_knock_event_id is not None
and invite_or_knock_membership is not None
):
# Pull from the stripped state on the invite/knock event
invite_or_knock_event = await self.get_event(
invite_or_knock_event_id
)
raw_stripped_state_events = knock_room_state

sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
)
raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state

sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
)
else:
# We couldn't find any state for the membership, so we just have to
# leave it as empty.
sliding_sync_membership_snapshots_insert_map = {
"has_known_state": False,
"room_type": None,
"room_name": None,
"is_encrypted": False,
}

# We should have some insert values for each room, even if no
# stripped state is on the event because we still want to record
Expand Down Expand Up @@ -2311,19 +2385,42 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
)
# We need to find the `forgotten` value during the transaction because
# we can't risk inserting stale data.
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ?
""",
(
membership_event_id,
room_id,
user_id,
),
)
if isinstance(txn.database_engine, PostgresEngine):
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = m.forgotten
FROM room_memberships AS m
WHERE sliding_sync_membership_snapshots.room_id = ?
AND sliding_sync_membership_snapshots.user_id = ?
AND membership_event_id = ?
AND membership_event_id = m.event_id
AND m.event_id IS NOT NULL
""",
(
room_id,
user_id,
membership_event_id,
),
)
else:
# SQLite doesn't support UPDATE FROM before 3.33.0, so we do
# this via sub-selects.
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ? AND membership_event_id = ?
""",
(
membership_event_id,
room_id,
user_id,
membership_event_id,
),
)

await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update", _fill_table_txn
Expand All @@ -2333,6 +2430,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
(
room_id,
_room_id_from_rooms_table,
_room_version_id,
user_id,
_sender,
_membership_event_id,
Expand Down
Loading

0 comments on commit b732d13

Please sign in to comment.