diff --git a/changelog.d/13202.feature b/changelog.d/13202.feature new file mode 100644 index 000000000000..d0cb902dffd0 --- /dev/null +++ b/changelog.d/13202.feature @@ -0,0 +1 @@ +Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)). diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 43d2882b0aa2..45f0983c6b0a 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -97,6 +97,7 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None receipt_type=receipt_type, user_id=user_id, event_ids=user_values["event_ids"], + thread_id=None, # TODO data=user_values.get("data", {}), ) ) @@ -114,6 +115,7 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: receipt.receipt_type, receipt.user_id, receipt.event_ids, + receipt.thread_id, receipt.data, ) @@ -146,7 +148,12 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: return True async def received_client_receipt( - self, room_id: str, receipt_type: str, user_id: str, event_id: str + self, + room_id: str, + receipt_type: str, + user_id: str, + event_id: str, + thread_id: Optional[str], ) -> None: """Called when a client tells us a local user has read up to the given event_id in the room. @@ -156,6 +163,7 @@ async def received_client_receipt( receipt_type=receipt_type, user_id=user_id, event_ids=[event_id], + thread_id=thread_id, data={"ts": int(self.clock.time_msec())}, ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2f59245058e7..977538741392 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -408,7 +408,8 @@ async def _on_new_receipts( receipt.receipt_type, receipt.user_id, [receipt.event_id], - receipt.data, + thread_id=None, # TODO + data=receipt.data, ) await self.federation_sender.send_read_receipt(receipt_info) diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index 8896f2df50c3..d170cf1560d8 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -81,6 +81,7 @@ async def on_POST( receipt_type, user_id=requester.user.to_string(), event_id=event_id, + thread_id=None, # TODO ) return 200, {} diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 409bfd43c11c..3e4aea065a56 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from synapse.api.constants import ReceiptTypes from synapse.api.errors import SynapseError @@ -34,7 +34,8 @@ class ReceiptRestServlet(RestServlet): PATTERNS = client_patterns( "/rooms/(?P[^/]*)" "/receipt/(?P[^/]*)" - "/(?P[^/]*)$" + "/(?P[^/]*)" + "(/(?P[^/]*))?$" ) def __init__(self, hs: "HomeServer"): @@ -51,7 +52,12 @@ def __init__(self, hs: "HomeServer"): ) async def on_POST( - self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str + self, + request: SynapseRequest, + room_id: str, + receipt_type: str, + event_id: str, + thread_id: Optional[str], ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) @@ -77,6 +83,7 @@ async def on_POST( receipt_type, user_id=requester.user.to_string(), event_id=event_id, + thread_id=thread_id, ) return 200, {} diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0090c9f22512..757d3ceb4fa5 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -613,6 +613,7 @@ def _insert_linearized_receipt_txn( receipt_type: str, user_id: str, event_id: str, + thread_id: Optional[str], data: JsonDict, stream_id: int, ) -> Optional[int]: @@ -636,15 +637,18 @@ def _insert_linearized_receipt_txn( stream_ordering = int(res["stream_ordering"]) if res else None rx_ts = res["received_ts"] if res else 0 + # Convert None to a blank string. + thread_id = thread_id or "" + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts if stream_ordering is not None: sql = ( "SELECT stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized AS r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND r.thread_id = ?" ) - txn.execute(sql, (room_id, receipt_type, user_id)) + txn.execute(sql, (room_id, receipt_type, user_id, thread_id)) for so, eid in txn: if int(so) >= stream_ordering: @@ -656,6 +660,7 @@ def _insert_linearized_receipt_txn( ) return None + # TODO txn.call_after( self.invalidate_caches_for_receipt, room_id, receipt_type, user_id ) @@ -671,6 +676,7 @@ def _insert_linearized_receipt_txn( "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, + "thread_id": thread_id, }, values={ "stream_id": stream_id, @@ -678,7 +684,7 @@ def _insert_linearized_receipt_txn( "data": json_encoder.encode(data), }, # receipts_linearized has a unique constraint on - # (user_id, room_id, receipt_type), so no need to lock + # (user_id, room_id, receipt_type, key), so no need to lock lock=False, ) @@ -728,6 +734,7 @@ async def insert_receipt( receipt_type: str, user_id: str, event_ids: List[str], + thread_id: Optional[str], data: dict, ) -> Optional[Tuple[int, int]]: """Insert a receipt, either from local client or remote server. @@ -760,6 +767,7 @@ async def insert_receipt( receipt_type, user_id, linearized_event_id, + thread_id, data, stream_id=stream_id, # Read committed is actually beneficial here because we check for a receipt with @@ -774,7 +782,8 @@ async def insert_receipt( now = self._clock.time_msec() logger.debug( - "RR for event %s in %s (%i ms old)", + "Receipt %s for event %s in %s (%i ms old)", + receipt_type, linearized_event_id, room_id, now - event_ts, @@ -787,6 +796,7 @@ async def insert_receipt( receipt_type, user_id, event_ids, + thread_id, data, ) @@ -801,6 +811,7 @@ def _insert_graph_receipt_txn( receipt_type: str, user_id: str, event_ids: List[str], + thread_id: Optional[str], data: JsonDict, ) -> None: assert self._can_write_to_receipts @@ -812,6 +823,9 @@ def _insert_graph_receipt_txn( # FIXME: This shouldn't invalidate the whole cache txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) + # Convert None to a blank string. + thread_id = thread_id or "" + self.db_pool.simple_delete_txn( txn, table="receipts_graph", @@ -819,6 +833,7 @@ def _insert_graph_receipt_txn( "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, + "thread_id": thread_id, }, ) self.db_pool.simple_insert_txn( @@ -829,6 +844,7 @@ def _insert_graph_receipt_txn( "receipt_type": receipt_type, "user_id": user_id, "event_ids": json_encoder.encode(event_ids), + "thread_id": thread_id, "data": json_encoder.encode(data), }, ) diff --git a/synapse/storage/schema/main/delta/72/03_add_receipts_key.sql.postgres b/synapse/storage/schema/main/delta/72/03_add_receipts_key.sql.postgres new file mode 100644 index 000000000000..e26241fa89b6 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/03_add_receipts_key.sql.postgres @@ -0,0 +1,25 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Allow multiple receipts per user per room via a nullable thread_id column. +ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT NOT NULL DEFAULT ''; +ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT NOT NULL DEFAULT ''; + +-- Rebuild the unique constraint with the thread_id. +ALTER TABLE receipts_linearized DROP CONSTRAINT IF EXISTS receipts_linearized_uniqueness; +ALTER TABLE receipts_linearized ADD CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id); + +ALTER TABLE receipts_graph DROP CONSTRAINT IF EXISTS receipts_graph_uniqueness; +ALTER TABLE receipts_graph ADD CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id); diff --git a/synapse/storage/schema/main/delta/72/03_add_receipts_key.sql.sqlite b/synapse/storage/schema/main/delta/72/03_add_receipts_key.sql.sqlite new file mode 100644 index 000000000000..914cff50b0a9 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/03_add_receipts_key.sql.sqlite @@ -0,0 +1,67 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Allow multiple receipts per user per room via a nullable thread_id column. +-- +-- SQLite doesn't support modifying constraints to an existing table, so it must +-- be recreated. + +-- Create the new tables. +CREATE TABLE receipts_graph_new ( + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_ids TEXT NOT NULL, + thread_id TEXT NOT NULL DEFAULT '', + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id) +); + +CREATE TABLE receipts_linearized_new ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + thread_id TEXT NOT NULL DEFAULT '', + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id) +); + +-- Drop the old indexes. +DROP INDEX IF EXISTS receipts_linearized_id; +DROP INDEX IF EXISTS receipts_linearized_room_stream; +DROP INDEX IF EXISTS receipts_linearized_user; + +-- Copy the data. +INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data) + SELECT room_id, receipt_type, user_id, event_ids, data + FROM receipts_graph; +INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data) + SELECT stream_id, room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized; + +-- Drop the old tables. +DROP TABLE receipts_graph; +DROP TABLE receipts_linearized; + +-- Rename the tables. +ALTER TABLE receipts_graph_new RENAME TO receipts_graph; +ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized; + +-- Create the indices. +CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id ); +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); +CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); diff --git a/synapse/types.py b/synapse/types.py index 668d48d646ae..7ad483c17aa9 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -830,6 +830,7 @@ class ReadReceipt: receipt_type: str user_id: str event_ids: List[str] + thread_id: Optional[str] data: JsonDict diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 01a1db611538..820f6c904da6 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -49,7 +49,12 @@ def test_send_receipts(self): sender = self.hs.get_federation_sender() receipt = ReadReceipt( - "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234} + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, ) self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) @@ -89,7 +94,12 @@ def test_send_receipts_with_backoff(self): sender = self.hs.get_federation_sender() receipt = ReadReceipt( - "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234} + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, ) self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) @@ -121,7 +131,12 @@ def test_send_receipts_with_backoff(self): # send the second RR receipt = ReadReceipt( - "room_id", "m.read", "user_id", ["other_id"], {"ts": 1234} + "room_id", + "m.read", + "user_id", + ["other_id"], + thread_id=None, + data={"ts": 1234}, ) self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) self.pump() diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index d96d5aa1385e..fae35896c9ee 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -447,6 +447,7 @@ def test_sending_read_receipt_batches_to_application_services(self): receipt_type="m.read", user_id=self.local_user, event_ids=[f"$eventid_{i}"], + thread_id=None, data={}, ) ) diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index eb0011784518..ff4ff4d5f080 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -33,7 +33,12 @@ def test_receipt(self): # tell the master to send a new receipt self.get_success( self.hs.get_datastores().main.insert_receipt( - "!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1} + "!room:blue", + "m.read", + USER_ID, + ["$event:blue"], + thread_id=None, + data={"a": 1}, ) ) self.replicate() @@ -57,7 +62,12 @@ def test_receipt(self): self.get_success( self.hs.get_datastores().main.insert_receipt( - "!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2} + "!room2:blue", + "m.read", + USER_ID, + ["$event2:foo"], + thread_id=None, + data={"a": 2}, ) ) self.replicate() diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index e8c53f16d9c5..6c34a561623e 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -141,6 +141,7 @@ def _mark_read(stream: int, depth: int) -> None: "m.read", user_id=user_id, event_ids=[f"$test{stream}:example.com"], + thread_id=None, data={}, ) )