Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Send the thread ID over replication.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Aug 3, 2022
1 parent d42e5a1 commit fa237ad
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ async def _on_new_receipts(
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
thread_id=None, # TODO
thread_id=receipt.thread_id,
data=receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)
Expand Down
1 change: 1 addition & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ class ReceiptsStreamRow:
receipt_type: str
user_id: str
event_id: str
thread_id: Optional[str]
data: dict

NAME = "receipts"
Expand Down
12 changes: 7 additions & 5 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]:

async def get_all_updated_receipts(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool]:
"""Get updates for receipts replication stream.
Args:
Expand All @@ -553,9 +553,11 @@ async def get_all_updated_receipts(

def get_all_updated_receipts_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, list]], int, bool]:
) -> Tuple[
List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool
]:
sql = """
SELECT stream_id, room_id, receipt_type, user_id, event_id, data
SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
Expand All @@ -564,8 +566,8 @@ def get_all_updated_receipts_txn(
txn.execute(sql, (last_id, current_id, limit))

updates = cast(
List[Tuple[int, list]],
[(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn],
List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]],
[(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
)

limited = False
Expand Down

0 comments on commit fa237ad

Please sign in to comment.