This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move some replication processing out of generic_worker #9796

Merged: 8 commits, Apr 14, 2021
70 changes: 2 additions & 68 deletions synapse/app/generic_worker.py
@@ -17,7 +17,7 @@
 import contextlib
 import logging
 import sys
-from typing import Dict, Iterable, Optional, Set
+from typing import Dict, Iterable, Optional

 from typing_extensions import ContextManager

@@ -81,14 +81,9 @@
 from synapse.replication.tcp.client import ReplicationDataHandler
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
 from synapse.replication.tcp.streams import (
-    AccountDataStream,
     DeviceListsStream,
-    GroupServerStream,
     PresenceStream,
-    PushersStream,
-    PushRulesStream,
     ReceiptsStream,
-    TagAccountDataStream,
     ToDeviceStream,
 )
 from synapse.rest.admin import register_servlets_for_media_repo
@@ -693,72 +688,11 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
                     stream_name, token, rows
                 )

-            if stream_name == PushRulesStream.NAME:
-                self.notifier.on_new_event(
-                    "push_rules_key", token, users=[row.user_id for row in rows]
-                )
-            elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
-                self.notifier.on_new_event(
-                    "account_data_key", token, users=[row.user_id for row in rows]
-                )
-            elif stream_name == ReceiptsStream.NAME:
-                self.notifier.on_new_event(
-                    "receipt_key", token, rooms=[row.room_id for row in rows]
-                )
-                await self.pusher_pool.on_new_receipts(
-                    token, token, {row.room_id for row in rows}
-                )
-            elif stream_name == ToDeviceStream.NAME:
-                entities = [row.entity for row in rows if row.entity.startswith("@")]
-                if entities:
-                    self.notifier.on_new_event("to_device_key", token, users=entities)
-            elif stream_name == DeviceListsStream.NAME:
-                all_room_ids = set()  # type: Set[str]
-                for row in rows:
-                    if row.entity.startswith("@"):
-                        room_ids = await self.store.get_rooms_for_user(row.entity)
-                        all_room_ids.update(room_ids)
-                self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
-            elif stream_name == PresenceStream.NAME:
+            if stream_name == PresenceStream.NAME:
                 await self.presence_handler.process_replication_rows(token, rows)
-            elif stream_name == GroupServerStream.NAME:
-                self.notifier.on_new_event(
-                    "groups_key", token, users=[row.user_id for row in rows]
-                )
-            elif stream_name == PushersStream.NAME:
-                for row in rows:
-                    if row.deleted:
-                        self.stop_pusher(row.user_id, row.app_id, row.pushkey)
-                    else:
-                        await self.start_pusher(row.user_id, row.app_id, row.pushkey)
         except Exception:
             logger.exception("Error processing replication")

-    async def on_position(self, stream_name: str, instance_name: str, token: int):
-        await super().on_position(stream_name, instance_name, token)
-        # Also call on_rdata to ensure that stream positions are properly reset.
-        await self.on_rdata(stream_name, instance_name, token, [])
-
-    def stop_pusher(self, user_id, app_id, pushkey):
-        if not self.notify_pushers:
-            return
-
-        key = "%s:%s" % (app_id, pushkey)
-        pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
-        pusher = pushers_for_user.pop(key, None)
-        if pusher is None:
-            return
-        logger.info("Stopping pusher %r / %r", user_id, key)
-        pusher.on_stop()
-
-    async def start_pusher(self, user_id, app_id, pushkey):
-        if not self.notify_pushers:
-            return
-
-        key = "%s:%s" % (app_id, pushkey)
-        logger.info("Starting pusher %r / %r", user_id, key)
-        return await self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
     def on_remote_server_up(self, server: str):
         """Called when get a new REMOTE_SERVER_UP command."""

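With those deletions, presence is the only stream this file still handles directly. Reading the surviving context lines, `_process_and_notify` presumably reduces to something like the sketch below (reconstructed for orientation; the `send_handler` guard is assumed from the unchanged code above the hunk, and this is not a verbatim copy of the final file):

```python
async def _process_and_notify(self, stream_name, instance_name, token, rows):
    try:
        if self.send_handler:
            # Federation-sender processing stays in this worker.
            await self.send_handler.process_replication_rows(
                stream_name, token, rows
            )

        # Presence is the only stream still handled here; the rest has
        # moved to ReplicationDataHandler in replication/tcp/client.py.
        if stream_name == PresenceStream.NAME:
            await self.presence_handler.process_replication_rows(token, rows)
    except Exception:
        logger.exception("Error processing replication")
```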
75 changes: 73 additions & 2 deletions synapse/replication/tcp/client.py
@@ -15,15 +15,25 @@
 """A replication client for use by synapse workers.
 """
 import logging
-from typing import TYPE_CHECKING, Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, List, Set, Tuple

 from twisted.internet.defer import Deferred
 from twisted.internet.protocol import ReconnectingClientFactory

 from synapse.api.constants import EventTypes
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
-from synapse.replication.tcp.streams import TypingStream
+from synapse.replication.tcp.streams import (
+    AccountDataStream,
+    DeviceListsStream,
+    GroupServerStream,
+    PushersStream,
+    PushRulesStream,
+    ReceiptsStream,
+    TagAccountDataStream,
+    ToDeviceStream,
+    TypingStream,
+)
 from synapse.replication.tcp.streams.events import (
     EventsStream,
     EventsStreamEventRow,
@@ -106,6 +116,9 @@ def __init__(self, hs: "HomeServer"):
         self._instance_name = hs.get_instance_name()
         self._typing_handler = hs.get_typing_handler()

+        self._notify_pushers = hs.config.start_pushers
+        self._pusher_pool = hs.get_pusherpool()
+
         # Map from stream to list of deferreds waiting for the stream to
         # arrive at a particular position. The lists are sorted by stream position.
         self._streams_to_waiters = {}  # type: Dict[str, List[Tuple[int, Deferred]]]
@@ -131,6 +144,42 @@ async def on_rdata(
             self.notifier.on_new_event(
                 "typing_key", token, rooms=[row.room_id for row in rows]
             )
+        elif stream_name == PushRulesStream.NAME:
+            self.notifier.on_new_event(
+                "push_rules_key", token, users=[row.user_id for row in rows]
+            )
+        elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
+            self.notifier.on_new_event(
+                "account_data_key", token, users=[row.user_id for row in rows]
+            )
+        elif stream_name == ReceiptsStream.NAME:
+            self.notifier.on_new_event(
+                "receipt_key", token, rooms=[row.room_id for row in rows]
+            )
+            await self._pusher_pool.on_new_receipts(
+                token, token, {row.room_id for row in rows}
+            )
+        elif stream_name == ToDeviceStream.NAME:
+            entities = [row.entity for row in rows if row.entity.startswith("@")]
+            if entities:
+                self.notifier.on_new_event("to_device_key", token, users=entities)
+        elif stream_name == DeviceListsStream.NAME:
+            all_room_ids = set()  # type: Set[str]
+            for row in rows:
+                if row.entity.startswith("@"):
+                    room_ids = await self.store.get_rooms_for_user(row.entity)
+                    all_room_ids.update(room_ids)
+            self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
+        elif stream_name == GroupServerStream.NAME:
+            self.notifier.on_new_event(
+                "groups_key", token, users=[row.user_id for row in rows]
+            )
+        elif stream_name == PushersStream.NAME:
+            for row in rows:
+                if row.deleted:
+                    self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                else:
+                    await self.start_pusher(row.user_id, row.app_id, row.pushkey)
Comment on lines +159 to +194

richvdh (Member):

This means that all this stuff is going to happen on the master, where it previously did not (IIRC each replication client receives its own replication data back). Is that not going to be a problem for any of these?

erikjohnston (Member Author):

> iirc each replication client receives its own replication data back

We filter out the echoes before we get here, as RDATA includes the sending instance name, so that shouldn't be a problem. Really, I think it's a bug that this hasn't been happening on master: e.g. if we split out receipts etc. but keep /sync on master, then /sync requests won't get told about new receipts.

richvdh (Member):

> We filter out the echoes before we get here

I went looking for that code, but couldn't find it. Can you point me to it?
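For reference, the filtering described here happens where RDATA commands are first received: each command carries the name of the instance that sent it, and a worker discards commands that are merely echoes of its own writes, so the per-stream handling above never sees a worker's own rows. A minimal self-contained sketch of that check (class and method names are modeled on synapse/replication/tcp/handler.py; this is an illustration, not the actual Synapse implementation):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RdataCommand:
    stream_name: str
    instance_name: str  # the instance that originally wrote this data
    token: int
    rows: List[object]


class ReplicationCommandHandler:
    def __init__(self, instance_name: str):
        self._instance_name = instance_name

    async def on_RDATA(self, cmd: RdataCommand) -> None:
        if cmd.instance_name == self._instance_name:
            # An echo of a command this instance sent itself: drop it, so
            # on_rdata() never re-processes our own writes.
            return
        await self._dispatch(cmd)

    async def _dispatch(self, cmd: RdataCommand) -> None:
        ...  # decode the rows and call ReplicationDataHandler.on_rdata()
```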
         if stream_name == EventsStream.NAME:
             # We shouldn't get multiple rows per token for events stream, so
@@ -197,6 +246,8 @@ async def on_position(self, stream_name: str, instance_name: str, token: int):
         # may be streaming.
         self.notifier.notify_replication()

+        await self.on_rdata(stream_name, instance_name, token, [])
+
richvdh (Member):

Can we get rid of the call to self.store.process_replication_rows above, given that it is also called by on_rdata?

erikjohnston (Member Author):

Good point.

erikjohnston (Member Author):

Done. I've moved the self.notifier.notify_replication() after the on_rdata(..) call, in case anything listening was requiring the store to have processed the replication.
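Given that follow-up, the method presumably ends up shaped like this (a sketch inferred from the diff and the comments above, not the committed code):

```python
async def on_position(self, stream_name: str, instance_name: str, token: int):
    # Treat the position update as an empty batch of rows: on_rdata() calls
    # self.store.process_replication_rows(), which advances the stream
    # position, and it wakes any wait_for_stream_position() waiters.
    await self.on_rdata(stream_name, instance_name, token, [])

    # Notify listeners only after the store has processed the new position.
    self.notifier.notify_replication()
```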


     def on_remote_server_up(self, server: str):
         """Called when get a new REMOTE_SERVER_UP command."""

@@ -236,3 +287,23 @@ async def wait_for_stream_position(
         logger.info(
             "Finished waiting for repl stream %r to reach %s", stream_name, position
         )

+    def stop_pusher(self, user_id, app_id, pushkey):
+        if not self._notify_pushers:
+            return
+
+        key = "%s:%s" % (app_id, pushkey)
+        pushers_for_user = self._pusher_pool.pushers.get(user_id, {})
+        pusher = pushers_for_user.pop(key, None)
+        if pusher is None:
+            return
+        logger.info("Stopping pusher %r / %r", user_id, key)
+        pusher.on_stop()
+
+    async def start_pusher(self, user_id, app_id, pushkey):
+        if not self._notify_pushers:
+            return
+
+        key = "%s:%s" % (app_id, pushkey)
+        logger.info("Starting pusher %r / %r", user_id, key)
+        return await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)