Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding sync: various fixes to background update #17636

Merged
merged 7 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17636.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1861,7 +1861,7 @@ def _update_current_state_txn(
VALUES (
?, ?, ?, ?, ?,
(SELECT stream_ordering FROM events WHERE event_id = ?),
(SELECT instance_name FROM events WHERE event_id = ?)
(SELECT COALESCE(instance_name, 'master') FROM events WHERE event_id = ?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I assume this became obvious after running the background update on matrix.org since this is a NOT NULL column 😵

{("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""}
)
ON CONFLICT (room_id, user_id)
Expand Down
46 changes: 35 additions & 11 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
SlidingSyncMembershipSnapshotSharedInsertValues,
SlidingSyncStateInsertValues,
)
from synapse.storage.databases.main.events_worker import DatabaseCorruptionError
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.types import Cursor
Expand Down Expand Up @@ -1857,6 +1858,7 @@ async def _sliding_sync_membership_snapshots_bg_update(
initial_phase = True

last_room_id = progress.get("last_room_id", "")
last_user_id = progress.get("last_user_id", "")
last_event_stream_ordering = progress["last_event_stream_ordering"]

def _find_memberships_to_update_txn(
Expand Down Expand Up @@ -1887,11 +1889,11 @@ def _find_memberships_to_update_txn(
FROM local_current_membership AS c
INNER JOIN events AS e USING (event_id)
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
WHERE c.room_id > ?
ORDER BY c.room_id ASC
WHERE (c.room_id, c.user_id) > (?, ?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this means the background update needed to be restarted to ensure we didn't drop anything along the way before?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or actually, I'm guessing the background update was just stuck on one room because it had more members than the batch size. So it doesn't need to be restarted.

ORDER BY c.room_id ASC, c.user_id ASC
LIMIT ?
""",
(last_room_id, batch_size),
(last_room_id, last_user_id, batch_size),
)
elif last_event_stream_ordering is not None:
# It's important to sort by `event_stream_ordering` *ascending* (oldest to
Expand Down Expand Up @@ -1993,13 +1995,16 @@ def _find_previous_membership_txn(
WHERE
room_id = ?
AND m.user_id = ?
AND (m.membership = ? OR m.membership = ?)
Copy link
Contributor

@MadLittleMods MadLittleMods Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the root cause to add this caveat to only check for invite/knock?

Perhaps a double-leave event somewhere although Synapse doesn't seem to allow that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was a ban + leave I think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh so ban and unban

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented in #17654

AND e.event_id != ?
ORDER BY e.topological_ordering DESC
LIMIT 1
""",
(
room_id,
user_id,
Membership.INVITE,
Membership.KNOCK,
event_id,
),
)
Expand Down Expand Up @@ -2081,9 +2086,17 @@ def _find_previous_membership_txn(
# have `current_state_events` and we should have some current state
# for each room
if current_state_ids_map:
fetched_events = await self.get_events(
current_state_ids_map.values()
)
try:
fetched_events = await self.get_events(
current_state_ids_map.values()
)
except DatabaseCorruptionError as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,
e,
)
continue

current_state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
Expand Down Expand Up @@ -2124,7 +2137,7 @@ def _find_previous_membership_txn(
False
)
elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership == Membership.LEAVE and is_outlier
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
):
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
Expand All @@ -2135,7 +2148,7 @@ def _find_previous_membership_txn(
# us a consistent view of the room state regardless of your
# membership (i.e. the room shouldn't disappear if your using the
# `is_encrypted` filter and you leave).
if membership == Membership.LEAVE and is_outlier:
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
invite_or_knock_event_id, invite_or_knock_membership = (
await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_membership",
Expand Down Expand Up @@ -2182,7 +2195,15 @@ def _find_previous_membership_txn(
await_full_state=False,
)

fetched_events = await self.get_events(state_ids_map.values())
try:
fetched_events = await self.get_events(state_ids_map.values())
except DatabaseCorruptionError as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,
e,
)
continue

state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
Expand Down Expand Up @@ -2296,7 +2317,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
(
room_id,
_room_id_from_rooms_table,
_user_id,
user_id,
_sender,
_membership_event_id,
_membership,
Expand All @@ -2308,8 +2329,11 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
progress = {
"initial_phase": initial_phase,
"last_room_id": room_id,
"last_event_stream_ordering": membership_event_stream_ordering,
"last_user_id": user_id,
"last_event_stream_ordering": last_event_stream_ordering,
}
if not initial_phase:
progress["last_event_stream_ordering"] = membership_event_stream_ordering

await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
Expand Down
26 changes: 22 additions & 4 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@
logger = logging.getLogger(__name__)


class DatabaseCorruptionError(RuntimeError):
"""We found an event in the DB that has a persisted event ID that doesn't
match its computed event ID."""

def __init__(
self, room_id: str, persisted_event_id: str, computed_event_id: str
) -> None:
self.room_id = room_id
self.persisted_event_id = persisted_event_id
self.computed_event_id = computed_event_id

message = (
f"Database corruption: Event {persisted_event_id} in room {room_id} "
f"from the database appears to have been modified (calculated "
f"event id {computed_event_id})"
)

super().__init__(message)


# These values are used in the `enqueue_event` and `_fetch_loop` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thing air to make initial sync run faster
Expand Down Expand Up @@ -1364,10 +1384,8 @@ async def _fetch_event_ids_and_get_outstanding_redactions(
if original_ev.event_id != event_id:
# it's difficult to see what to do here. Pretty much all bets are off
# if Synapse cannot rely on the consistency of its database.
raise RuntimeError(
f"Database corruption: Event {event_id} in room {d['room_id']} "
f"from the database appears to have been modified (calculated "
f"event id {original_ev.event_id})"
raise DatabaseCorruptionError(
d["room_id"], event_id, original_ev.event_id
)

event_map[event_id] = original_ev
Expand Down
Loading