Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Converts event_federation and registration databases to async/await #8061

Merged
merged 5 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8061.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
38 changes: 13 additions & 25 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import itertools
import logging
from queue import Empty, PriorityQueue
from typing import Dict, List, Optional, Set, Tuple

from twisted.internet import defer
from typing import Dict, Iterable, List, Optional, Set, Tuple

from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -286,17 +284,13 @@ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):

return dict(txn)

@defer.inlineCallbacks
def get_max_depth_of(self, event_ids):
async def get_max_depth_of(self, event_ids: List[str]) -> int:
"""Returns the max depth of a set of event IDs

Args:
event_ids (list[str])

Returns
Deferred[int]
event_ids: The event IDs to calculate the max depth of.
"""
rows = yield self.db_pool.simple_select_many_batch(
rows = await self.db_pool.simple_select_many_batch(
table="events",
column="event_id",
iterable=event_ids,
Expand Down Expand Up @@ -550,17 +544,16 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

return event_results

@defer.inlineCallbacks
def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = yield self.db_pool.runInteraction(
async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(
"get_missing_events",
self._get_missing_events,
room_id,
earliest_events,
latest_events,
limit,
)
events = yield self.get_events_as_list(ids)
events = await self.get_events_as_list(ids)
return events

def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
Expand Down Expand Up @@ -595,17 +588,13 @@ def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limi
event_results.reverse()
return event_results

@defer.inlineCallbacks
def get_successor_events(self, event_ids):
async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:
"""Fetch all events that have the given events as a prev event

Args:
event_ids (iterable[str])

Returns:
Deferred[list[str]]
event_ids: The events to use as the previous events.
"""
rows = yield self.db_pool.simple_select_many_batch(
rows = await self.db_pool.simple_select_many_batch(
table="event_edges",
column="prev_event_id",
iterable=event_ids,
Expand Down Expand Up @@ -674,8 +663,7 @@ def _clean_room_for_join_txn(self, txn, room_id):
txn.execute(query, (room_id,))
txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))

@defer.inlineCallbacks
def _background_delete_non_state_event_auth(self, progress, batch_size):
async def _background_delete_non_state_event_auth(self, progress, batch_size):
def delete_event_auth(txn):
target_min_stream_id = progress.get("target_min_stream_id_inclusive")
max_stream_id = progress.get("max_stream_id_exclusive")
Expand Down Expand Up @@ -714,12 +702,12 @@ def delete_event_auth(txn):

return min_stream_id >= target_min_stream_id

result = yield self.db_pool.runInteraction(
result = await self.db_pool.runInteraction(
self.EVENT_AUTH_STATE_ONLY, delete_event_auth
)

if not result:
yield self.db_pool.updates._end_background_update(
await self.db_pool.updates._end_background_update(
self.EVENT_AUTH_STATE_ONLY
)

Expand Down
Loading