From 82022709881e16c3c64cb12e00d7bfc8e44d9b30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Jan 2023 12:02:07 +0000 Subject: [PATCH] Send out POSITION commands for all streams --- synapse/replication/tcp/resource.py | 43 +++++++++++++---------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 99f09669f00b..9d17eff71451 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -199,33 +199,28 @@ async def _run_notifier_loop(self) -> None: # The token has advanced but there is no data to # send, so we send a `POSITION` to inform other # workers of the updated position. - if stream.NAME == EventsStream.NAME: - # XXX: We only do this for the EventStream as it - # turns out that e.g. account data streams share - # their "current token" with each other, meaning - # that it is *not* safe to send a POSITION. - - # Note: `last_token` may not *actually* be the - # last token we sent out in a RDATA or POSITION. - # This can happen if we sent out an RDATA for - # position X when our current token was say X+1. - # Other workers will see RDATA for X and then a - # POSITION with last token of X+1, which will - # cause them to check if there were any missing - # updates between X and X+1. - logger.info( - "Sending position: %s -> %s", + + # Note: `last_token` may not *actually* be the + # last token we sent out in a RDATA or POSITION. + # This can happen if we sent out an RDATA for + # position X when our current token was say X+1. + # Other workers will see RDATA for X and then a + # POSITION with last token of X+1, which will + # cause them to check if there were any missing + # updates between X and X+1. + logger.info( + "Sending position: %s -> %s", + stream.NAME, + current_token, + ) + self.command_handler.send_command( + PositionCommand( stream.NAME, + self._instance_name, + last_token, current_token, ) - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - last_token, - current_token, - ) - ) + ) continue # Some streams return multiple rows with the same stream IDs,