Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove usage of "conn_id" for presence. #7128

Merged
merged 5 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7128.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add explicit `instance_id` for USER_SYNC commands and remove implicit `conn_id` usage.
6 changes: 6 additions & 0 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ Asks the server for the current position of all streams.

A user has started or stopped syncing

#### CLEAR_USER_SYNC (C)

The server should clear all user sync data associated with the worker.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some words on when it is sent would be good.


This is used when a worker is shutting down.

#### FEDERATION_ACK (C)

Acknowledge receipt of some federation data
Expand Down
20 changes: 16 additions & 4 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream,
Expand Down Expand Up @@ -124,7 +125,6 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.generic_worker")
Expand Down Expand Up @@ -233,6 +233,7 @@ def __init__(self, hs):
self.user_to_num_current_syncs = {}
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}
Expand All @@ -245,13 +246,24 @@ def __init__(self, hs):
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)

self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
hs.get_reactor().addSystemEventTrigger(
"before",
"shutdown",
run_as_background_process,
"generic_presence.on_shutdown",
self._on_shutdown,
)

def _on_shutdown(self):
    """Ask the replication server to clear this instance's user syncs.

    Sends a CLEAR_USER_SYNC command carrying ``self.instance_id``. Nothing
    is sent when presence is disabled in the config.
    """
    if not self.hs.config.use_presence:
        return

    replication = self.hs.get_tcp_replication()
    replication.send_command(ClearUserSyncsCommand(self.instance_id))

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
self.instance_id, user_id, is_syncing, last_sync_ms
)

def mark_as_coming_online(self, user_id):
Expand Down
6 changes: 4 additions & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,12 @@ def send_federation_ack(self, token):
"""
self.send_command(FederationAckCommand(token))

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
def send_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""Poke the master that a user has started/stopped syncing.
"""
self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms))
self.send_command(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)

def send_remove_pusher(self, app_id, push_key, user_id):
"""Poke the master to remove a pusher for a user
Expand Down
36 changes: 32 additions & 4 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,37 +207,63 @@ class UserSyncCommand(Command):
Format::
USER_SYNC <user_id> <state> <last_sync_ms>
USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>
Where <state> is either "start" or "end"
"""

NAME = "USER_SYNC"

def __init__(self, user_id, is_syncing, last_sync_ms):
def __init__(self, instance_id, user_id, is_syncing, last_sync_ms):
self.instance_id = instance_id
self.user_id = user_id
self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms

@classmethod
def from_line(cls, line):
user_id, state, last_sync_ms = line.split(" ", 2)
instance_id, user_id, state, last_sync_ms = line.split(" ", 3)

if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))

return cls(user_id, state == "start", int(last_sync_ms))
return cls(instance_id, user_id, state == "start", int(last_sync_ms))

def to_line(self):
    """Serialise this command to its wire form.

    The result is ``<instance_id> <user_id> <state> <last_sync_ms>``, where
    <state> is "start" when the user is syncing and "end" otherwise.
    """
    if self.is_syncing:
        state = "start"
    else:
        state = "end"

    fields = [self.instance_id, self.user_id, state, str(self.last_sync_ms)]
    return " ".join(fields)


class ClearUserSyncsCommand(Command):
    """Client-to-server request to forget the client's syncing users.

    The client sends this so that the server drops all the information
    about syncing users that the client previously sent. Its main use is
    when the client is about to shut down.

    Format::

        CLEAR_USER_SYNC <instance_id>
    """

    NAME = "CLEAR_USER_SYNC"

    def __init__(self, instance_id):
        # The only payload of the command.
        self.instance_id = instance_id

    @classmethod
    def from_line(cls, line):
        """Build the command from its wire form; the whole line is the ID."""
        instance_id = line
        return cls(instance_id)

    def to_line(self):
        """Return the wire form, which is just the instance ID."""
        wire_form = self.instance_id
        return wire_form


class FederationAckCommand(Command):
"""Sent by the client when it has processed up to a given point in the
federation stream. This allows the master to drop in-memory caches of the
Expand Down Expand Up @@ -398,6 +424,7 @@ class RemoteServerUpCommand(Command):
InvalidateCacheCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
) # type: Tuple[Type[Command], ...]

# Map of command name to command type.
Expand All @@ -420,6 +447,7 @@ class RemoteServerUpCommand(Command):
ReplicateCommand.NAME,
PingCommand.NAME,
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
RemovePusherCommand.NAME,
InvalidateCacheCommand.NAME,
Expand Down
9 changes: 7 additions & 2 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,12 @@ async def on_NAME(self, cmd):

async def on_USER_SYNC(self, cmd):
await self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)

async def on_CLEAR_USER_SYNC(self, cmd):
    """Handle a CLEAR_USER_SYNC command.

    Passes the instance ID carried by the command to the streamer's
    ``on_clear_user_syncs`` and waits for it to finish.
    """
    instance_id = cmd.instance_id
    await self.streamer.on_clear_user_syncs(instance_id)

async def on_REPLICATE(self, cmd):
# Subscribe to all streams we're publishing to.
for stream_name in self.streamer.streams_by_name:
Expand Down Expand Up @@ -551,6 +554,8 @@ def __init__(
):
BaseReplicationStreamProtocol.__init__(self, clock)

self.instance_id = hs.get_instance_id()

self.client_name = client_name
self.server_name = server_name
self.handler = handler
Expand Down Expand Up @@ -580,7 +585,7 @@ def connectionMade(self):
currently_syncing = self.handler.get_currently_syncing_users()
now = self.clock.time_msec()
for user_id in currently_syncing:
self.send_command(UserSyncCommand(user_id, True, now))
self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))

# We've now finished connecting, so inform the client handler
self.handler.update_connection(self)
Expand Down
17 changes: 7 additions & 10 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,19 @@ def federation_ack(self, token):
self.federation_sender.federation_ack(token)

@measure_func("repl.on_user_sync")
async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
await self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing, last_sync_ms
instance_id, user_id, is_syncing, last_sync_ms
)

async def on_clear_user_syncs(self, instance_id):
    """Drop all UserSync data for a replication client that asked us to.

    Delegates to the presence handler's ``update_external_syncs_clear``
    with the given instance ID.
    """
    presence = self.presence_handler
    await presence.update_external_syncs_clear(instance_id)

@measure_func("repl.on_remove_pusher")
async def on_remove_pusher(self, app_id, push_key, user_id):
"""A client has asked us to remove a pusher
Expand Down Expand Up @@ -321,14 +326,6 @@ def lost_connection(self, connection):
except ValueError:
pass

# We need to tell the presence handler that the connection has been
# lost so that it can handle any ongoing syncs on that connection.
run_as_background_process(
"update_external_syncs_clear",
self.presence_handler.update_external_syncs_clear,
connection.conn_id,
)


def _batch_updates(updates):
"""Takes a list of updates of form [(token, row)] and sets the token to
Expand Down
11 changes: 11 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.stringutils import random_string

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -226,6 +227,8 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
self._listening_services = []
self.start_time = None

self.instance_id = random_string(5)

self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
Expand All @@ -238,6 +241,14 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
for depname in kwargs:
setattr(self, depname, kwargs[depname])

def get_instance_id(self):
"""A unique ID for this synapse process instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't mind some words about what this is used for. "This is used to distinguish instances in worker-based deployments"?


This is used to distinguish running instances in worker-based
deployments.
"""
return self.instance_id

def setup(self):
logger.info("Setting up.")
self.start_time = int(self.get_clock().time())
Expand Down
2 changes: 2 additions & 0 deletions synapse/server.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,5 @@ class HomeServer(object):
pass
def is_mine_id(self, domain_id: str) -> bool:
    # NOTE(review): type stub only. Presumably reports whether `domain_id`
    # belongs to this homeserver; confirm against the implementation in
    # synapse/server.py.
    pass
def get_instance_id(self) -> str:
    """A unique ID for this synapse process instance.

    This is used to distinguish running instances in worker-based
    deployments.
    """
    pass