Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert federation client to async/await. #7975

Merged
merged 9 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7975.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
16 changes: 9 additions & 7 deletions contrib/cmdclient/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,15 @@ def do_stream(self, line):

@defer.inlineCallbacks
def _do_event_stream(self, timeout):
res = yield self.http_client.get_json(
self._url() + "/events",
{
"access_token": self._tok(),
"timeout": str(timeout),
"from": self.event_stream_token,
},
res = yield defer.ensureDeferred(
self.http_client.get_json(
self._url() + "/events",
{
"access_token": self._tok(),
"timeout": str(timeout),
"from": self.event_stream_token,
},
)
)
print(json.dumps(res, indent=4))

Expand Down
60 changes: 32 additions & 28 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,18 +632,20 @@ def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server):
)

try:
query_response = yield self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
data={
"server_keys": {
server_name: {
key_id: {"minimum_valid_until_ts": min_valid_ts}
for key_id, min_valid_ts in server_keys.items()
query_response = yield defer.ensureDeferred(
self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
data={
"server_keys": {
server_name: {
key_id: {"minimum_valid_until_ts": min_valid_ts}
for key_id, min_valid_ts in server_keys.items()
}
for server_name, server_keys in keys_to_fetch.items()
}
for server_name, server_keys in keys_to_fetch.items()
}
},
},
)
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve upon
Expand Down Expand Up @@ -792,23 +794,25 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids):

time_now_ms = self.clock.time_msec()
try:
response = yield self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/"
+ urllib.parse.quote(requested_key_id),
ignore_backoff=True,
# we only give the remote server 10s to respond. It should be an
# easy request to handle, so if it doesn't reply within 10s, it's
# probably not going to.
#
# Furthermore, when we are acting as a notary server, we cannot
# wait all day for all of the origin servers, as the requesting
# server will otherwise time out before we can respond.
#
# (Note that get_json may make 4 attempts, so this can still take
# almost 45 seconds to fetch the headers, plus up to another 60s to
# read the response).
timeout=10000,
response = yield defer.ensureDeferred(
self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/"
+ urllib.parse.quote(requested_key_id),
ignore_backoff=True,
# we only give the remote server 10s to respond. It should be an
# easy request to handle, so if it doesn't reply within 10s, it's
# probably not going to.
#
# Furthermore, when we are acting as a notary server, we cannot
# wait all day for all of the origin servers, as the requesting
# server will otherwise time out before we can respond.
#
# (Note that get_json may make 4 attempts, so this can still take
# almost 45 seconds to fetch the headers, plus up to another 60s to
# read the response).
timeout=10000,
)
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve
Expand Down
8 changes: 4 additions & 4 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def make_query(
and try the request anyway.

Returns:
a Deferred which will eventually yield a JSON object from the
a Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels(query_type).inc()
Expand All @@ -157,7 +157,7 @@ def query_client_keys(self, destination, content, timeout):
content (dict): The query content.

Returns:
a Deferred which will eventually yield a JSON object from the
an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels("client_device_keys").inc()
Expand All @@ -180,7 +180,7 @@ def claim_client_keys(self, destination, content, timeout):
content (dict): The query content.

Returns:
a Deferred which will eventually yield a JSON object from the
an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
Expand Down Expand Up @@ -900,7 +900,7 @@ def get_public_rooms(
party instance

Returns:
Deferred[Dict[str, Any]]: The response from the remote server, or None if
Awaitable[Dict[str, Any]]: The response from the remote server, or None if
`remote_server` is the same as the local server_name

Raises:
Expand Down
19 changes: 6 additions & 13 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)

@defer.inlineCallbacks
def send_read_receipt(self, receipt: ReadReceipt):
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room

Args:
Expand Down Expand Up @@ -330,9 +329,7 @@ def send_read_receipt(self, receipt: ReadReceipt):
room_id = receipt.room_id

# Work out which remote servers should be poked and poke them.
domains = yield defer.ensureDeferred(
self.state.get_current_hosts_in_room(room_id)
)
domains = await self.state.get_current_hosts_in_room(room_id)
domains = [
d
for d in domains
Expand Down Expand Up @@ -387,8 +384,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
queue.flush_read_receipts_for_room(room_id)

@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states: List[UserPresenceState]):
async def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.

This actually queues up the presence states ready for sending and
Expand Down Expand Up @@ -423,7 +419,7 @@ def send_presence(self, states: List[UserPresenceState]):
if not states_map:
break

yield self._process_presence_inner(list(states_map.values()))
await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
Expand All @@ -450,14 +446,11 @@ def send_presence_to_destinations(
self._get_per_destination_queue(destination).send_presence(states)

@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states: List[UserPresenceState]):
async def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
hosts_and_states = yield defer.ensureDeferred(
get_interested_remotes(self.store, states, self.state)
)
hosts_and_states = await get_interested_remotes(self.store, states, self.state)

for destinations, states in hosts_and_states:
for destination in destinations:
Expand Down
Loading