Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move client command handling out of TCP protocol #7185

Merged
merged 21 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
UserSyncCommand,
)
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
from synapse.util.async_helpers import Linearizer

logger = logging.getLogger(__name__)

Expand All @@ -60,6 +61,8 @@ def __init__(self, hs):
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
} # type: Dict[str, Stream]

self._position_linearizer = Linearizer("replication_position")

# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {} # type: Dict[str, List[Any]]
Expand Down Expand Up @@ -122,31 +125,35 @@ async def on_POSITION(self, cmd: PositionCommand):
# to stop RDATA being handled at the same time.
self.streams_connecting.add(cmd.stream_name)

# Find where we previously streamed up to.
current_token = self.replication_data_handler.get_streams_to_replicate().get(
cmd.stream_name
)
if current_token is None:
logger.warning(
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
)
return

# Fetch all updates between then and now.
limited = True
while limited:
updates, current_token, limited = await stream.get_updates_since(
current_token, cmd.token
# We protect catching up with a linearizer in case the replicaiton
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replicaiton

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replicaiton

# connection reconnects under us.
with await self._position_linearizer.queue(cmd.stream_name):
# Find where we previously streamed up to.
current_token = self.replication_data_handler.get_streams_to_replicate().get(
cmd.stream_name
)
if updates:
await self.on_rdata(
if current_token is None:
logger.warning(
"Got POSITION for stream we're not subscribed to: %s",
cmd.stream_name,
current_token,
[stream.parse_row(update[1]) for update in updates],
)
return

# We've now caught up to position sent to us, notify handler.
await self.replication_data_handler.on_position(cmd.stream_name, cmd.token)
# Fetch all updates between then and now.
limited = True
while limited:
updates, current_token, limited = await stream.get_updates_since(
current_token, cmd.token
)
if updates:
await self.on_rdata(
cmd.stream_name,
current_token,
[stream.parse_row(update[1]) for update in updates],
)

# We've now caught up to position sent to us, notify handler.
await self.replication_data_handler.on_position(cmd.stream_name, cmd.token)

self.streams_connecting.discard(cmd.stream_name)

Expand Down
6 changes: 4 additions & 2 deletions tests/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

from mock import Mock, NonCallableMock

from synapse.replication.tcp.client import ReplicationClientFactory
from synapse.replication.tcp.client import (
ReplicationClientFactory,
ReplicationDataHandler,
)
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.storage.database import make_conn

Expand Down