Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Wait for streams to catch up when processing HTTP replication. #14820

Merged
merged 10 commits into from
Jan 18, 2023
43 changes: 19 additions & 24 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,33 +199,28 @@ async def _run_notifier_loop(self) -> None:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
if stream.NAME == EventsStream.NAME:
# XXX: We only do this for the EventStream as it
# turns out that e.g. account data streams share
# their "current token" with each other, meaning
# that it is *not* safe to send a POSITION.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
# This can happen if we sent out an RDATA for
# position X when our current token was say X+1.
# Other workers will see RDATA for X and then a
# POSITION with last token of X+1, which will
# cause them to check if there were any missing
# updates between X and X+1.
logger.info(
"Sending position: %s -> %s",

# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
# This can happen if we sent out an RDATA for
# position X when our current token was say X+1.
# Other workers will see RDATA for X and then a
# POSITION with last token of X+1, which will
# cause them to check if there were any missing
# updates between X and X+1.
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
)
)
continue

# Some streams return multiple rows with the same stream IDs,
Expand Down