Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove usage of "conn_id" for presence. #7128

Merged
merged 5 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def __init__(self, hs):
self.user_to_num_current_syncs = {}
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}
Expand All @@ -251,7 +252,7 @@ def __init__(self, hs):
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
self.instance_id, user_id, is_syncing, last_sync_ms
)

def mark_as_coming_online(self, user_id):
Expand Down
6 changes: 4 additions & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,12 @@ def send_federation_ack(self, token):
"""
self.send_command(FederationAckCommand(token))

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
def send_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""Poke the master that a user has started/stopped syncing.
"""
self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms))
self.send_command(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)

def send_remove_pusher(self, app_id, push_key, user_id):
"""Poke the master to remove a pusher for a user
Expand Down
10 changes: 6 additions & 4 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,30 +207,32 @@ class UserSyncCommand(Command):

Format::

USER_SYNC <user_id> <state> <last_sync_ms>
USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>

Where <state> is either "start" or "stop"
"""

NAME = "USER_SYNC"

def __init__(self, user_id, is_syncing, last_sync_ms):
def __init__(self, instance_id, user_id, is_syncing, last_sync_ms):
self.instance_id = instance_id
self.user_id = user_id
self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms

@classmethod
def from_line(cls, line):
user_id, state, last_sync_ms = line.split(" ", 2)
instance_id, user_id, state, last_sync_ms = line.split(" ", 3)

if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))

return cls(user_id, state == "start", int(last_sync_ms))
return cls(instance_id, user_id, state == "start", int(last_sync_ms))

def to_line(self):
return " ".join(
(
self.instance_id,
self.user_id,
"start" if self.is_syncing else "end",
str(self.last_sync_ms),
Expand Down
6 changes: 4 additions & 2 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ async def on_NAME(self, cmd):

async def on_USER_SYNC(self, cmd):
await self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)

async def on_REPLICATE(self, cmd):
Expand Down Expand Up @@ -551,6 +551,8 @@ def __init__(
):
BaseReplicationStreamProtocol.__init__(self, clock)

self.instance_id = hs.get_instance_id()

self.client_name = client_name
self.server_name = server_name
self.handler = handler
Expand Down Expand Up @@ -580,7 +582,7 @@ def connectionMade(self):
currently_syncing = self.handler.get_currently_syncing_users()
now = self.clock.time_msec()
for user_id in currently_syncing:
self.send_command(UserSyncCommand(user_id, True, now))
self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))

# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
Expand Down
12 changes: 2 additions & 10 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ def federation_ack(self, token):
self.federation_sender.federation_ack(token)

@measure_func("repl.on_user_sync")
async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
await self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing, last_sync_ms
instance_id, user_id, is_syncing, last_sync_ms
)

@measure_func("repl.on_remove_pusher")
Expand Down Expand Up @@ -321,14 +321,6 @@ def lost_connection(self, connection):
except ValueError:
pass

# We need to tell the presence handler that the connection has been
# lost so that it can handle any ongoing syncs on that connection.
run_as_background_process(
"update_external_syncs_clear",
self.presence_handler.update_external_syncs_clear,
connection.conn_id,
)


def _batch_updates(updates):
"""Takes a list of updates of form [(token, row)] and sets the token to
Expand Down
8 changes: 8 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.stringutils import random_string

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -226,6 +227,8 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
self._listening_services = []
self.start_time = None

self.instance_id = random_string(5)

self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
Expand All @@ -238,6 +241,11 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
for depname in kwargs:
setattr(self, depname, kwargs[depname])

def get_instance_id(self):
"""A unique ID for this synapse process instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't mind some words about what this is used for. "This is used to distinguish instances in worker-based deployments"?

"""
return self.instance_id

def setup(self):
logger.info("Setting up.")
self.start_time = int(self.get_clock().time())
Expand Down