diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 1b2412405ca5..93d53d68db0e 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -423,7 +423,7 @@ async def _on_new_receipts( receipt.receipt_type, receipt.user_id, [receipt.event_id], - thread_id=None, # TODO + thread_id=receipt.thread_id, data=receipt.data, ) await self.federation_sender.send_read_receipt(receipt_info) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 398bebeaa659..e01155ad597b 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -361,6 +361,7 @@ class ReceiptsStreamRow: receipt_type: str user_id: str event_id: str + thread_id: Optional[str] data: dict NAME = "receipts" diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index def2e2d21c40..683ee5e17373 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -526,7 +526,7 @@ def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]: async def get_all_updated_receipts( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool]: """Get updates for receipts replication stream. Args: @@ -553,9 +553,11 @@ async def get_all_updated_receipts( def get_all_updated_receipts_txn( txn: LoggingTransaction, - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[ + List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool + ]: sql = """ - SELECT stream_id, room_id, receipt_type, user_id, event_id, data + SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data FROM receipts_linearized WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC @@ -564,8 +566,8 @@ def get_all_updated_receipts_txn( txn.execute(sql, (last_id, current_id, limit)) updates = cast( - List[Tuple[int, list]], - [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn], + List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], + [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn], ) limited = False