Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert run_as_background_process inner function into an async function #8032

Merged
merged 5 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8032.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
2 changes: 1 addition & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def handle_event(event):

async def start_scheduler():
try:
return self.scheduler.start()
return await self.scheduler.start()
except Exception:
logger.error("Application Services Failure")

Expand Down
5 changes: 2 additions & 3 deletions synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ def processing(self):

Returns a context manager; the correct way to use this is:

@defer.inlineCallbacks
def handle_request(request):
async def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
await really_handle_the_request()

Once the context manager is closed, the completion of the request will be logged,
and the various metrics will be updated.
Expand Down
34 changes: 13 additions & 21 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import logging
import threading
from asyncio import iscoroutine
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set

from prometheus_client.core import REGISTRY, Counter, Gauge

from twisted.internet import defer
from twisted.python.failure import Failure

from synapse.logging.context import LoggingContext, PreserveLoggingContext

Expand Down Expand Up @@ -167,7 +166,7 @@ def update_metrics(self):
)


def run_as_background_process(desc, func, *args, **kwargs):
def run_as_background_process(desc: str, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics

This should be used to wrap processes which are fired off to run in the
Expand All @@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
normal synapse inlineCallbacks function).

Args:
desc (str): a description for this background process type
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
Expand All @@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
follow the synapse logcontext rules.
"""

@defer.inlineCallbacks
def run():
async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
Expand All @@ -203,29 +201,23 @@ def run():
try:
result = func(*args, **kwargs)

# We probably don't have an ensureDeferred in our call stack to handle
# coroutine results, so we need to ensureDeferred here.
#
# But we need this check because ensureDeferred doesn't like being
# But we need this check because await doesn't like being
# called on immediate values (as opposed to Deferreds or coroutines).
clokep marked this conversation as resolved.
Show resolved Hide resolved
if iscoroutine(result):
result = defer.ensureDeferred(result)
if inspect.isawaitable(result):
result = await result

return (yield result)
return result
except Exception:
# failure.Failure() fishes the original Failure out of our stack, and
# thus gives us a sensible stack trace.
f = Failure()
logger.error(
"Background process '%s' threw an exception",
desc,
exc_info=(f.type, f.value, f.getTracebackObject()),
logger.exception(
"Background process '%s' threw an exception", desc,
)
finally:
_background_process_in_flight_count.labels(desc).dec()

with PreserveLoggingContext():
return run()
# Note that we return a Deferred here so that it can be used in a
# looping_call and other places that expect a Deferred.
return defer.ensureDeferred(run())


def wrap_as_background_process(desc):
Expand Down