Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use stable identifiers for faster joins #14832

Merged
merged 5 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14832.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster joins: use stable identifiers from [MSC3706](https://github.com/matrix-org/matrix-spec-proposals/pull/3706).
2 changes: 2 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,10 +725,12 @@ async def on_send_join_request(
"state": [p.get_pdu_json(time_now) for p in state_events],
"auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
"org.matrix.msc3706.partial_state": caller_supports_partial_state,
"members_omitted": caller_supports_partial_state,
}

if servers_in_room is not None:
resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room)
resp["servers_in_room"] = list(servers_in_room)

return resp

Expand Down
18 changes: 18 additions & 0 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ async def send_join_v2(
if self._faster_joins_enabled:
# lazy-load state on join
query_params["org.matrix.msc3706.partial_state"] = "true"
query_params["omit_members"] = "true"

return await self.client.put_json(
destination=destination,
Expand Down Expand Up @@ -909,6 +910,14 @@ def __init__(self, room_version: RoomVersion, v1_api: bool):
use_float="True",
)
)
# The stable field name comes last, so it "wins" if the fields disagree
self._coros.append(
ijson.items_coro(
_partial_state_parser(self._response),
"members_omitted",
use_float="True",
)
)

self._coros.append(
ijson.items_coro(
Expand All @@ -918,6 +927,15 @@ def __init__(self, room_version: RoomVersion, v1_api: bool):
)
)

# Again, stable field name comes last
self._coros.append(
ijson.items_coro(
_servers_in_room_parser(self._response),
"servers_in_room",
use_float="True",
)
)

def write(self, data: bytes) -> int:
for c in self._coros:
c.send(data)
Expand Down
13 changes: 10 additions & 3 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,16 @@ async def on_PUT(

partial_state = False
if self._msc3706_enabled:
partial_state = parse_boolean_from_args(
query, "org.matrix.msc3706.partial_state", default=False
)
# The stable query parameter wins, if it disagrees with the unstable
# parameter for some reason.
stable_param = parse_boolean_from_args(query, "omit_members", default=None)
if stable_param is not None:
partial_state = stable_param
else:
partial_state = parse_boolean_from_args(
query, "org.matrix.msc3706.partial_state", default=False
)

result = await self.handler.on_send_join_request(
origin, content, room_id, caller_supports_partial_state=partial_state
)
Expand Down
2 changes: 1 addition & 1 deletion tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def test_send_join_partial_state(self) -> None:
)
channel = self.make_signed_federation_request(
"PUT",
f"/_matrix/federation/v2/send_join/{self._room_id}/x?org.matrix.msc3706.partial_state=true",
f"/_matrix/federation/v2/send_join/{self._room_id}/x?omit_members=true",
content=join_event_dict,
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
Expand Down
77 changes: 57 additions & 20 deletions tests/federation/transport/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
# limitations under the License.

import json
from typing import List, Optional
from unittest.mock import Mock

import ijson.common

from synapse.api.room_versions import RoomVersions
from synapse.federation.transport.client import SendJoinParser
from synapse.types import JsonDict
from synapse.util import ExceptionBundle

from tests.unittest import TestCase
Expand Down Expand Up @@ -71,33 +73,68 @@ def test_two_writes(self) -> None:

def test_partial_state(self) -> None:
"""Check that the partial_state flag is correctly parsed"""
parser = SendJoinParser(RoomVersions.V1, False)
response = {
"org.matrix.msc3706.partial_state": True,
}

serialised_response = json.dumps(response).encode()
def parse(response: JsonDict) -> bool:
parser = SendJoinParser(RoomVersions.V1, False)
serialised_response = json.dumps(response).encode()

# Send data to the parser
parser.write(serialised_response)
# Send data to the parser
parser.write(serialised_response)

# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
self.assertTrue(parsed_response.partial_state)
# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
return parsed_response.partial_state

def test_servers_in_room(self) -> None:
"""Check that the servers_in_room field is correctly parsed"""
parser = SendJoinParser(RoomVersions.V1, False)
response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
self.assertTrue(parse({"members_omitted": True}))
self.assertTrue(parse({"org.matrix.msc3706.partial_state": True}))

serialised_response = json.dumps(response).encode()
self.assertFalse(parse({"members_omitted": False}))
self.assertFalse(parse({"org.matrix.msc3706.partial_state": False}))

# Send data to the parser
parser.write(serialised_response)
# If there's a conflict, the stable field wins.
self.assertTrue(
parse({"members_omitted": True, "org.matrix.msc3706.partial_state": False})
)
self.assertFalse(
parse({"members_omitted": False, "org.matrix.msc3706.partial_state": True})
)

# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])
def test_servers_in_room(self) -> None:
"""Check that the servers_in_room field is correctly parsed"""

def parse(response: JsonDict) -> Optional[List[str]]:
parser = SendJoinParser(RoomVersions.V1, False)
serialised_response = json.dumps(response).encode()

# Send data to the parser
parser.write(serialised_response)

# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
return parsed_response.servers_in_room

self.assertEqual(
parse({"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}),
["hs1", "hs2"],
)
self.assertEqual(parse({"servers_in_room": ["example.com"]}), ["example.com"])

# If both are provided, the stable identifier should win
self.assertEqual(
parse(
{
"org.matrix.msc3706.servers_in_room": ["old"],
"servers_in_room": ["new"],
}
),
["new"],
)

# And lastly, we should be able to tell if neither field was present.
self.assertEqual(
parse({}),
None,
)

def test_errors_closing_coroutines(self) -> None:
"""Check we close all coroutines, even if closing the first raises an Exception.
Expand Down