Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert additional databases to async/await part 2 (#8200)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Sep 1, 2020
1 parent bbb3c86 commit da77520
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 81 deletions.
1 change: 1 addition & 0 deletions changelog.d/8200.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
19 changes: 12 additions & 7 deletions synapse/events/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import Any, Dict, List, Optional, Tuple, Union

import attr
from nacl.signing import SigningKey
Expand Down Expand Up @@ -97,14 +97,14 @@ def state_key(self):
def is_state(self):
return self._state_key is not None

async def build(self, prev_event_ids):
async def build(self, prev_event_ids: List[str]) -> EventBase:
"""Transform into a fully signed and hashed event
Args:
prev_event_ids (list[str]): The event IDs to use as the prev events
prev_event_ids: The event IDs to use as the prev events
Returns:
FrozenEvent
The signed and hashed event.
"""

state_ids = await self._state.get_current_state_ids(
Expand All @@ -114,8 +114,13 @@ async def build(self, prev_event_ids):

format_version = self.room_version.event_format
if format_version == EventFormatVersions.V1:
auth_events = await self._store.add_event_hashes(auth_ids)
prev_events = await self._store.add_event_hashes(prev_event_ids)
# The types of auth/prev events changes between event versions.
auth_events = await self._store.add_event_hashes(
auth_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
prev_events = await self._store.add_event_hashes(
prev_event_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
else:
auth_events = auth_ids
prev_events = prev_event_ids
Expand All @@ -138,7 +143,7 @@ async def build(self, prev_event_ids):
"unsigned": self.unsigned,
"depth": depth,
"prev_state": [],
}
} # type: Dict[str, Any]

if self.is_state():
event_dict["state_key"] = self._state_key
Expand Down
13 changes: 3 additions & 10 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,7 @@
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
Requester,
RoomAlias,
StreamToken,
UserID,
create_requester,
)
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
Expand Down Expand Up @@ -446,7 +439,7 @@ async def create_event(
event_dict: dict,
token_id: Optional[str] = None,
txn_id: Optional[str] = None,
prev_event_ids: Optional[Collection[str]] = None,
prev_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
Expand Down Expand Up @@ -786,7 +779,7 @@ async def create_new_client_event(
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
prev_event_ids: Optional[Collection[str]] = None,
prev_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
Expand Down
12 changes: 2 additions & 10 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,7 @@
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
Collection,
JsonDict,
Requester,
RoomAlias,
RoomID,
StateMap,
UserID,
)
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room

Expand Down Expand Up @@ -184,7 +176,7 @@ async def _local_membership_update(
target: UserID,
room_id: str,
membership: str,
prev_event_ids: Collection[str],
prev_event_ids: List[str],
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ async def insert_client_ip(
self._batch_row_update[key] = (user_agent, device_id, now)

@wrap_as_background_process("update_client_ips")
def _update_client_ips_batch(self):
async def _update_client_ips_batch(self) -> None:

# If the DB pool has already terminated, don't try updating
if not self.db_pool.is_running():
Expand All @@ -405,7 +405,7 @@ def _update_client_ips_batch(self):
to_update = self._batch_row_update
self._batch_row_update = {}

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str:

return room_id

def update_aliases_for_room(
async def update_aliases_for_room(
self, old_room_id: str, new_room_id: str, creator: Optional[str] = None,
):
) -> None:
"""Repoint all of the aliases for a given room, to a different room.
Args:
Expand Down Expand Up @@ -189,6 +189,6 @@ def _update_aliases_for_room_txn(txn):
txn, self.get_aliases_for_room, (new_room_id,)
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_update_aliases_for_room_txn", _update_aliases_for_room_txn
)
5 changes: 3 additions & 2 deletions synapse/storage/databases/main/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from synapse.api.errors import Codes, SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached


Expand All @@ -40,7 +41,7 @@ async def get_user_filter(self, user_localpart, filter_id):

return db_to_json(def_json)

def add_user_filter(self, user_localpart, user_filter):
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> str:
def_json = encode_canonical_json(user_filter)

# Need an atomic transaction to SELECT the maximal ID so far then
Expand Down Expand Up @@ -71,4 +72,4 @@ def _do_txn(txn):

return filter_id

return self.db_pool.runInteraction("add_user_filter", _do_txn)
return await self.db_pool.runInteraction("add_user_filter", _do_txn)
8 changes: 6 additions & 2 deletions synapse/storage/databases/main/openid.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from synapse.storage._base import SQLBaseStore


Expand All @@ -15,7 +17,9 @@ async def insert_open_id_token(
desc="insert_open_id_token",
)

def get_user_id_for_open_id_token(self, token, ts_now_ms):
async def get_user_id_for_open_id_token(
self, token: str, ts_now_ms: int
) -> Optional[str]:
def get_user_id_for_token_txn(txn):
sql = (
"SELECT user_id FROM open_id_tokens"
Expand All @@ -30,6 +34,6 @@ def get_user_id_for_token_txn(txn):
else:
return rows[0][0]

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_user_id_for_token", get_user_id_for_token_txn
)
6 changes: 4 additions & 2 deletions synapse/storage/databases/main/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ async def maybe_delete_remote_profile_cache(self, user_id):
desc="delete_remote_profile_cache",
)

def get_remote_profile_cache_entries_that_expire(self, last_checked):
async def get_remote_profile_cache_entries_that_expire(
self, last_checked: int
) -> Dict[str, str]:
"""Get all users who haven't been checked since `last_checked`
"""

Expand All @@ -153,7 +155,7 @@ def _get_remote_profile_cache_entries_that_expire_txn(txn):

return self.db_pool.cursor_to_dict(txn)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_remote_profile_cache_entries_that_expire",
_get_remote_profile_cache_entries_that_expire_txn,
)
Expand Down
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import logging
from typing import List, Tuple, Union

from twisted.internet import defer

from synapse.push.baserules import list_with_base_rules
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage._base import SQLBaseStore, db_to_json
Expand Down Expand Up @@ -149,9 +147,11 @@ async def get_push_rules_enabled_for_user(self, user_id):
)
return {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}

def have_push_rules_changed_for_user(self, user_id, last_id):
async def have_push_rules_changed_for_user(
self, user_id: str, last_id: int
) -> bool:
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
return defer.succeed(False)
return False
else:

def have_push_rules_changed_txn(txn):
Expand All @@ -163,7 +163,7 @@ def have_push_rules_changed_txn(txn):
(count,) = txn.fetchone()
return bool(count)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"have_push_rules_changed", have_push_rules_changed_txn
)

Expand Down
Loading

0 comments on commit da77520

Please sign in to comment.