From ec174d047005e4ac976311f4d3730452b2c5710f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Apr 2024 15:33:56 +0100 Subject: [PATCH 01/13] Refactor chain fetching (#17044) Since these queries are duplicated in two places. --- changelog.d/17044.misc | 1 + .../databases/main/event_federation.py | 162 +++++++----------- 2 files changed, 67 insertions(+), 96 deletions(-) create mode 100644 changelog.d/17044.misc diff --git a/changelog.d/17044.misc b/changelog.d/17044.misc new file mode 100644 index 0000000000..a1439752d3 --- /dev/null +++ b/changelog.d/17044.misc @@ -0,0 +1 @@ +Refactor auth chain fetching to reduce duplication. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 846c3f363a..fb132ef090 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -27,6 +27,7 @@ Collection, Dict, FrozenSet, + Generator, Iterable, List, Optional, @@ -279,64 +280,16 @@ def _get_auth_chain_ids_using_cover_index_txn( # Now we look up all links for the chains we have, adding chains that # are reachable from any event. - # - # This query is structured to first get all chain IDs reachable, and - # then pull out all links from those chains. This does pull out more - # rows than is strictly necessary, however there isn't a way of - # structuring the recursive part of query to pull out the links without - # also returning large quantities of redundant data (which can make it a - # lot slower). - sql = """ - WITH RECURSIVE links(chain_id) AS ( - SELECT - DISTINCT origin_chain_id - FROM event_auth_chain_links WHERE %s - UNION - SELECT - target_chain_id - FROM event_auth_chain_links - INNER JOIN links ON (chain_id = origin_chain_id) - ) - SELECT - origin_chain_id, origin_sequence_number, - target_chain_id, target_sequence_number - FROM links - INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id) - """ # A map from chain ID to max sequence number *reachable* from any event ID. chains: Dict[int, int] = {} - - # Add all linked chains reachable from initial set of chains. - chains_to_fetch = set(event_chains.keys()) - while chains_to_fetch: - batch2 = tuple(itertools.islice(chains_to_fetch, 1000)) - chains_to_fetch.difference_update(batch2) - clause, args = make_in_list_sql_clause( - txn.database_engine, "origin_chain_id", batch2 - ) - txn.execute(sql % (clause,), args) - - links: Dict[int, List[Tuple[int, int, int]]] = {} - - for ( - origin_chain_id, - origin_sequence_number, - target_chain_id, - target_sequence_number, - ) in txn: - links.setdefault(origin_chain_id, []).append( - (origin_sequence_number, target_chain_id, target_sequence_number) - ) - + for links in self._get_chain_links(txn, set(event_chains.keys())): for chain_id in links: if chain_id not in event_chains: continue _materialize(chain_id, event_chains[chain_id], links, chains) - chains_to_fetch.difference_update(chains) - # Add the initial set of chains, excluding the sequence corresponding to # initial event. for chain_id, seq_no in event_chains.items(): @@ -380,6 +333,68 @@ def _get_auth_chain_ids_using_cover_index_txn( return results + @classmethod + def _get_chain_links( + cls, txn: LoggingTransaction, chains_to_fetch: Set[int] + ) -> Generator[Dict[int, List[Tuple[int, int, int]]], None, None]: + """Fetch all auth chain links from the given set of chains, and all + links from those chains, recursively. + + Note: This may return links that are not reachable from the given + chains. + + Returns a generator that produces dicts from origin chain ID to 3-tuple + of origin sequence number, target chain ID and target sequence number. + """ + + # This query is structured to first get all chain IDs reachable, and + # then pull out all links from those chains. This does pull out more + # rows than is strictly necessary, however there isn't a way of + # structuring the recursive part of query to pull out the links without + # also returning large quantities of redundant data (which can make it a + # lot slower). + sql = """ + WITH RECURSIVE links(chain_id) AS ( + SELECT + DISTINCT origin_chain_id + FROM event_auth_chain_links WHERE %s + UNION + SELECT + target_chain_id + FROM event_auth_chain_links + INNER JOIN links ON (chain_id = origin_chain_id) + ) + SELECT + origin_chain_id, origin_sequence_number, + target_chain_id, target_sequence_number + FROM links + INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id) + """ + + while chains_to_fetch: + batch2 = tuple(itertools.islice(chains_to_fetch, 1000)) + chains_to_fetch.difference_update(batch2) + clause, args = make_in_list_sql_clause( + txn.database_engine, "origin_chain_id", batch2 + ) + txn.execute(sql % (clause,), args) + + links: Dict[int, List[Tuple[int, int, int]]] = {} + + for ( + origin_chain_id, + origin_sequence_number, + target_chain_id, + target_sequence_number, + ) in txn: + links.setdefault(origin_chain_id, []).append( + (origin_sequence_number, target_chain_id, target_sequence_number) + ) + + chains_to_fetch.difference_update(links) + + yield links + def _get_auth_chain_ids_txn( self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool ) -> Set[str]: @@ -564,53 +579,9 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None: # Now we look up all links for the chains we have, adding chains that # are reachable from any event. - # - # This query is structured to first get all chain IDs reachable, and - # then pull out all links from those chains. This does pull out more - # rows than is strictly necessary, however there isn't a way of - # structuring the recursive part of query to pull out the links without - # also returning large quantities of redundant data (which can make it a - # lot slower). - sql = """ - WITH RECURSIVE links(chain_id) AS ( - SELECT - DISTINCT origin_chain_id - FROM event_auth_chain_links WHERE %s - UNION - SELECT - target_chain_id - FROM event_auth_chain_links - INNER JOIN links ON (chain_id = origin_chain_id) - ) - SELECT - origin_chain_id, origin_sequence_number, - target_chain_id, target_sequence_number - FROM links - INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id) - """ - - # (We need to take a copy of `seen_chains` as we want to mutate it in - # the loop) - chains_to_fetch = set(seen_chains) - while chains_to_fetch: - batch2 = tuple(itertools.islice(chains_to_fetch, 1000)) - clause, args = make_in_list_sql_clause( - txn.database_engine, "origin_chain_id", batch2 - ) - txn.execute(sql % (clause,), args) - - links: Dict[int, List[Tuple[int, int, int]]] = {} - - for ( - origin_chain_id, - origin_sequence_number, - target_chain_id, - target_sequence_number, - ) in txn: - links.setdefault(origin_chain_id, []).append( - (origin_sequence_number, target_chain_id, target_sequence_number) - ) + # (We need to take a copy of `seen_chains` as the function mutates it) + for links in self._get_chain_links(txn, set(seen_chains)): for chains in set_to_chain: for chain_id in links: if chain_id not in chains: @@ -618,7 +589,6 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None: _materialize(chain_id, chains[chain_id], links, chains) - chains_to_fetch.difference_update(chains) seen_chains.update(chains) # Now for each chain we figure out the maximum sequence number reachable From ca27b516656223150d218bdd838df302fedf838c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Apr 2024 17:17:02 +0100 Subject: [PATCH 02/13] 1.104.0 --- CHANGES.md | 7 +++++++ changelog.d/17031.feature | 1 - debian/changelog | 6 ++++++ pyproject.toml | 2 +- 4 files changed, 14 insertions(+), 2 deletions(-) delete mode 100644 changelog.d/17031.feature diff --git a/CHANGES.md b/CHANGES.md index fa9af218a6..168e29f1b2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,10 @@ +# Synapse 1.104.0 (2024-04-02) + +### Bugfixes + +- Fix regression when using OIDC provider. Introduced in v1.104.0rc1. ([\#17031](https://github.com/element-hq/synapse/issues/17031)) + + # Synapse 1.104.0rc1 (2024-03-26) ### Features diff --git a/changelog.d/17031.feature b/changelog.d/17031.feature deleted file mode 100644 index 0f28cbbcd6..0000000000 --- a/changelog.d/17031.feature +++ /dev/null @@ -1 +0,0 @@ -OIDC: try to JWT decode userinfo response if JSON parsing failed. diff --git a/debian/changelog b/debian/changelog index b915b6e2cb..28451044ab 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.104.0) stable; urgency=medium + + * New Synapse release 1.104.0. + + -- Synapse Packaging team Tue, 02 Apr 2024 17:15:45 +0100 + matrix-synapse-py3 (1.104.0~rc1) stable; urgency=medium * New Synapse release 1.104.0rc1. diff --git a/pyproject.toml b/pyproject.toml index 8369139301..9a645079c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust" [tool.poetry] name = "matrix-synapse" -version = "1.104.0rc1" +version = "1.104.0" description = "Homeserver for the Matrix decentralised comms protocol" authors = ["Matrix.org Team and Contributors "] license = "AGPL-3.0-or-later" From 31122b71bcf29b4a034be4fc14770f4b8a45b2c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Apr 2024 11:05:40 +0100 Subject: [PATCH 03/13] Add missing index to `access_tokens` table (#17045) This was causing sequential scans when using refresh tokens. --- changelog.d/17045.misc | 1 + synapse/storage/databases/main/registration.py | 7 +++++++ 2 files changed, 8 insertions(+) create mode 100644 changelog.d/17045.misc diff --git a/changelog.d/17045.misc b/changelog.d/17045.misc new file mode 100644 index 0000000000..0d042a43ff --- /dev/null +++ b/changelog.d/17045.misc @@ -0,0 +1 @@ +Improve database performance by adding a missing index to `access_tokens.refresh_token_id`. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index d939ade427..30a3ae3055 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -2266,6 +2266,13 @@ def __init__( ): super().__init__(database, db_conn, hs) + self.db_pool.updates.register_background_index_update( + update_name="access_tokens_refresh_token_id_idx", + index_name="access_tokens_refresh_token_id_idx", + table="access_tokens", + columns=("refresh_token_id",), + ) + self._ignore_unknown_session_error = ( hs.config.server.request_token_inhibit_3pid_errors ) From 05957ac70f5d634eafbea61bd79a9a89196507c2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 4 Apr 2024 12:47:59 +0100 Subject: [PATCH 04/13] Fix bug in `/sync` response for archived rooms (#16932) This PR fixes a very, very niche edge-case, but I've got some more work coming which will otherwise make the problem worse. The bug happens when the syncing user leaves a room, and has a sync filter which includes "left" rooms, but sets the timeline limit to 0. In that case, the state returned in the `state` section is calculated incorrectly. The fix is to pass a token corresponding to the point that the user leaves the room through to `compute_state_delta`. --- changelog.d/16932.bugfix | 1 + synapse/handlers/sync.py | 121 ++++++++++++++++++--- tests/handlers/test_sync.py | 208 +++++++++++++++++++++++++++++++++--- tests/rest/client/utils.py | 18 ++-- 4 files changed, 314 insertions(+), 34 deletions(-) create mode 100644 changelog.d/16932.bugfix diff --git a/changelog.d/16932.bugfix b/changelog.d/16932.bugfix new file mode 100644 index 0000000000..624388ea8e --- /dev/null +++ b/changelog.d/16932.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left. \ No newline at end of file diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3aa2e2b7ba..7fcd54ac55 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -953,7 +953,7 @@ async def compute_state_delta( batch: TimelineBatch, sync_config: SyncConfig, since_token: Optional[StreamToken], - now_token: StreamToken, + end_token: StreamToken, full_state: bool, ) -> MutableStateMap[EventBase]: """Works out the difference in state between the end of the previous sync and @@ -964,7 +964,9 @@ async def compute_state_delta( batch: The timeline batch for the room that will be sent to the user. sync_config: since_token: Token of the end of the previous batch. May be `None`. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. full_state: Whether to force returning the full state. `lazy_load_members` still applies when `full_state` is `True`. @@ -1044,7 +1046,7 @@ async def compute_state_delta( room_id, sync_config.user, batch, - now_token, + end_token, members_to_fetch, timeline_state, ) @@ -1058,7 +1060,7 @@ async def compute_state_delta( room_id, batch, since_token, - now_token, + end_token, members_to_fetch, timeline_state, ) @@ -1130,7 +1132,7 @@ async def _compute_state_delta_for_full_sync( room_id: str, syncing_user: UserID, batch: TimelineBatch, - now_token: StreamToken, + end_token: StreamToken, members_to_fetch: Optional[Set[str]], timeline_state: StateMap[str], ) -> StateMap[str]: @@ -1143,7 +1145,9 @@ async def _compute_state_delta_for_full_sync( room_id: The room we are calculating for. syncing_user: The user that is calling `/sync`. batch: The timeline batch for the room that will be sent to the user. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. members_to_fetch: If lazy-loading is enabled, the memberships needed for events in the timeline. timeline_state: The contribution to the room state from state events in @@ -1202,7 +1206,7 @@ async def _compute_state_delta_for_full_sync( else: state_at_timeline_end = await self.get_state_at( room_id, - stream_position=now_token, + stream_position=end_token, state_filter=state_filter, await_full_state=await_full_state, ) @@ -1223,7 +1227,7 @@ async def _compute_state_delta_for_incremental_sync( room_id: str, batch: TimelineBatch, since_token: StreamToken, - now_token: StreamToken, + end_token: StreamToken, members_to_fetch: Optional[Set[str]], timeline_state: StateMap[str], ) -> StateMap[str]: @@ -1239,7 +1243,9 @@ async def _compute_state_delta_for_incremental_sync( room_id: The room we are calculating for. batch: The timeline batch for the room that will be sent to the user. since_token: Token of the end of the previous batch. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. members_to_fetch: If lazy-loading is enabled, the memberships needed for events in the timeline. Otherwise, `None`. timeline_state: The contribution to the room state from state events in @@ -1273,7 +1279,7 @@ async def _compute_state_delta_for_incremental_sync( # the recent events. state_at_timeline_start = await self.get_state_at( room_id, - stream_position=now_token, + stream_position=end_token, state_filter=state_filter, await_full_state=await_full_state, ) @@ -1312,7 +1318,7 @@ async def _compute_state_delta_for_incremental_sync( # the recent events. state_at_timeline_end = await self.get_state_at( room_id, - stream_position=now_token, + stream_position=end_token, state_filter=state_filter, await_full_state=await_full_state, ) @@ -2344,6 +2350,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=since_token, upto_token=leave_token, + end_token=leave_token, out_of_band=leave_event.internal_metadata.is_out_of_band_membership(), ) ) @@ -2381,6 +2388,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=None if newly_joined else since_token, upto_token=prev_batch_token, + end_token=now_token, ) else: entry = RoomSyncResultBuilder( @@ -2391,6 +2399,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=since_token, upto_token=since_token, + end_token=now_token, ) room_entries.append(entry) @@ -2449,6 +2458,7 @@ async def _get_room_changes_for_initial_sync( full_state=True, since_token=since_token, upto_token=now_token, + end_token=now_token, ) ) elif event.membership == Membership.INVITE: @@ -2478,6 +2488,7 @@ async def _get_room_changes_for_initial_sync( full_state=True, since_token=since_token, upto_token=leave_token, + end_token=leave_token, ) ) @@ -2548,6 +2559,7 @@ async def _generate_room_entry( { "since_token": since_token, "upto_token": upto_token, + "end_token": room_builder.end_token, } ) @@ -2621,7 +2633,7 @@ async def _generate_room_entry( batch, sync_config, since_token, - now_token, + room_builder.end_token, full_state=full_state, ) else: @@ -2781,6 +2793,70 @@ def _calculate_state( e for t, e in timeline_start.items() if t[0] == EventTypes.Member ) + # Naively, we would just return the difference between the state at the start + # of the timeline (`timeline_start_ids`) and that at the end of the previous sync + # (`previous_timeline_end_ids`). However, that fails in the presence of forks in + # the DAG. + # + # For example, consider a DAG such as the following: + # + # E1 + # ↗ ↖ + # | S2 + # | ↑ + # --|------|---- + # | | + # E3 | + # ↖ / + # E4 + # + # ... and a filter that means we only return 2 events, represented by the dashed + # horizontal line. Assuming S2 was *not* included in the previous sync, we need to + # include it in the `state` section. + # + # Note that the state at the start of the timeline (E3) does not include S2. So, + # to make sure it gets included in the calculation here, we actually look at + # the state at the *end* of the timeline, and subtract any events that are present + # in the timeline. + # + # ---------- + # + # Aside 1: You may then wonder if we need to include `timeline_start` in the + # calculation. Consider a linear DAG: + # + # E1 + # ↑ + # S2 + # ↑ + # ----|------ + # | + # E3 + # ↑ + # S4 + # ↑ + # E5 + # + # ... where S2 and S4 change the same piece of state; and where we have a filter + # that returns 3 events (E3, S4, E5). We still need to tell the client about S2, + # because it might affect the display of E3. However, the state at the end of the + # timeline only tells us about S4; if we don't inspect `timeline_start` we won't + # find out about S2. + # + # (There are yet more complicated cases in which a state event is excluded from the + # timeline, but whose effect actually lands in the DAG in the *middle* of the + # timeline. We have no way to represent that in the /sync response, and we don't + # even try; it is ether omitted or plonked into `state` as if it were at the start + # of the timeline, depending on what else is in the timeline.) + # + # ---------- + # + # Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually + # the state *before* the final event in the timeline. In other words: if the final + # event in the timeline is a state event, it won't be included in `timeline_end`. + # However, that doesn't matter here, because the only difference can be in that + # one piece of state, and by definition that event is in the timeline, so we + # don't need to include it in the `state` section. + state_ids = ( (timeline_end_ids | timeline_start_ids) - previous_timeline_end_ids @@ -2883,13 +2959,30 @@ class RoomSyncResultBuilder: Attributes: room_id + rtype: One of `"joined"` or `"archived"` + events: List of events to include in the room (more events may be added when generating result). + newly_joined: If the user has newly joined the room + full_state: Whether the full state should be sent in result + since_token: Earliest point to return events from, or None - upto_token: Latest point to return events from. + + upto_token: Latest point to return events from. If `events` is populated, + this is set to the token at the start of `events` + + end_token: The last point in the timeline that the client should see events + from. Normally this will be the same as the global `now_token`, but in + the case of rooms where the user has left the room, this will be the point + just after their leave event. + + This is used in the calculation of the state which is returned in `state`: + any state changes *up to* `end_token` (and not beyond!) which are not + reflected in the timeline need to be returned in `state`. + out_of_band: whether the events in the room are "out of band" events and the server isn't in the room. """ @@ -2901,5 +2994,5 @@ class RoomSyncResultBuilder: full_state: bool since_token: Optional[StreamToken] upto_token: StreamToken - + end_token: StreamToken out_of_band: bool = False diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 1b36324b8f..897c52c785 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -17,14 +17,16 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import Collection, List, Optional +from typing import Collection, ContextManager, List, Optional from unittest.mock import AsyncMock, Mock, patch +from parameterized import parameterized + from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError -from synapse.api.filtering import Filtering +from synapse.api.filtering import FilterCollection, Filtering from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext @@ -33,7 +35,7 @@ from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer -from synapse.types import UserID, create_requester +from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock import tests.unittest @@ -258,13 +260,7 @@ def test_ban_wins_race_with_join(self) -> None: # Eve tries to join the room. We monkey patch the internal logic which selects # the prev_events used when creating the join event, such that the ban does not # precede the join. - mocked_get_prev_events = patch.object( - self.hs.get_datastores().main, - "get_prev_events_for_room", - new_callable=AsyncMock, - return_value=[last_room_creation_event_id], - ) - with mocked_get_prev_events: + with self._patch_get_latest_events([last_room_creation_event_id]): self.helper.join(room_id, eve, tok=eve_token) # Eve makes a second, incremental sync. @@ -288,6 +284,180 @@ def test_ban_wins_race_with_join(self) -> None: ) self.assertEqual(eve_initial_sync_after_join.joined, []) + def test_state_includes_changes_on_forks(self) -> None: + """State changes that happen on a fork of the DAG must be included in `state` + + Given the following DAG: + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + | | + E3 | + ↖ / + E4 + + ... and a filter that means we only return 2 events, represented by the dashed + horizontal line: `S2` must be included in the `state` section. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Send a final event, joining the two branches of the dag + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + + # do an incremental sync, with a filter that will ensure we only get two of + # the three new events. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 2}}} + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event, e4_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + + @parameterized.expand( + [ + (False, False), + (True, False), + (False, True), + (True, True), + ] + ) + def test_archived_rooms_do_not_include_state_after_leave( + self, initial_sync: bool, empty_timeline: bool + ) -> None: + """If the user leaves the room, state changes that happen after they leave are not returned. + + We try with both a zero and a normal timeline limit, + and we try both an initial sync and an incremental sync for both. + """ + if empty_timeline and not initial_sync: + # FIXME synapse doesn't return the room at all in this situation! + self.skipTest("Synapse does not correctly handle this case") + + # Alice creates the room, and bob joins. + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + + bob = self.register_user("bob", "password") + bob_tok = self.login(bob, "password") + bob_requester = create_requester(bob) + + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + self.helper.join(room_id, bob, tok=bob_tok) + + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + bob_requester, generate_sync_config(bob) + ) + ) + + # Alice sends a message and a state + before_message_event = self.helper.send(room_id, "before", tok=alice_tok)[ + "event_id" + ] + before_state_event = self.helper.send_state( + room_id, "test_state", {"body": "before"}, tok=alice_tok + )["event_id"] + + # Bob leaves + leave_event = self.helper.leave(room_id, bob, tok=bob_tok)["event_id"] + + # Alice sends some more stuff + self.helper.send(room_id, "after", tok=alice_tok)["event_id"] + self.helper.send_state(room_id, "test_state", {"body": "after"}, tok=alice_tok)[ + "event_id" + ] + + # And now, Bob resyncs. + filter_dict: JsonDict = {"room": {"include_leave": True}} + if empty_timeline: + filter_dict["room"]["timeline"] = {"limit": 0} + sync_room_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + bob_requester, + generate_sync_config( + bob, filter_collection=FilterCollection(self.hs, filter_dict) + ), + since_token=None if initial_sync else initial_sync_result.next_batch, + ) + ).archived[0] + + if empty_timeline: + # The timeline should be empty + self.assertEqual(sync_room_result.timeline.events, []) + + # And the state should include the leave event... + self.assertEqual( + sync_room_result.state[("m.room.member", bob)].event_id, leave_event + ) + # ... and the state change before he left. + self.assertEqual( + sync_room_result.state[("test_state", "")].event_id, before_state_event + ) + else: + # The last three events in the timeline should be those leading up to the + # leave + self.assertEqual( + [e.event_id for e in sync_room_result.timeline.events[-3:]], + [before_message_event, before_state_event, leave_event], + ) + # ... And the state should be empty + self.assertEqual(sync_room_result.state, {}) + + def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager: + """Monkey-patch `get_prev_events_for_room` + + Returns a context manager which will replace the implementation of + `get_prev_events_for_room` with one which returns `latest_events`. + """ + return patch.object( + self.hs.get_datastores().main, + "get_prev_events_for_room", + new_callable=AsyncMock, + return_value=latest_events, + ) + def test_call_invite_in_public_room_not_returned(self) -> None: user = self.register_user("alice", "password") tok = self.login(user, "password") @@ -401,14 +571,26 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( def generate_sync_config( - user_id: str, device_id: Optional[str] = "device_id" + user_id: str, + device_id: Optional[str] = "device_id", + filter_collection: Optional[FilterCollection] = None, ) -> SyncConfig: - """Generate a sync config (with a unique request key).""" + """Generate a sync config (with a unique request key). + + Args: + user_id: user who is syncing. + device_id: device that is syncing. Defaults to "device_id". + filter_collection: filter to apply. Defaults to the default filter (ie, + return everything, with a default limit) + """ + if filter_collection is None: + filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION + global _request_key _request_key += 1 return SyncConfig( user=UserID.from_string(user_id), - filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION, + filter_collection=filter_collection, is_guest=False, request_key=("request_key", _request_key), device_id=device_id, diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index daa68d78b9..fe00afe198 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -170,8 +170,8 @@ def invite( targ: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=src, targ=targ, @@ -189,8 +189,8 @@ def join( appservice_user_id: Optional[str] = None, expect_errcode: Optional[Codes] = None, expect_additional_fields: Optional[dict] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=user, targ=user, @@ -242,8 +242,8 @@ def leave( user: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=user, targ=user, @@ -282,7 +282,7 @@ def change_membership( expect_code: int = HTTPStatus.OK, expect_errcode: Optional[str] = None, expect_additional_fields: Optional[dict] = None, - ) -> None: + ) -> JsonDict: """ Send a membership state event into a room. @@ -298,6 +298,9 @@ def change_membership( using an application service access token in `tok`. expect_code: The expected HTTP response code expect_errcode: The expected Matrix error code + + Returns: + The JSON response """ temp_id = self.auth_user_id self.auth_user_id = src @@ -356,6 +359,7 @@ def change_membership( ) self.auth_user_id = temp_id + return channel.json_body def send( self, From 230b709d9d8b09fd4884be1265db535263975e35 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 4 Apr 2024 13:14:24 +0100 Subject: [PATCH 05/13] `/sync`: fix bug in calculating `state` response (#16930) Fix a long-standing issue which could cause state to be omitted from the sync response if the last event was filtered out. Fixes: https://github.com/element-hq/synapse/issues/16928 --- changelog.d/16930.bugfix | 1 + synapse/handlers/sync.py | 54 ++++++------------------- tests/handlers/test_sync.py | 80 +++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 41 deletions(-) create mode 100644 changelog.d/16930.bugfix diff --git a/changelog.d/16930.bugfix b/changelog.d/16930.bugfix new file mode 100644 index 0000000000..21f964ef97 --- /dev/null +++ b/changelog.d/16930.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which could cause state to be omitted from `/sync` responses when certain events are filtered out of the timeline. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7fcd54ac55..773e291aa8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1187,15 +1187,14 @@ async def _compute_state_delta_for_full_sync( await_full_state = True lazy_load_members = False - if batch: - state_at_timeline_end = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + if batch: state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( batch.events[0].event_id, @@ -1204,13 +1203,6 @@ async def _compute_state_delta_for_full_sync( ) ) else: - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=end_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) - state_at_timeline_start = state_at_timeline_end state_ids = _calculate_state( @@ -1305,23 +1297,12 @@ async def _compute_state_delta_for_incremental_sync( await_full_state=await_full_state, ) - if batch: - state_at_timeline_end = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - else: - # We can get here if the user has ignored the senders of all - # the recent events. - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=end_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) state_ids = _calculate_state( timeline_contains=timeline_state, @@ -2847,15 +2828,6 @@ def _calculate_state( # timeline. We have no way to represent that in the /sync response, and we don't # even try; it is ether omitted or plonked into `state` as if it were at the start # of the timeline, depending on what else is in the timeline.) - # - # ---------- - # - # Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually - # the state *before* the final event in the timeline. In other words: if the final - # event in the timeline is a state event, it won't be included in `timeline_end`. - # However, that doesn't matter here, because the only difference can be in that - # one piece of state, and by definition that event is in the timeline, so we - # don't need to include it in the `state` section. state_ids = ( (timeline_end_ids | timeline_start_ids) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 897c52c785..5d8e886541 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -355,6 +355,86 @@ def test_state_includes_changes_on_forks(self) -> None: [s2_event], ) + def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: + """A variation on the previous test, but where one event is filtered + + The DAG is the same as the previous test, but E4 is excluded by the filter. + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + | | + E3 | + ↖ / + (E4) + + """ + + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Send a final event, joining the two branches of the dag + self.helper.send(room_id, "e4", type="not_a_normal_message", tok=alice_tok)[ + "event_id" + ] + + # do an incremental sync, with a filter that will only return E3, excluding S2 + # and E4. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, + { + "room": { + "timeline": { + "limit": 1, + "not_types": ["not_a_normal_message"], + } + } + }, + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + @parameterized.expand( [ (False, False), From 0e68e9b7f4dd64d1b4b28feb4050e4b4fd85fb9d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 4 Apr 2024 17:15:35 +0100 Subject: [PATCH 06/13] Fix bug in calculating state for non-gappy syncs (#16942) Unfortunately, the optimisation we applied here for non-gappy syncs is not actually valid. Fixes https://github.com/element-hq/synapse/issues/16941. ~~Based on https://github.com/element-hq/synapse/pull/16930.~~ Requires https://github.com/matrix-org/sytest/pull/1374. --- changelog.d/16930.bugfix | 2 +- changelog.d/16932.bugfix | 2 +- changelog.d/16942.bugfix | 1 + synapse/handlers/sync.py | 91 +++++++++++++------------------ tests/handlers/test_sync.py | 105 ++++++++++++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 56 deletions(-) create mode 100644 changelog.d/16942.bugfix diff --git a/changelog.d/16930.bugfix b/changelog.d/16930.bugfix index 21f964ef97..99ed435d75 100644 --- a/changelog.d/16930.bugfix +++ b/changelog.d/16930.bugfix @@ -1 +1 @@ -Fix a long-standing bug which could cause state to be omitted from `/sync` responses when certain events are filtered out of the timeline. +Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. diff --git a/changelog.d/16932.bugfix b/changelog.d/16932.bugfix index 624388ea8e..99ed435d75 100644 --- a/changelog.d/16932.bugfix +++ b/changelog.d/16932.bugfix @@ -1 +1 @@ -Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left. \ No newline at end of file +Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. diff --git a/changelog.d/16942.bugfix b/changelog.d/16942.bugfix new file mode 100644 index 0000000000..99ed435d75 --- /dev/null +++ b/changelog.d/16942.bugfix @@ -0,0 +1 @@ +Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 773e291aa8..554c820f79 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1195,6 +1195,8 @@ async def _compute_state_delta_for_full_sync( ) if batch: + # Strictly speaking, this returns the state *after* the first event in the + # timeline, but that is good enough here. state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( batch.events[0].event_id, @@ -1257,25 +1259,25 @@ async def _compute_state_delta_for_incremental_sync( await_full_state = True lazy_load_members = False - if batch.limited: - if batch: - state_at_timeline_start = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - else: - # We can get here if the user has ignored the senders of all - # the recent events. - state_at_timeline_start = await self.get_state_at( - room_id, - stream_position=end_token, + if batch: + state_at_timeline_start = ( + await self._state_storage_controller.get_state_ids_for_event( + batch.events[0].event_id, state_filter=state_filter, await_full_state=await_full_state, ) + ) + else: + # We can get here if the user has ignored the senders of all + # the recent events. + state_at_timeline_start = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + if batch.limited: # for now, we disable LL for gappy syncs - see # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346 # N.B. this slows down incr syncs as we are now processing way @@ -1290,47 +1292,28 @@ async def _compute_state_delta_for_incremental_sync( # about them). state_filter = StateFilter.all() - state_at_previous_sync = await self.get_state_at( - room_id, - stream_position=since_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) + state_at_previous_sync = await self.get_state_at( + room_id, + stream_position=since_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=end_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + + state_ids = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state_at_timeline_start, + timeline_end=state_at_timeline_end, + previous_timeline_end=state_at_previous_sync, + lazy_load_members=lazy_load_members, + ) - state_ids = _calculate_state( - timeline_contains=timeline_state, - timeline_start=state_at_timeline_start, - timeline_end=state_at_timeline_end, - previous_timeline_end=state_at_previous_sync, - lazy_load_members=lazy_load_members, - ) - else: - state_ids = {} - if lazy_load_members: - if members_to_fetch and batch.events: - # We're returning an incremental sync, with no - # "gap" since the previous sync, so normally there would be - # no state to return. - # But we're lazy-loading, so the client might need some more - # member events to understand the events in this timeline. - # So we fish out all the member events corresponding to the - # timeline here. The caller will then dedupe any redundant ones. - - state_ids = await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, - # we only want members! - state_filter=StateFilter.from_types( - (EventTypes.Member, member) for member in members_to_fetch - ), - await_full_state=False, - ) return state_ids async def _find_missing_partial_state_memberships( diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 5d8e886541..57e14d79ca 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -435,6 +435,111 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: [s2_event], ) + def test_state_includes_changes_on_ungappy_syncs(self) -> None: + """Test `state` where the sync is not gappy. + + We start with a DAG like this: + + E1 + ↗ ↖ + | S2 + | + --|--- + | + E3 + + ... and initialsync with `limit=1`, represented by the horizontal dashed line. + At this point, we do not expect S2 to appear in the response at all (since + it is excluded from the timeline by the `limit`, and the state is based on the + state after the most recent event before the sync token (E3), which doesn't + include S2. + + Now more events arrive, and we do an incremental sync: + + E1 + ↗ ↖ + | S2 + | ↑ + E3 | + ↑ | + --|------|---- + | | + E4 | + ↖ / + E5 + + This is the last chance for us to tell the client about S2, so it *must* be + included in the response. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Another initial sync, with limit=1 + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 1}}} + ), + ), + ) + ) + room_sync = initial_sync_result.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event], + ) + self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()]) + + # More events, E4 and E5 + with self._patch_get_latest_events([e3_event]): + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"] + + # Now incremental sync + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config(alice), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertFalse(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e4_event, e5_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + @parameterized.expand( [ (False, False), From 5360baeb6439366c29d55038da7f677c64eea4bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Apr 2024 12:46:34 +0100 Subject: [PATCH 07/13] Pull out fewer receipts from DB when doing push (#17049) Before we were pulling out *all* read receipts for a user for every event we pushed. Instead let's only pull out the relevant receipts. This also pulled out the event rows for each receipt, causing load on the events table. --- changelog.d/17049.misc | 1 + .../databases/main/event_push_actions.py | 124 ++++++++++++++---- 2 files changed, 103 insertions(+), 22 deletions(-) create mode 100644 changelog.d/17049.misc diff --git a/changelog.d/17049.misc b/changelog.d/17049.misc new file mode 100644 index 0000000000..f71a6473a2 --- /dev/null +++ b/changelog.d/17049.misc @@ -0,0 +1 @@ +Improve database performance by reducing number of receipts fetched when sending push notifications. diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3a5666cd9b..40bf000e9c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -106,7 +106,7 @@ ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -859,37 +859,86 @@ def f(txn: LoggingTransaction) -> List[str]: return await self.db_pool.runInteraction("get_push_action_users_in_range", f) - def _get_receipts_by_room_txn( - self, txn: LoggingTransaction, user_id: str + def _get_receipts_for_room_and_threads_txn( + self, + txn: LoggingTransaction, + user_id: str, + room_ids: StrCollection, + thread_ids: StrCollection, ) -> Dict[str, _RoomReceipt]: """ - Generate a map of room ID to the latest stream ordering that has been - read by the given user. + Get (private) read receipts for a user in each of the given room IDs + and thread IDs. - Args: - txn: - user_id: The user to fetch receipts for. + Note: The corresponding room ID for each thread must appear in + `room_ids` arg. Returns: A map including all rooms the user is in with a receipt. It maps room IDs to _RoomReceipt instances """ - receipt_types_clause, args = make_in_list_sql_clause( + + receipt_types_clause, receipts_args = make_in_list_sql_clause( self.database_engine, "receipt_type", (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) + thread_ids_clause, thread_ids_args = make_in_list_sql_clause( + self.database_engine, + "thread_id", + thread_ids, + ) + + room_ids_clause, room_ids_args = make_in_list_sql_clause( + self.database_engine, + "room_id", + room_ids, + ) + + # We use the union of two (almost identical) queries here, the first to + # fetch the specific thread receipts and the second to fetch the + # unthreaded receipts. + # + # This SQL is optimized to use the indices we have on + # `receipts_linearized`. + # + # We compare room ID and thread IDs independently due to the above, + # which means that this query might return more rows than we need if the + # same thread ID appears across different rooms (e.g. 'main' thread ID). + # This doesn't cause any logic issues, and isn't a performance concern + # given this function generally gets called with only one room and + # thread ID. sql = f""" SELECT room_id, thread_id, MAX(stream_ordering) FROM receipts_linearized INNER JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} + AND {thread_ids_clause} + AND {room_ids_clause} + AND user_id = ? + GROUP BY room_id, thread_id + + UNION ALL + + SELECT room_id, thread_id, MAX(stream_ordering) + FROM receipts_linearized + INNER JOIN events USING (room_id, event_id) + WHERE {receipt_types_clause} + AND {room_ids_clause} + AND thread_id IS NULL AND user_id = ? GROUP BY room_id, thread_id """ - args.extend((user_id,)) + args = list(receipts_args) + args.extend(thread_ids_args) + args.extend(room_ids_args) + args.append(user_id) + args.extend(receipts_args) + args.extend(room_ids_args) + args.append(user_id) + txn.execute(sql, args) result: Dict[str, _RoomReceipt] = {} @@ -925,12 +974,6 @@ async def get_unread_push_actions_for_user_in_range_for_http( The list will have between 0~limit entries. """ - receipts_by_room = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_receipts", - self._get_receipts_by_room_txn, - user_id=user_id, - ) - def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, int, str, bool]]: @@ -952,6 +995,27 @@ def get_push_actions_txn( "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn ) + room_ids = set() + thread_ids = [] + for ( + _, + room_id, + thread_id, + _, + _, + _, + ) in push_actions: + room_ids.add(room_id) + thread_ids.append(thread_id) + + receipts_by_room = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http_receipts", + self._get_receipts_for_room_and_threads_txn, + user_id=user_id, + room_ids=room_ids, + thread_ids=thread_ids, + ) + notifs = [ HttpPushAction( event_id=event_id, @@ -998,12 +1062,6 @@ async def get_unread_push_actions_for_user_in_range_for_email( The list will have between 0~limit entries. """ - receipts_by_room = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_receipts", - self._get_receipts_by_room_txn, - user_id=user_id, - ) - def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, int, str, bool, int]]: @@ -1026,6 +1084,28 @@ def get_push_actions_txn( "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn ) + room_ids = set() + thread_ids = [] + for ( + _, + room_id, + thread_id, + _, + _, + _, + _, + ) in push_actions: + room_ids.add(room_id) + thread_ids.append(thread_id) + + receipts_by_room = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email_receipts", + self._get_receipts_for_room_and_threads_txn, + user_id=user_id, + room_ids=room_ids, + thread_ids=thread_ids, + ) + # Make a list of dicts from the two sets of results. notifs = [ EmailPushAction( From 1f8f991d51a3311d67ea0b717bf168553d51b441 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Apr 2024 14:25:28 +0100 Subject: [PATCH 08/13] Add back fast path for non-gappy syncs (#17064) PR #16942 removed an invalid optimisation that avoided pulling out state for non-gappy syncs. This causes a large increase in DB usage. c.f. #16941 for why that optimisation was wrong. However, we can still optimise in the simple case where the events in the timeline are a linear chain without any branching/merging of the DAG. cc. @richvdh --- changelog.d/17064.bugfix | 1 + synapse/handlers/sync.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 changelog.d/17064.bugfix diff --git a/changelog.d/17064.bugfix b/changelog.d/17064.bugfix new file mode 100644 index 0000000000..99ed435d75 --- /dev/null +++ b/changelog.d/17064.bugfix @@ -0,0 +1 @@ +Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 554c820f79..7c29c15540 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1259,6 +1259,42 @@ async def _compute_state_delta_for_incremental_sync( await_full_state = True lazy_load_members = False + # For a non-gappy sync if the events in the timeline are simply a linear + # chain (i.e. no merging/branching of the graph), then we know the state + # delta between the end of the previous sync and start of the new one is + # empty. + # + # c.f. #16941 for an example of why we can't do this for all non-gappy + # syncs. + is_linear_timeline = False + if batch.events: + prev_event_id = batch.events[0].event_id + for e in batch.events[1:]: + if e.prev_event_ids() != [prev_event_id]: + break + else: + is_linear_timeline = True + + if is_linear_timeline and not batch.limited: + state_ids: StateMap[str] = {} + if lazy_load_members: + if members_to_fetch and batch.events: + # We're lazy-loading, so the client might need some more + # member events to understand the events in this timeline. + # So we fish out all the member events corresponding to the + # timeline here. The caller will then dedupe any redundant + # ones. + + state_ids = await self._state_storage_controller.get_state_ids_for_event( + batch.events[0].event_id, + # we only want members! + state_filter=StateFilter.from_types( + (EventTypes.Member, member) for member in members_to_fetch + ), + await_full_state=False, + ) + return state_ids + if batch: state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( From 4d10a8fb1807b3ab742e17a562bcae377057be50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Apr 2024 14:55:19 +0100 Subject: [PATCH 09/13] Fixups to #17064 (#17065) Forget a line, and an empty batch is trivially linear. c.f. #17064 --- changelog.d/17065.bugfix | 1 + synapse/handlers/sync.py | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 changelog.d/17065.bugfix diff --git a/changelog.d/17065.bugfix b/changelog.d/17065.bugfix new file mode 100644 index 0000000000..99ed435d75 --- /dev/null +++ b/changelog.d/17065.bugfix @@ -0,0 +1 @@ +Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7c29c15540..410805e806 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1272,8 +1272,11 @@ async def _compute_state_delta_for_incremental_sync( for e in batch.events[1:]: if e.prev_event_ids() != [prev_event_id]: break + prev_event_id = e.event_id else: is_linear_timeline = True + else: + is_linear_timeline = True if is_linear_timeline and not batch.limited: state_ids: StateMap[str] = {} From 9b8597e431f17676a1cd7b648e75504f881f510c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 17:50:16 +0100 Subject: [PATCH 10/13] Bump types-requests from 2.31.0.20240125 to 2.31.0.20240406 (#17063) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- poetry.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index 5332774705..99540cc2ba 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "alabaster" @@ -3156,13 +3156,13 @@ files = [ [[package]] name = "types-requests" -version = "2.31.0.20240125" +version = "2.31.0.20240406" description = "Typing stubs for requests" optional = false python-versions = ">=3.8" files = [ - {file = "types-requests-2.31.0.20240125.tar.gz", hash = "sha256:03a28ce1d7cd54199148e043b2079cdded22d6795d19a2c2a6791a4b2b5e2eb5"}, - {file = "types_requests-2.31.0.20240125-py3-none-any.whl", hash = "sha256:9592a9a4cb92d6d75d9b491a41477272b710e021011a2a3061157e2fb1f1a5d1"}, + {file = "types-requests-2.31.0.20240406.tar.gz", hash = "sha256:4428df33c5503945c74b3f42e82b181e86ec7b724620419a2966e2de604ce1a1"}, + {file = "types_requests-2.31.0.20240406-py3-none-any.whl", hash = "sha256:6216cdac377c6b9a040ac1c0404f7284bd13199c0e1bb235f4324627e8898cf5"}, ] [package.dependencies] From 3e51b370c5e278d964181fd2b74135f5f9a68808 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 17:52:02 +0100 Subject: [PATCH 11/13] Bump typing-extensions from 4.9.0 to 4.11.0 (#17062) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- poetry.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index 99540cc2ba..c97db87564 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3181,13 +3181,13 @@ files = [ [[package]] name = "typing-extensions" -version = "4.9.0" +version = "4.11.0" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" files = [ - {file = "typing_extensions-4.9.0-py3-none-any.whl", hash = "sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd"}, - {file = "typing_extensions-4.9.0.tar.gz", hash = "sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783"}, + {file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"}, + {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"}, ] [[package]] From 680f60102b3cffac2eabb399745ed041a6e8c75f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 17:52:43 +0100 Subject: [PATCH 12/13] Bump types-pillow from 10.2.0.20240125 to 10.2.0.20240406 (#17061) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- poetry.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index c97db87564..e9caed37a2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3109,13 +3109,13 @@ files = [ [[package]] name = "types-pillow" -version = "10.2.0.20240125" +version = "10.2.0.20240406" description = "Typing stubs for Pillow" optional = false python-versions = ">=3.8" files = [ - {file = "types-Pillow-10.2.0.20240125.tar.gz", hash = "sha256:c449b2c43b9fdbe0494a7b950e6b39a4e50516091213fec24ef3f33c1d017717"}, - {file = "types_Pillow-10.2.0.20240125-py3-none-any.whl", hash = "sha256:322dbae32b4b7918da5e8a47c50ac0f24b0aa72a804a23857620f2722b03c858"}, + {file = "types-Pillow-10.2.0.20240406.tar.gz", hash = "sha256:62e0cc1f17caba40e72e7154a483f4c7f3bea0e1c34c0ebba9de3c7745bc306d"}, + {file = "types_Pillow-10.2.0.20240406-py3-none-any.whl", hash = "sha256:5ac182e8afce53de30abca2fdf9cbec7b2500e549d0be84da035a729a84c7c47"}, ] [[package]] From 13a3987929d88d333e79d4b700b2f18332c682d1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 17:54:18 +0100 Subject: [PATCH 13/13] Bump ruff from 0.3.2 to 0.3.5 (#17060) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- poetry.lock | 38 +++++++++++++++++++------------------- pyproject.toml | 2 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/poetry.lock b/poetry.lock index e9caed37a2..370d99704a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2444,28 +2444,28 @@ files = [ [[package]] name = "ruff" -version = "0.3.2" +version = "0.3.5" description = "An extremely fast Python linter and code formatter, written in Rust." optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.3.2-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:77f2612752e25f730da7421ca5e3147b213dca4f9a0f7e0b534e9562c5441f01"}, - {file = "ruff-0.3.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:9966b964b2dd1107797be9ca7195002b874424d1d5472097701ae8f43eadef5d"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b83d17ff166aa0659d1e1deaf9f2f14cbe387293a906de09bc4860717eb2e2da"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bb875c6cc87b3703aeda85f01c9aebdce3d217aeaca3c2e52e38077383f7268a"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:be75e468a6a86426430373d81c041b7605137a28f7014a72d2fc749e47f572aa"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:967978ac2d4506255e2f52afe70dda023fc602b283e97685c8447d036863a302"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1231eacd4510f73222940727ac927bc5d07667a86b0cbe822024dd00343e77e9"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2c6d613b19e9a8021be2ee1d0e27710208d1603b56f47203d0abbde906929a9b"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8439338a6303585d27b66b4626cbde89bb3e50fa3cae86ce52c1db7449330a7"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:de8b480d8379620cbb5ea466a9e53bb467d2fb07c7eca54a4aa8576483c35d36"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:b74c3de9103bd35df2bb05d8b2899bf2dbe4efda6474ea9681280648ec4d237d"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f380be9fc15a99765c9cf316b40b9da1f6ad2ab9639e551703e581a5e6da6745"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:0ac06a3759c3ab9ef86bbeca665d31ad3aa9a4b1c17684aadb7e61c10baa0df4"}, - {file = "ruff-0.3.2-py3-none-win32.whl", hash = "sha256:9bd640a8f7dd07a0b6901fcebccedadeb1a705a50350fb86b4003b805c81385a"}, - {file = "ruff-0.3.2-py3-none-win_amd64.whl", hash = "sha256:0c1bdd9920cab5707c26c8b3bf33a064a4ca7842d91a99ec0634fec68f9f4037"}, - {file = "ruff-0.3.2-py3-none-win_arm64.whl", hash = "sha256:5f65103b1d76e0d600cabd577b04179ff592064eaa451a70a81085930e907d0b"}, - {file = "ruff-0.3.2.tar.gz", hash = "sha256:fa78ec9418eb1ca3db392811df3376b46471ae93792a81af2d1cbb0e5dcb5142"}, + {file = "ruff-0.3.5-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:aef5bd3b89e657007e1be6b16553c8813b221ff6d92c7526b7e0227450981eac"}, + {file = "ruff-0.3.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:89b1e92b3bd9fca249153a97d23f29bed3992cff414b222fcd361d763fc53f12"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5e55771559c89272c3ebab23326dc23e7f813e492052391fe7950c1a5a139d89"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:dabc62195bf54b8a7876add6e789caae0268f34582333cda340497c886111c39"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a05f3793ba25f194f395578579c546ca5d83e0195f992edc32e5907d142bfa3"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:dfd3504e881082959b4160ab02f7a205f0fadc0a9619cc481982b6837b2fd4c0"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:87258e0d4b04046cf1d6cc1c56fadbf7a880cc3de1f7294938e923234cf9e498"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:712e71283fc7d9f95047ed5f793bc019b0b0a29849b14664a60fd66c23b96da1"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a532a90b4a18d3f722c124c513ffb5e5eaff0cc4f6d3aa4bda38e691b8600c9f"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:122de171a147c76ada00f76df533b54676f6e321e61bd8656ae54be326c10296"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d80a6b18a6c3b6ed25b71b05eba183f37d9bc8b16ace9e3d700997f00b74660b"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:a7b6e63194c68bca8e71f81de30cfa6f58ff70393cf45aab4c20f158227d5936"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a759d33a20c72f2dfa54dae6e85e1225b8e302e8ac655773aff22e542a300985"}, + {file = "ruff-0.3.5-py3-none-win32.whl", hash = "sha256:9d8605aa990045517c911726d21293ef4baa64f87265896e491a05461cae078d"}, + {file = "ruff-0.3.5-py3-none-win_amd64.whl", hash = "sha256:dc56bb16a63c1303bd47563c60482a1512721053d93231cf7e9e1c6954395a0e"}, + {file = "ruff-0.3.5-py3-none-win_arm64.whl", hash = "sha256:faeeae9905446b975dcf6d4499dc93439b131f1443ee264055c5716dd947af55"}, + {file = "ruff-0.3.5.tar.gz", hash = "sha256:a067daaeb1dc2baf9b82a32dae67d154d95212080c80435eb052d95da647763d"}, ] [[package]] @@ -3451,4 +3451,4 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" python-versions = "^3.8.0" -content-hash = "b510fa05f4ea33194bec079f5d04ebb3f9ffbb5c1ea96a0341d57ba770ef81e6" +content-hash = "4abda113a01f162bb3978b0372956d569364533aa39f57863c234363f8449a4f" diff --git a/pyproject.toml b/pyproject.toml index 9a645079c3..fa87d19180 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -321,7 +321,7 @@ all = [ # This helps prevents merge conflicts when running a batch of dependabot updates. isort = ">=5.10.1" black = ">=22.7.0" -ruff = "0.3.2" +ruff = "0.3.5" # Type checking only works with the pydantic.v1 compat module from pydantic v2 pydantic = "^2"