Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use run_in_background in preference to preserve_fn #3140

Merged
merged 3 commits into from
Apr 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
Expand Down Expand Up @@ -229,7 +229,7 @@ def process_replication_rows(self, stream_name, token, rows):
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token)
run_in_background(self.update_token, token)

# We also need to poke the federation sender when new events happen
elif stream_name == "events":
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
Expand Down Expand Up @@ -140,7 +140,7 @@ def __init__(self, hs):

def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
preserve_fn(self.poke_pushers)(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)

@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
Expand Down
5 changes: 2 additions & 3 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -327,8 +327,7 @@ def __init__(self, hs):

def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)

preserve_fn(self.process_and_notify)(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)

def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
Expand Down
12 changes: 6 additions & 6 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from twisted.internet import defer

from synapse.appservice import ApplicationServiceState
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure

import logging
Expand Down Expand Up @@ -106,7 +106,7 @@ def __init__(self, txn_ctrl, clock):
def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
preserve_fn(self._send_request)(service)
run_in_background(self._send_request, service)

@defer.inlineCallbacks
def _send_request(self, service):
Expand Down Expand Up @@ -152,10 +152,10 @@ def send(self, service, events):
if sent:
yield txn.complete(self.store)
else:
preserve_fn(self._start_recoverer)(service)
except Exception as e:
logger.exception(e)
preserve_fn(self._start_recoverer)(service)
run_in_background(self._start_recoverer, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
Expand Down
28 changes: 17 additions & 11 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
PreserveLoggingContext,
preserve_fn
preserve_fn,
run_in_background,
)
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -127,7 +128,7 @@ def verify_json_objects_for_server(self, server_and_json):

verify_requests.append(verify_request)

preserve_fn(self._start_key_lookups)(verify_requests)
run_in_background(self._start_key_lookups, verify_requests)

# Pass those keys to handle_key_deferred so that the json object
# signatures can be verified
Expand Down Expand Up @@ -316,7 +317,7 @@ def on_err(err):
if not verify_request.deferred.called:
verify_request.deferred.errback(err)

preserve_fn(do_iterations)().addErrback(on_err)
run_in_background(do_iterations).addErrback(on_err)

@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
Expand All @@ -332,8 +333,9 @@ def get_keys_from_store(self, server_name_and_key_ids):
"""
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
run_in_background(
self.store.get_server_verify_keys,
server_name, key_ids,
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
Expand Down Expand Up @@ -361,7 +363,7 @@ def get_key(perspective_name, perspective_keys):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(p_name, p_keys)
run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
Expand Down Expand Up @@ -401,7 +403,7 @@ def get_key(server_name, key_ids):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(server_name, key_ids)
run_in_background(get_key, server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
Expand Down Expand Up @@ -484,7 +486,8 @@ def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
Expand Down Expand Up @@ -542,7 +545,8 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids):

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=key_server_name,
from_server=server_name,
verify_keys=verify_keys,
Expand Down Expand Up @@ -618,7 +622,8 @@ def process_v2_response(self, from_server, response_json,

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=server_name,
Expand Down Expand Up @@ -719,7 +724,8 @@ def store_keys(self, server_name, from_server, verify_keys):
# TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_verify_key)(
run_in_background(
self.store.store_server_verify_key,
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()
Expand Down
5 changes: 3 additions & 2 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -417,7 +417,8 @@ def random_server_list():
batch = set(missing_events[i:i + batch_size])

deferreds = [
preserve_fn(self.get_pdu)(
run_in_background(
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
)
Expand Down
4 changes: 2 additions & 2 deletions synapse/groups/attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background

from signedjson.sign import sign_json

Expand Down Expand Up @@ -196,4 +196,4 @@ def _renew_attestation(group_id, user_id):
group_id = row["group_id"]
user_id = row["user_id"]

preserve_fn(_renew_attestation)(group_id, user_id)
run_in_background(_renew_attestation, group_id, user_id)
5 changes: 4 additions & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)

results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
run_in_background(
self.appservice_api.query_3pe,
service, kind, protocol, fields,
)
for service in services
], consumeErrors=True))

Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
SynapseError, CodeMessageException, FederationDeniedError,
)
from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,7 +139,7 @@ def do_remote_query(destination):
failures[destination] = _exception_to_failure(e)

yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache
], consumeErrors=True))

Expand Down Expand Up @@ -242,7 +242,7 @@ def claim_client_keys(destination):
failures[destination] = _exception_to_failure(e)

yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
run_in_background(claim_client_keys, destination)
for destination in remote_queries
], consumeErrors=True))

Expand Down
16 changes: 10 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,8 @@ def backfill(self, dest, room_id, limit, extremities):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self.replication_layer.get_pdu)(
logcontext.run_in_background(
self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
Expand Down Expand Up @@ -1025,7 +1026,7 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.

logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
logcontext.run_in_background(self._handle_queued_pdus, room_queue)

defer.returnValue(True)

Expand Down Expand Up @@ -1527,8 +1528,9 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None,
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)

defer.returnValue((context, event_stream_id, max_stream_id))
Expand All @@ -1542,7 +1544,8 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self._prep_event)(
logcontext.run_in_background(
self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
Expand Down Expand Up @@ -1871,7 +1874,8 @@ def do_auth(self, origin, event, context, auth_events):

different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
logcontext.preserve_fn(self.store.get_event)(
logcontext.run_in_background(
self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
Expand Down
12 changes: 7 additions & 5 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler
Expand Down Expand Up @@ -166,7 +166,8 @@ def handle_room(event):
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
limit=limit,
end_token=room_end_token,
Expand Down Expand Up @@ -391,9 +392,10 @@ def get_receipts():

presence, receipts, (messages, token) = yield defer.gatherResults(
[
preserve_fn(get_presence)(),
preserve_fn(get_receipts)(),
preserve_fn(self.store.get_recent_events_for_room)(
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
self.store.get_recent_events_for_room,
room_id,
limit=limit,
end_token=now_token.room_key,
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.stringutils import random_string
Expand Down Expand Up @@ -857,7 +857,8 @@ def is_inviter_member_event(e):

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)(
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)

Expand All @@ -872,7 +873,7 @@ def _notify():
except Exception:
logger.exception("Error notifying about new room event")

preserve_fn(_notify)()
run_in_background(_notify)

if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from twisted.internet import defer

from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
Expand Down Expand Up @@ -97,7 +97,8 @@ def _handle_timeouts(self):
if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
preserve_fn(self._push_remote)(
run_in_background(
self._push_remote,
member=member,
typing=True
)
Expand Down Expand Up @@ -196,7 +197,7 @@ def _stopped_typing(self, member):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
preserve_fn(self._push_remote)(member, typing)
run_in_background(self._push_remote, member, typing)

self._push_update_local(
member=member,
Expand Down
Loading