Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding sync: various fixups to the background update #17652

Merged
merged 23 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b5ad7da
Skip over existing rows
erikjohnston Sep 2, 2024
269dc55
Handle missing previous memberships
erikjohnston Sep 2, 2024
01860e1
Don't overwrite forgotten flag
erikjohnston Sep 2, 2024
f71dd25
Ignore leave events for bg updates
erikjohnston Sep 2, 2024
8140ca3
Fix tests for leaves
erikjohnston Sep 2, 2024
4f3333b
Ignore invites to rooms with unknown room version
erikjohnston Sep 2, 2024
4369e94
Ignore nulls in room names
erikjohnston Sep 2, 2024
330e614
Ignore nulls in invite state
erikjohnston Sep 2, 2024
037cb10
Handle the case where there is a missing `room_membership` row
erikjohnston Sep 3, 2024
20542f0
Newsfile
erikjohnston Sep 3, 2024
0a7f41c
Comment why we strip null bytes
erikjohnston Sep 5, 2024
47bfb1b
Only skip rows if we have the same event ID
erikjohnston Sep 5, 2024
2a48840
Update comment on room versions
erikjohnston Sep 5, 2024
5f04c2f
Don't inherit values
erikjohnston Sep 5, 2024
b778219
Add note on why we're skipping old left rooms
erikjohnston Sep 5, 2024
1679ba0
Fix old sqlite versions
erikjohnston Sep 5, 2024
1963f18
Update synapse/storage/databases/main/events.py
erikjohnston Sep 10, 2024
ec6b2d5
Update synapse/storage/databases/main/events.py
erikjohnston Sep 10, 2024
46804dd
Also ignore room types with null bytes
erikjohnston Sep 10, 2024
7e84e7c
Merge remote-tracking branch 'origin/develop' into erikj/ss_fixups
erikjohnston Sep 10, 2024
06173a3
Also ignore null bytes in tombstone room IDs
erikjohnston Sep 10, 2024
e0e7a8b
Revert "Ignore leave events for bg updates"
erikjohnston Sep 11, 2024
cf4e6cb
Revert "Fix tests for leaves"
erikjohnston Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17652.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
45 changes: 41 additions & 4 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,12 @@ def _get_sliding_sync_insert_values_from_state_map(
if state_key == (EventTypes.Create, ""):
room_type = event.content.get(EventContentFields.ROOM_TYPE)
# Scrutinize JSON values
if room_type is None or isinstance(room_type, str):
if room_type is None or (
isinstance(room_type, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in room_type
):
sliding_sync_insert_map["room_type"] = room_type
elif state_key == (EventTypes.RoomEncryption, ""):
encryption_algorithm = event.content.get(
Expand All @@ -1990,15 +1995,26 @@ def _get_sliding_sync_insert_values_from_state_map(
sliding_sync_insert_map["is_encrypted"] = is_encrypted
elif state_key == (EventTypes.Name, ""):
room_name = event.content.get(EventContentFields.ROOM_NAME)
# Scrutinize JSON values
if room_name is None or isinstance(room_name, str):
# Scrutinize JSON values. We ignore values with null bytes as
# Postgres doesn't allow null bytes in text columns.
if room_name is None or (
isinstance(room_name, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in room_name
):
sliding_sync_insert_map["room_name"] = room_name
elif state_key == (EventTypes.Tombstone, ""):
successor_room_id = event.content.get(
EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
)
# Scrutinize JSON values
if successor_room_id is None or isinstance(successor_room_id, str):
if successor_room_id is None or (
isinstance(successor_room_id, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in successor_room_id
):
sliding_sync_insert_map["tombstone_successor_room_id"] = (
successor_room_id
)
Expand Down Expand Up @@ -2081,6 +2097,21 @@ def _get_sliding_sync_insert_values_from_stripped_state(
else None
)

# Check for null bytes in the room name and type. We have to
# ignore values with null bytes as Postgres doesn't allow them
# in text columns.
if (
sliding_sync_insert_map["room_name"] is not None
and "\0" in sliding_sync_insert_map["room_name"]
):
sliding_sync_insert_map.pop("room_name")

if (
sliding_sync_insert_map["room_type"] is not None
and "\0" in sliding_sync_insert_map["room_type"]
):
sliding_sync_insert_map.pop("room_type")

# Find the tombstone_successor_room_id
# Note: This isn't one of the stripped state events according to the spec
# but seems like there is no reason not to support this kind of thing.
Expand All @@ -2095,6 +2126,12 @@ def _get_sliding_sync_insert_values_from_stripped_state(
else None
)

if (
sliding_sync_insert_map["tombstone_successor_room_id"] is not None
and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"]
):
sliding_sync_insert_map.pop("tombstone_successor_room_id")

else:
# No stripped state provided
sliding_sync_insert_map["has_known_state"] = False
Expand Down
190 changes: 144 additions & 46 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
Expand Down Expand Up @@ -1877,9 +1878,29 @@ async def _sliding_sync_membership_snapshots_bg_update(
def _find_memberships_to_update_txn(
txn: LoggingTransaction,
) -> List[
Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
Tuple[
str,
Optional[str],
Optional[str],
str,
str,
str,
str,
int,
Optional[str],
bool,
]
]:
# Fetch the set of event IDs that we want to update
#
# We skip over rows which we've already handled, i.e. have a
# matching row in `sliding_sync_membership_snapshots` with the same
# room, user and event ID.
#
# We also ignore rooms that the user has left themselves (i.e. not
# kicked). This is to avoid having to port lots of old rooms that we
# will never send down sliding sync (as we exclude such rooms from
# initial syncs).

if initial_phase:
# There are some old out-of-band memberships (before
Expand All @@ -1892,6 +1913,7 @@ def _find_memberships_to_update_txn(
SELECT
c.room_id,
r.room_id,
r.room_version,
c.user_id,
e.sender,
c.event_id,
Expand All @@ -1900,9 +1922,11 @@ def _find_memberships_to_update_txn(
e.instance_name,
e.outlier
FROM local_current_membership AS c
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
INNER JOIN events AS e USING (event_id)
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
WHERE (c.room_id, c.user_id) > (?, ?)
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
ORDER BY c.room_id ASC, c.user_id ASC
LIMIT ?
""",
Expand All @@ -1922,7 +1946,8 @@ def _find_memberships_to_update_txn(
"""
SELECT
c.room_id,
c.room_id,
r.room_id,
r.room_version,
c.user_id,
e.sender,
c.event_id,
Expand All @@ -1931,9 +1956,12 @@ def _find_memberships_to_update_txn(
e.instance_name,
e.outlier
FROM local_current_membership AS c
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
INNER JOIN events AS e USING (event_id)
WHERE event_stream_ordering > ?
ORDER BY event_stream_ordering ASC
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
WHERE c.event_stream_ordering > ?
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
ORDER BY c.event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
Expand All @@ -1944,7 +1972,16 @@ def _find_memberships_to_update_txn(
memberships_to_update_rows = cast(
List[
Tuple[
str, Optional[str], str, str, str, str, int, Optional[str], bool
str,
Optional[str],
Optional[str],
str,
str,
str,
str,
int,
Optional[str],
bool,
]
],
txn.fetchall(),
Expand Down Expand Up @@ -1977,7 +2014,7 @@ def _find_memberships_to_update_txn(

def _find_previous_invite_or_knock_membership_txn(
txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
) -> Tuple[str, str]:
) -> Optional[Tuple[str, str]]:
# Find the previous invite/knock event before the leave event
#
# Here are some notes on how we landed on this query:
Expand Down Expand Up @@ -2027,8 +2064,13 @@ def _find_previous_invite_or_knock_membership_txn(
)
row = txn.fetchone()

# We should see a corresponding previous invite/knock event
assert row is not None
if row is None:
# Generally we should have an invite or knock event for leaves
# that are outliers, however this may not always be the case
# (e.g. a local user got kicked but the kick event got pulled in
# as an outlier).
return None

event_id, membership = row

return event_id, membership
Expand All @@ -2043,6 +2085,7 @@ def _find_previous_invite_or_knock_membership_txn(
for (
room_id,
room_id_from_rooms_table,
room_version_id,
user_id,
sender,
membership_event_id,
Expand All @@ -2061,6 +2104,14 @@ def _find_previous_invite_or_knock_membership_txn(
Membership.BAN,
)

if (
room_version_id is not None
and room_version_id not in KNOWN_ROOM_VERSIONS
):
# Ignore rooms with unknown room versions (these were
# experimental rooms that we no longer support).
continue

# There are some old out-of-band memberships (before
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the
# corresponding room stored in the `rooms` table. We have a `FOREIGN KEY`
Expand Down Expand Up @@ -2148,14 +2199,17 @@ def _find_previous_invite_or_knock_membership_txn(
# in the events table though. We'll just say that we don't
# know the state for these rooms and continue on with our
# day.
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
False
)
sliding_sync_membership_snapshots_insert_map = {
"has_known_state": False,
"room_type": None,
"room_name": None,
"is_encrypted": False,
}
elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
):
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
invite_or_knock_event_id = None
invite_or_knock_membership = None

# If the event is an `out_of_band_membership` (special case of
# `outlier`), we never had historical state so we have to pull from
Expand All @@ -2164,35 +2218,55 @@ def _find_previous_invite_or_knock_membership_txn(
# membership (i.e. the room shouldn't disappear if you're using the
# `is_encrypted` filter and you leave).
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = await self.db_pool.runInteraction(
previous_membership = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
_find_previous_invite_or_knock_membership_txn,
room_id,
user_id,
membership_event_id,
)
if previous_membership is not None:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = previous_membership
else:
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership

# Pull from the stripped state on the invite/knock event
invite_or_knock_event = await self.get_event(invite_or_knock_event_id)

raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
if (
invite_or_knock_event_id is not None
and invite_or_knock_membership is not None
):
# Pull from the stripped state on the invite/knock event
invite_or_knock_event = await self.get_event(
invite_or_knock_event_id
)
raw_stripped_state_events = knock_room_state

sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
)
raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state

sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
)
else:
# We couldn't find any state for the membership, so we just have to
# leave it as empty.
sliding_sync_membership_snapshots_insert_map = {
"has_known_state": False,
"room_type": None,
"room_name": None,
"is_encrypted": False,
}

# We should have some insert values for each room, even if no
# stripped state is on the event because we still want to record
Expand Down Expand Up @@ -2311,19 +2385,42 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
)
# We need to find the `forgotten` value during the transaction because
# we can't risk inserting stale data.
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ?
""",
(
membership_event_id,
room_id,
user_id,
),
)
if isinstance(txn.database_engine, PostgresEngine):
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = m.forgotten
FROM room_memberships AS m
WHERE sliding_sync_membership_snapshots.room_id = ?
AND sliding_sync_membership_snapshots.user_id = ?
AND membership_event_id = ?
AND membership_event_id = m.event_id
AND m.event_id IS NOT NULL
""",
(
room_id,
user_id,
membership_event_id,
),
)
else:
# SQLite doesn't support UPDATE FROM before 3.33.0, so we do
# this via sub-selects.
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ? AND membership_event_id = ?
""",
(
membership_event_id,
room_id,
user_id,
membership_event_id,
),
)

await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update", _fill_table_txn
Expand All @@ -2333,6 +2430,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
(
room_id,
_room_id_from_rooms_table,
_room_version_id,
user_id,
_sender,
_membership_event_id,
Expand Down
Loading
Loading