Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add a thread ID to receipts.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Jul 11, 2022
1 parent 5bf4b4a commit 2b922b0
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/13202.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
10 changes: 9 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None
receipt_type=receipt_type,
user_id=user_id,
event_ids=user_values["event_ids"],
thread_id=None, # TODO
data=user_values.get("data", {}),
)
)
Expand All @@ -114,6 +115,7 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.thread_id,
receipt.data,
)

Expand Down Expand Up @@ -146,7 +148,12 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
return True

async def received_client_receipt(
self, room_id: str, receipt_type: str, user_id: str, event_id: str
self,
room_id: str,
receipt_type: str,
user_id: str,
event_id: str,
thread_id: Optional[str],
) -> None:
"""Called when a client tells us a local user has read up to the given
event_id in the room.
Expand All @@ -156,6 +163,7 @@ async def received_client_receipt(
receipt_type=receipt_type,
user_id=user_id,
event_ids=[event_id],
thread_id=thread_id,
data={"ts": int(self.clock.time_msec())},
)

Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ async def _on_new_receipts(
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
receipt.data,
thread_id=None, # TODO
data=receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)

Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async def on_POST(
receipt_type,
user_id=requester.user.to_string(),
event_id=event_id,
thread_id=None, # TODO
)

return 200, {}
Expand Down
13 changes: 10 additions & 3 deletions synapse/rest/client/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from synapse.api.constants import ReceiptTypes
from synapse.api.errors import SynapseError
Expand All @@ -34,7 +34,8 @@ class ReceiptRestServlet(RestServlet):
PATTERNS = client_patterns(
"/rooms/(?P<room_id>[^/]*)"
"/receipt/(?P<receipt_type>[^/]*)"
"/(?P<event_id>[^/]*)$"
"/(?P<event_id>[^/]*)"
"(/(?P<thread_id>[^/]*))?$"
)

def __init__(self, hs: "HomeServer"):
Expand All @@ -51,7 +52,12 @@ def __init__(self, hs: "HomeServer"):
)

async def on_POST(
self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str
self,
request: SynapseRequest,
room_id: str,
receipt_type: str,
event_id: str,
thread_id: Optional[str],
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)

Expand All @@ -77,6 +83,7 @@ async def on_POST(
receipt_type,
user_id=requester.user.to_string(),
event_id=event_id,
thread_id=thread_id,
)

return 200, {}
Expand Down
26 changes: 21 additions & 5 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ def _insert_linearized_receipt_txn(
receipt_type: str,
user_id: str,
event_id: str,
thread_id: Optional[str],
data: JsonDict,
stream_id: int,
) -> Optional[int]:
Expand All @@ -636,15 +637,18 @@ def _insert_linearized_receipt_txn(
stream_ordering = int(res["stream_ordering"]) if res else None
rx_ts = res["received_ts"] if res else 0

# Convert None to a blank string.
thread_id = thread_id or ""

# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND r.thread_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
txn.execute(sql, (room_id, receipt_type, user_id, thread_id))

for so, eid in txn:
if int(so) >= stream_ordering:
Expand All @@ -656,6 +660,7 @@ def _insert_linearized_receipt_txn(
)
return None

# TODO
txn.call_after(
self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
)
Expand All @@ -671,14 +676,15 @@ def _insert_linearized_receipt_txn(
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"thread_id": thread_id,
},
values={
"stream_id": stream_id,
"event_id": event_id,
"data": json_encoder.encode(data),
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
# (user_id, room_id, receipt_type, key), so no need to lock
lock=False,
)

Expand Down Expand Up @@ -728,6 +734,7 @@ async def insert_receipt(
receipt_type: str,
user_id: str,
event_ids: List[str],
thread_id: Optional[str],
data: dict,
) -> Optional[Tuple[int, int]]:
"""Insert a receipt, either from local client or remote server.
Expand Down Expand Up @@ -760,6 +767,7 @@ async def insert_receipt(
receipt_type,
user_id,
linearized_event_id,
thread_id,
data,
stream_id=stream_id,
# Read committed is actually beneficial here because we check for a receipt with
Expand All @@ -774,7 +782,8 @@ async def insert_receipt(

now = self._clock.time_msec()
logger.debug(
"RR for event %s in %s (%i ms old)",
"Receipt %s for event %s in %s (%i ms old)",
receipt_type,
linearized_event_id,
room_id,
now - event_ts,
Expand All @@ -787,6 +796,7 @@ async def insert_receipt(
receipt_type,
user_id,
event_ids,
thread_id,
data,
)

Expand All @@ -801,6 +811,7 @@ def _insert_graph_receipt_txn(
receipt_type: str,
user_id: str,
event_ids: List[str],
thread_id: Optional[str],
data: JsonDict,
) -> None:
assert self._can_write_to_receipts
Expand All @@ -812,13 +823,17 @@ def _insert_graph_receipt_txn(
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

# Convert None to a blank string.
thread_id = thread_id or ""

self.db_pool.simple_delete_txn(
txn,
table="receipts_graph",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"thread_id": thread_id,
},
)
self.db_pool.simple_insert_txn(
Expand All @@ -829,6 +844,7 @@ def _insert_graph_receipt_txn(
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": json_encoder.encode(event_ids),
"thread_id": thread_id,
"data": json_encoder.encode(data),
},
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Allow multiple receipts per user per room via a nullable thread_id column.
ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT NOT NULL DEFAULT '';
ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT NOT NULL DEFAULT '';

-- Rebuild the unique constraint with the thread_id.
ALTER TABLE receipts_linearized DROP CONSTRAINT IF EXISTS receipts_linearized_uniqueness;
ALTER TABLE receipts_linearized ADD CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id);

ALTER TABLE receipts_graph DROP CONSTRAINT IF EXISTS receipts_graph_uniqueness;
ALTER TABLE receipts_graph ADD CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Allow multiple receipts per user per room via a nullable thread_id column.
--
-- SQLite doesn't support modifying constraints to an existing table, so it must
-- be recreated.

-- Create the new tables.
CREATE TABLE receipts_graph_new (
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_ids TEXT NOT NULL,
thread_id TEXT NOT NULL DEFAULT '',
data TEXT NOT NULL,
CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id)
);

CREATE TABLE receipts_linearized_new (
stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
thread_id TEXT NOT NULL DEFAULT '',
data TEXT NOT NULL,
CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id)
);

-- Drop the old indexes.
DROP INDEX IF EXISTS receipts_linearized_id;
DROP INDEX IF EXISTS receipts_linearized_room_stream;
DROP INDEX IF EXISTS receipts_linearized_user;

-- Copy the data.
INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
SELECT room_id, receipt_type, user_id, event_ids, data
FROM receipts_graph;
INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data)
SELECT stream_id, room_id, receipt_type, user_id, event_id, data
FROM receipts_linearized;

-- Drop the old tables.
DROP TABLE receipts_graph;
DROP TABLE receipts_linearized;

-- Rename the tables.
ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;

-- Create the indices.
CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
1 change: 1 addition & 0 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,7 @@ class ReadReceipt:
receipt_type: str
user_id: str
event_ids: List[str]
thread_id: Optional[str]
data: JsonDict


Expand Down
21 changes: 18 additions & 3 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ def test_send_receipts(self):

sender = self.hs.get_federation_sender()
receipt = ReadReceipt(
"room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
"room_id",
"m.read",
"user_id",
["event_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))

Expand Down Expand Up @@ -89,7 +94,12 @@ def test_send_receipts_with_backoff(self):

sender = self.hs.get_federation_sender()
receipt = ReadReceipt(
"room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
"room_id",
"m.read",
"user_id",
["event_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))

Expand Down Expand Up @@ -121,7 +131,12 @@ def test_send_receipts_with_backoff(self):

# send the second RR
receipt = ReadReceipt(
"room_id", "m.read", "user_id", ["other_id"], {"ts": 1234}
"room_id",
"m.read",
"user_id",
["other_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
self.pump()
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ def test_sending_read_receipt_batches_to_application_services(self):
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
thread_id=None,
data={},
)
)
Expand Down
Loading

0 comments on commit 2b922b0

Please sign in to comment.