Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add all Stream Writer worker types to configure_workers_and_start.py (#…
Browse files Browse the repository at this point in the history
…14197)

Co-authored-by: reivilibre <oliverw@matrix.org>
  • Loading branch information
realtyem and reivilibre authored Nov 8, 2022
1 parent 5853d79 commit d85cba1
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/14197.docker
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add all Stream Writer worker types to configure_workers_and_start.py.
76 changes: 69 additions & 7 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@

MAIN_PROCESS_HTTP_LISTENER_PORT = 8080


# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
# Watching /_matrix/federation needs a "federation" listener
# Watching /_matrix/media and related needs a "media" listener
# Stream Writers require "client" and "replication" listeners because they
# have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
"app": "synapse.app.pusher",
Expand Down Expand Up @@ -209,6 +214,49 @@
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
),
},
"account_data": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"presence": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"receipts": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"to_device": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"typing": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}

# Templates for sections that may be inserted multiple times in config files
Expand Down Expand Up @@ -271,7 +319,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
outfile.write(rendered)


def add_sharding_to_shared_config(
def add_worker_roles_to_shared_config(
shared_config: dict,
worker_type: str,
worker_name: str,
Expand Down Expand Up @@ -309,6 +357,20 @@ def add_sharding_to_shared_config(
"port": worker_port,
}

elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
# Update the list of stream writers
# It's convienent that the name of the worker type is the same as the event stream
shared_config.setdefault("stream_writers", {}).setdefault(
worker_type, []
).append(worker_name)

# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}

elif worker_type == "media_repository":
# The first configured media worker will run the media background jobs
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
Expand Down Expand Up @@ -441,11 +503,11 @@ def generate_worker_files(

# Check if more than one instance of this worker type has been specified
worker_type_total_count = worker_types.count(worker_type)
if worker_type_total_count > 1:
# Update the shared config with sharding-related options if necessary
add_sharding_to_shared_config(
shared_config, worker_type, worker_name, worker_port
)

# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_type, worker_name, worker_port
)

# Enable the worker in supervisord
worker_descriptors.append(worker_config)
Expand Down

0 comments on commit d85cba1

Please sign in to comment.