Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include thread information when sending receipts over federation. #14466

Merged
merged 13 commits into from
Nov 28, 2022
90 changes: 53 additions & 37 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ async def _catch_up_transmission_loop(self) -> None:
self._destination, last_successful_stream_ordering
)

def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
if not self._pending_rrs:
return
if not force_flush and not self._rrs_pending_flush:
Expand All @@ -564,7 +564,8 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
#
# This could be more efficient by bundling users who have sent receipts
# for different threads.
while self._pending_rrs:
generated_edus = 0
while self._pending_rrs and generated_edus < limit:
# The next EDU's content.
content: JsonDict = {}

Expand Down Expand Up @@ -592,6 +593,7 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
edu_type=EduTypes.RECEIPT,
content=content,
)
generated_edus += 1

self._pending_rrs = {}
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._rrs_pending_flush = False
Expand Down Expand Up @@ -681,68 +683,80 @@ class _TransactionQueueManager:
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# First we calculate the EDUs we want to send, if any.

# We start by fetching device related EDUs, i.e device updates and to
# device messages. We have to keep 2 free slots for presence and rr_edus.
device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
# There's a maximum number of EDUs that can be sent with a transaction,
# generally device udates and to-device messages get priority, but we
clokep marked this conversation as resolved.
Show resolved Hide resolved
# want to ensure that there's room for some other EDUs as well.
#
# This is done by:
#
# * Add a presence EDU, if one exists.
# * Add up-to a small limit of read receipt EDUs.
# * Add to-device EDUs, but leave some space for device list updates.
# * Add device list updates EDUs.
# * If there's any remaining room, add other EDUs.
pending_edus = []

# Add presence EDU.
if self.queue._pending_presence:
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
)
)
self.queue._pending_presence = {}

# We prioritize to-device messages so that existing encryption channels
# Add read receipt EDUs.
pending_edus.extend(self.queue._get_rr_edus(force_flush=False, limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)

# Next, prioritize to-device messages so that existing encryption channels
# work. We also keep a few slots spare (by reducing the limit) so that
# we can still trickle out some device list updates.
(
to_device_edus,
device_stream_id,
) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
) = await self.queue._get_to_device_message_edus(edu_limit - 10)

if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id

device_edu_limit -= len(to_device_edus)
pending_edus.extend(to_device_edus)
edu_limit -= len(to_device_edus)

# Add device list update EDUs.
device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
device_edu_limit
edu_limit
)

if device_update_edus:
self._device_list_id = dev_list_id
else:
self.queue._last_device_list_stream_id = dev_list_id

pending_edus = device_update_edus + to_device_edus

# Now add the read receipt EDU.
pending_edus.extend(self.queue._get_rr_edus(force_flush=False))

# And presence EDU.
if self.queue._pending_presence:
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
)
)
self.queue._pending_presence = {}
pending_edus.extend(device_update_edus)
edu_limit -= len(device_update_edus)

# Finally add any other types of EDUs if there is room.
pending_edus.extend(
self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
pending_edus.extend(self.queue._pop_pending_edus(edu_limit))
clokep marked this conversation as resolved.
Show resolved Hide resolved
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
clokep marked this conversation as resolved.
Show resolved Hide resolved
and self.queue._pending_edus_keyed
):
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)
edu_limit -= 1

# Now we look for any PDUs to send, by getting up to 50 PDUs from the
# queue
Expand All @@ -753,8 +767,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
if edu_limit:
pending_edus.extend(
self.queue._get_rr_edus(force_flush=True, limit=edu_limit)
)

if self._pdus:
self._last_stream_ordering = self._pdus[
Expand Down