
Merge commit 'a0acdfa9e' into anoa/dinsic_release_1_21_x
* commit 'a0acdfa9e':
  Converts event_federation and registration databases to async/await (#8061)
anoadragon453 committed Oct 19, 2020
2 parents 04f61d9 + a0acdfa commit 8857135
Showing 9 changed files with 168 additions and 227 deletions.
1 change: 1 addition & 0 deletions changelog.d/8061.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
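
The hunks below all apply the same mechanical change: a Twisted `@defer.inlineCallbacks` generator becomes a native coroutine, and each `yield` of a Deferred becomes an `await`. As a reference point, here is a minimal, self-contained sketch of that pattern; it is not code from this commit, and `db_fetch` is a fake stand-in for a `db_pool` call that returns a Deferred.

```python
from twisted.internet import defer, task


def db_fetch():
    # Stands in for a database call that returns a Deferred.
    return defer.succeed([{"depth": 3}, {"depth": 7}])


@defer.inlineCallbacks
def get_max_depth_old(event_ids):
    # Old style: a generator driven by inlineCallbacks.
    rows = yield db_fetch()
    return max(row["depth"] for row in rows)


async def get_max_depth_new(event_ids):
    # New style: Deferreds are awaitable, so yield simply becomes await.
    rows = await db_fetch()
    return max(row["depth"] for row in rows)


def main(reactor):
    # ensureDeferred bridges the coroutine back into Twisted's Deferred world.
    d = defer.ensureDeferred(get_max_depth_new(["$a:example.com", "$b:example.com"]))
    d.addCallback(print)
    return d


if __name__ == "__main__":
    task.react(main)
```
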
24 changes: 13 additions & 11 deletions synapse/handlers/account_validity.py
@@ -20,8 +20,6 @@
 from email.mime.text import MIMEText
 from typing import List
 
-from twisted.internet import defer
-
 from synapse.api.errors import StoreError
 from synapse.logging.context import make_deferred_yieldable
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -90,12 +88,17 @@ def send_emails():
 
         self.clock.looping_call(send_emails, 30 * 60 * 1000)
 
-        # If account_validity is enabled,check every hour to remove expired users from
-        # the user directory
+        # Mark users as inactive when they expired. Check once every hour
         if self._account_validity.enabled:
-            self.clock.looping_call(
-                self._mark_expired_users_as_inactive, 60 * 60 * 1000
-            )
+
+            def mark_expired_users_as_inactive():
+                # run as a background process to allow async functions to work
+                return run_as_background_process(
+                    "_mark_expired_users_as_inactive",
+                    self._mark_expired_users_as_inactive,
+                )
+
+            self.clock.looping_call(mark_expired_users_as_inactive, 60 * 60 * 1000)
 
     async def _send_renewal_emails(self):
         """Gets the list of users whose account is expiring in the amount of time
@@ -286,16 +289,15 @@ async def renew_account_for_user(
 
         return expiration_ts
 
-    @defer.inlineCallbacks
-    def _mark_expired_users_as_inactive(self):
+    async def _mark_expired_users_as_inactive(self):
         """Iterate over active, expired users. Mark them as inactive in order to hide them
         from the user directory.
         Returns:
             Deferred
         """
         # Get active, expired users
-        active_expired_users = yield self.store.get_expired_users()
+        active_expired_users = await self.store.get_expired_users()
 
         # Mark each as non-active
-        yield self.profile_handler.set_active(active_expired_users, False, True)
+        await self.profile_handler.set_active(active_expired_users, False, True)
16 changes: 0 additions & 16 deletions synapse/push/pusherpool.py
@@ -192,14 +192,6 @@ async def on_new_notifications(self, min_stream_id, max_stream_id):
 
         for u in users_affected:
             if u in self.pushers:
-                # Don't push if the user account has expired
-                if self._account_validity.enabled:
-                    expired = await self.store.is_account_expired(
-                        u, self.clock.time_msec()
-                    )
-                    if expired:
-                        continue
-
                 for p in self.pushers[u].values():
                     p.on_new_notifications(min_stream_id, max_stream_id)

@@ -220,14 +212,6 @@ async def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
 
         for u in users_affected:
             if u in self.pushers:
-                # Don't push if the user account has expired
-                if self._account_validity.enabled:
-                    expired = yield self.store.is_account_expired(
-                        u, self.clock.time_msec()
-                    )
-                    if expired:
-                        continue
-
                 for p in self.pushers[u].values():
                     p.on_new_receipts(min_stream_id, max_stream_id)

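The two hunks above drop the per-user account-expiry guard from both push paths (the `on_new_receipts` copy still used the older `yield` form). Purely for illustration, here is a self-contained sketch of how such an awaited guard filters a delivery loop; the store, clock value, and pusher map are fakes, not Synapse objects.

```python
from twisted.internet import defer, task


class FakeStore:
    async def is_account_expired(self, user_id, now_ms):
        # Pretend anything under the @expired prefix has lapsed.
        return user_id.startswith("@expired")


async def dispatch_notifications(store, clock_ms, pushers):
    delivered = []
    for user_id, user_pushers in pushers.items():
        # Skip users whose account has expired, as the removed guard did.
        if await store.is_account_expired(user_id, clock_ms):
            continue
        delivered.extend(user_pushers)
    return delivered


def main(reactor):
    pushers = {
        "@alice:example.com": ["http_pusher"],
        "@expired-bob:example.com": ["email_pusher"],
    }
    d = defer.ensureDeferred(dispatch_notifications(FakeStore(), 0, pushers))
    d.addCallback(print)  # -> ['http_pusher']
    return d


if __name__ == "__main__":
    task.react(main)
```
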
38 changes: 13 additions & 25 deletions synapse/storage/databases/main/event_federation.py
@@ -15,9 +15,7 @@
 import itertools
 import logging
 from queue import Empty, PriorityQueue
-from typing import Dict, List, Optional, Set, Tuple
-
-from twisted.internet import defer
+from typing import Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -286,17 +284,13 @@ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
 
         return dict(txn)
 
-    @defer.inlineCallbacks
-    def get_max_depth_of(self, event_ids):
+    async def get_max_depth_of(self, event_ids: List[str]) -> int:
         """Returns the max depth of a set of event IDs
         Args:
-            event_ids (list[str])
-        Returns
-            Deferred[int]
+            event_ids: The event IDs to calculate the max depth of.
         """
-        rows = yield self.db_pool.simple_select_many_batch(
+        rows = await self.db_pool.simple_select_many_batch(
             table="events",
             column="event_id",
             iterable=event_ids,
@@ -550,17 +544,16 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
 
         return event_results
 
-    @defer.inlineCallbacks
-    def get_missing_events(self, room_id, earliest_events, latest_events, limit):
-        ids = yield self.db_pool.runInteraction(
+    async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
+        ids = await self.db_pool.runInteraction(
             "get_missing_events",
             self._get_missing_events,
             room_id,
             earliest_events,
             latest_events,
             limit,
         )
-        events = yield self.get_events_as_list(ids)
+        events = await self.get_events_as_list(ids)
         return events
 
     def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
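
In these storage hunks only the outer methods become async: the transaction functions (such as `_get_missing_events`) stay synchronous and are still handed to `db_pool.runInteraction`, whose result is simply awaited now. The snippet below is a loose, self-contained analogue of that split rather than Synapse's database layer; `deferToThread` plus sqlite3 stands in for `runInteraction`, and the table and query are invented.

```python
import sqlite3

from twisted.internet import defer, task
from twisted.internet.threads import deferToThread


def _get_missing_events_txn(db_path, limit):
    # Synchronous transaction logic, kept blocking and run off the main thread.
    with sqlite3.connect(db_path) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS events (event_id TEXT)")
        conn.execute("INSERT INTO events VALUES ('$event1'), ('$event2')")
        rows = conn.execute("SELECT event_id FROM events LIMIT ?", (limit,))
        return [row[0] for row in rows]


async def get_missing_events(db_path, limit=10):
    # The async wrapper just awaits the threaded transaction, mirroring
    # `ids = await self.db_pool.runInteraction(...)` above.
    return await deferToThread(_get_missing_events_txn, db_path, limit)


def main(reactor):
    d = defer.ensureDeferred(get_missing_events(":memory:"))
    d.addCallback(print)
    return d


if __name__ == "__main__":
    task.react(main)
```
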
@@ -595,17 +588,13 @@ def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
         event_results.reverse()
         return event_results
 
-    @defer.inlineCallbacks
-    def get_successor_events(self, event_ids):
+    async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:
         """Fetch all events that have the given events as a prev event
         Args:
-            event_ids (iterable[str])
-        Returns:
-            Deferred[list[str]]
+            event_ids: The events to use as the previous events.
         """
-        rows = yield self.db_pool.simple_select_many_batch(
+        rows = await self.db_pool.simple_select_many_batch(
             table="event_edges",
             column="prev_event_id",
             iterable=event_ids,
@@ -674,8 +663,7 @@ def _clean_room_for_join_txn(self, txn, room_id):
         txn.execute(query, (room_id,))
         txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
 
-    @defer.inlineCallbacks
-    def _background_delete_non_state_event_auth(self, progress, batch_size):
+    async def _background_delete_non_state_event_auth(self, progress, batch_size):
         def delete_event_auth(txn):
             target_min_stream_id = progress.get("target_min_stream_id_inclusive")
             max_stream_id = progress.get("max_stream_id_exclusive")
@@ -714,12 +702,12 @@ def delete_event_auth(txn):
 
             return min_stream_id >= target_min_stream_id
 
-        result = yield self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             self.EVENT_AUTH_STATE_ONLY, delete_event_auth
         )
 
         if not result:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_AUTH_STATE_ONLY
             )
 
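The last hunk keeps the established background-update shape: the synchronous `delete_event_auth` transaction returns a flag, and when that flag comes back falsey the now-async outer method ends the background update. The sketch below mirrors only that control flow, with invented names and an in-memory stand-in for the `event_auth` table; it is not Synapse's background-update machinery.

```python
from twisted.internet import defer, task


async def delete_non_state_auth_batch(table, batch_size):
    # Remove up to batch_size non-state rows and report whether another batch
    # is needed (the analogue of the flag checked via `if not result:` above).
    batch = [row for row in table if not row["is_state"]][:batch_size]
    for row in batch:
        table.remove(row)
    return any(not row["is_state"] for row in table)


async def run_background_update(table, batch_size=2):
    # Synapse's updater re-invokes the handler batch by batch; a plain loop
    # is enough to show the control flow in a standalone sketch.
    while await delete_non_state_auth_batch(table, batch_size):
        pass
    # This is the point at which the hunk calls _end_background_update(...).
    return "%d state-only rows kept" % len(table)


def main(reactor):
    rows = [{"is_state": i % 3 == 0} for i in range(10)]
    d = defer.ensureDeferred(run_background_update(rows))
    d.addCallback(print)
    return d


if __name__ == "__main__":
    task.react(main)
```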