
Fix race in federation sender that delayed device updates. (#6799)
We were sending device updates down both the federation stream and the
device streams. This meant there was a race: if the federation sender
worker processed the federation stream first, then when it checked for
new device updates the slaved ID generator had not yet been updated with
the new stream IDs, so the check returned nothing.
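
To make the ordering concrete, here is a minimal toy sketch of the race (Python, with entirely made-up names; this is not Synapse code):

```python
# Toy illustration of the race: the master announces the same logical update
# on two streams.  If the federation-stream poke is handled before the device
# stream has advanced the worker's slaved token, the check finds nothing.

pending_updates = {}      # stream_id -> destination, as written on the master
worker_device_token = 0   # the worker's slaved ID generator position


def master_writes_device_update(stream_id, destination):
    pending_updates[stream_id] = destination


def worker_handles_device_stream(stream_id):
    """Processing the device stream is what advances the slaved token."""
    global worker_device_token
    worker_device_token = stream_id


def worker_checks_for_updates(destination, last_sent):
    """What the federation-stream poke triggers: look for updates up to the token."""
    return [
        dest
        for stream_id, dest in pending_updates.items()
        if last_sent < stream_id <= worker_device_token and dest == destination
    ]


master_writes_device_update(1, "remote.example.com")

# Racy ordering: the federation-stream poke is processed first and finds
# nothing, so the update sits unsent until something else pokes the queue.
print(worker_checks_for_updates("remote.example.com", last_sent=0))  # []

worker_handles_device_stream(1)
print(worker_checks_for_updates("remote.example.com", last_sent=0))  # ['remote.example.com']
```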

Events, receipts, etc. already handle this correctly: their updates are
not sent down the federation stream at all; instead, the federation
sender worker listens on the relevant streams and pokes the transaction
queues as appropriate.
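
The diff below applies that pattern to device updates. As a toy continuation of the sketch above (again with made-up names, not Synapse code), the key difference is that the poke now comes from the device stream itself, so the slaved token has already advanced past the triggering row by the time the worker looks for pending updates:

```python
# Toy continuation of the race sketch above: with the fix, the device-stream
# row both advances the slaved token and pokes the sender, so the subsequent
# check for pending updates can no longer run against a stale token.

pending_updates = {}      # stream_id -> destination
worker_device_token = 0   # the worker's slaved ID generator position


def master_writes_device_update(stream_id, destination):
    pending_updates[stream_id] = destination


def worker_handles_device_stream(stream_id, destination, last_sent):
    """Advance the token first, then look for updates to send to the host."""
    global worker_device_token
    worker_device_token = stream_id
    return [
        dest
        for sid, dest in pending_updates.items()
        if last_sent < sid <= worker_device_token and dest == destination
    ]


master_writes_device_update(1, "remote.example.com")
print(worker_handles_device_stream(1, "remote.example.com", last_sent=0))
# ['remote.example.com'] -- the update cannot be missed
```
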
erikjohnston authored Jan 29, 2020
1 parent 611215a commit 6b9e101
Showing 3 changed files with 23 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/6799.bugfix
@@ -0,0 +1 @@
Fix race in federation sender worker that delayed sending of device updates.
20 changes: 19 additions & 1 deletion synapse/app/federation_sender.py
@@ -38,7 +38,11 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.replication.tcp.streams._base import (
    DeviceListsStream,
    ReceiptsStream,
    ToDeviceStream,
)
from synapse.server import HomeServer
from synapse.storage.database import Database
from synapse.types import ReadReceipt
@@ -256,6 +260,20 @@ def process_replication_rows(self, stream_name, token, rows):
"process_receipts_for_federation", self._on_new_receipts, rows
)

        # ... as well as device updates and messages
        elif stream_name == DeviceListsStream.NAME:
            hosts = set(row.destination for row in rows)
            for host in hosts:
                self.federation_sender.send_device_messages(host)

        elif stream_name == ToDeviceStream.NAME:
            # The to_device stream includes stuff to be pushed to both local
            # clients and remote servers, so we ignore entities that start with
            # '@' (since they'll be local users rather than destinations).
            hosts = set(row.entity for row in rows if not row.entity.startswith("@"))
            for host in hosts:
                self.federation_sender.send_device_messages(host)

    @defer.inlineCallbacks
    def _on_new_receipts(self, rows):
        """
32 changes: 3 additions & 29 deletions synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ def __init__(self, hs):

        self.edus = SortedDict()  # stream position -> Edu

        self.device_messages = SortedDict()  # stream position -> destination

        self.pos = 1
        self.pos_time = SortedDict()

@@ -92,7 +90,6 @@ def register(name, queue):
"keyed_edu",
"keyed_edu_changed",
"edus",
"device_messages",
"pos_time",
"presence_destinations",
]:
@@ -171,12 +168,6 @@ def _clear_queue_before_pos(self, position_to_delete):
            for key in keys[:i]:
                del self.edus[key]

            # Delete things out of device map
            keys = self.device_messages.keys()
            i = self.device_messages.bisect_left(position_to_delete)
            for key in keys[:i]:
                del self.device_messages[key]

    def notify_new_events(self, current_id):
        """As per FederationSender"""
        # We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ def send_presence_to_destinations(self, states, destinations):

def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
# We don't need to replicate this as it gets sent down a different
# stream.

def get_current_token(self):
return self.pos - 1
@@ -339,14 +329,6 @@ async def get_replication_rows(
        for (pos, edu) in edus:
            rows.append((pos, EduRow(edu)))

        # Fetch changed device messages
        i = self.device_messages.bisect_right(from_token)
        j = self.device_messages.bisect_right(to_token) + 1
        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}

        for (destination, pos) in iteritems(device_messages):
            rows.append((pos, DeviceRow(destination=destination)))

        # Sort rows based on pos
        rows.sort()

@@ -504,7 +486,6 @@ def add_to_buffer(self, buff):
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
),
)

@@ -523,11 +504,7 @@ def process_rows_for_federation(transaction_queue, rows):
    # them into the appropriate collection and then send them off.

    buff = ParsedFederationStreamData(
        presence=[],
        presence_destinations=[],
        keyed_edus={},
        edus={},
        device_destinations=set(),
        presence=[], presence_destinations=[], keyed_edus={}, edus={},
    )

    # Parse the rows in the stream and add to the buffer
@@ -555,6 +532,3 @@ def process_rows_for_federation(transaction_queue, rows):
    for destination, edu_list in iteritems(buff.edus):
        for edu in edu_list:
            transaction_queue.send_edu(edu, None)

    for destination in buff.device_destinations:
        transaction_queue.send_device_messages(destination)
