Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove usage of "conn_id" for presence. #7128

Merged
merged 5 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ Asks the server for the current position of all streams.

A user has started or stopped syncing

#### CLEAR_USER_SYNC (C)

The server should clear all associated user sync data from the worker.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some words on when it is sent would be good.


#### FEDERATION_ACK (C)

Acknowledge receipt of some federation data
Expand Down
17 changes: 14 additions & 3 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream,
Expand Down Expand Up @@ -124,7 +125,6 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.generic_worker")
Expand Down Expand Up @@ -246,8 +246,19 @@ def __init__(self, hs):
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)

self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
hs.get_reactor().addSystemEventTrigger(
"before",
"shutdown",
run_as_background_process,
"generic_presence.on_shutdown",
self._on_shutdown,
)

def _on_shutdown(self):
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
Expand Down
26 changes: 26 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,30 @@ def to_line(self):
)


class ClearUserSyncsCommand(Command):
"""Sent by the client to inform the server that it should drop all
information about syncing users sent by the client.

Mainly used when client is about to shut down.

Format::

CLEAR_USER_SYNC <instance_id>
"""

NAME = "CLEAR_USER_SYNC"

def __init__(self, instance_id):
self.instance_id = instance_id

@classmethod
def from_line(cls, line):
return cls(line)

def to_line(self):
return self.instance_id


class FederationAckCommand(Command):
"""Sent by the client when it has processed up to a given point in the
federation stream. This allows the master to drop in-memory caches of the
Expand Down Expand Up @@ -400,6 +424,7 @@ class RemoteServerUpCommand(Command):
InvalidateCacheCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
) # type: Tuple[Type[Command], ...]

# Map of command name to command type.
Expand All @@ -422,6 +447,7 @@ class RemoteServerUpCommand(Command):
ReplicateCommand.NAME,
PingCommand.NAME,
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
RemovePusherCommand.NAME,
InvalidateCacheCommand.NAME,
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ async def on_USER_SYNC(self, cmd):
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)

async def on_CLEAR_USER_SYNC(self, cmd):
await self.streamer.on_clear_user_syncs(cmd.instance_id)

async def on_REPLICATE(self, cmd):
# Subscribe to all streams we're publishing to.
for stream_name in self.streamer.streams_by_name:
Expand Down
5 changes: 5 additions & 0 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
instance_id, user_id, is_syncing, last_sync_ms
)

async def on_clear_user_syncs(self, instance_id):
"""A replication client wants us to drop all their UserSync data.
"""
await self.presence_handler.update_external_syncs_clear(instance_id)

@measure_func("repl.on_remove_pusher")
async def on_remove_pusher(self, app_id, push_key, user_id):
"""A client has asked us to remove a pusher
Expand Down