Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix 'Unhandled error in Deferred' (#12089)
Browse files · Browse the repository at this point in the history
* Fix 'Unhandled error in Deferred'

Fixes a CRITICAL "Unhandled error in Deferred" log message that occurred when
a function wrapped with `@cachedList` failed.

* Minor optimisation to cachedListDescriptor

We can avoid re-using `missing`, which saves looking up entries in
`deferreds_map` and means we don't need to copy it.

* Improve type annotation on CachedListDescriptor
  • Loading branch information
richvdh authored Mar 1, 2022
1 parent 9d11fee commit 5458eb8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions changelog.d/12089.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix occasional 'Unhandled error in Deferred' error message.
64 changes: 32 additions & 32 deletions synapse/util/caches/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
from typing import (
Any,
Awaitable,
Callable,
Dict,
Generic,
Expand Down Expand Up @@ -346,15 +347,15 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
"""Wraps an existing cache to support bulk fetching of keys.
Given an iterable of keys it looks in the cache to find any hits, then passes
the tuple of missing keys to the wrapped function.
the set of missing keys to the wrapped function.
Once wrapped, the function returns a Deferred which resolves to the list
of results.
Once wrapped, the function returns a Deferred which resolves to a Dict mapping from
input key to output value.
"""

def __init__(
self,
orig: Callable[..., Any],
orig: Callable[..., Awaitable[Dict]],
cached_method_name: str,
list_name: str,
num_args: Optional[int] = None,
Expand Down Expand Up @@ -385,13 +386,13 @@ def __init__(

def __get__(
self, obj: Optional[Any], objtype: Optional[Type] = None
) -> Callable[..., Any]:
) -> Callable[..., "defer.Deferred[Dict[Hashable, Any]]"]:
cached_method = getattr(obj, self.cached_method_name)
cache: DeferredCache[CacheKey, Any] = cached_method.cache
num_args = cached_method.num_args

@functools.wraps(self.orig)
def wrapped(*args: Any, **kwargs: Any) -> Any:
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
# If we're passed a cache_context then we'll want to call its
# invalidate() whenever we are invalidated
invalidate_callback = kwargs.pop("on_invalidate", None)
Expand Down Expand Up @@ -444,39 +445,38 @@ def arg_to_cache_key(arg: Hashable) -> Hashable:
deferred: "defer.Deferred[Any]" = defer.Deferred()
deferreds_map[arg] = deferred
key = arg_to_cache_key(arg)
cache.set(key, deferred, callback=invalidate_callback)
cached_defers.append(
cache.set(key, deferred, callback=invalidate_callback)
)

def complete_all(res: Dict[Hashable, Any]) -> None:
# the wrapped function has completed. It returns a
# a dict. We can now resolve the observable deferreds in
# the cache and update our own result map.
for e in missing:
# the wrapped function has completed. It returns a dict.
# We can now update our own result map, and then resolve the
# observable deferreds in the cache.
for e, d1 in deferreds_map.items():
val = res.get(e, None)
deferreds_map[e].callback(val)
# make sure we update the results map before running the
# deferreds, because as soon as we run the last deferred, the
# gatherResults() below will complete and return the result
# dict to our caller.
results[e] = val
d1.callback(val)

def errback(f: Failure) -> Failure:
# the wrapped function has failed. Invalidate any cache
# entries we're supposed to be populating, and fail
# their deferreds.
for e in missing:
key = arg_to_cache_key(e)
cache.invalidate(key)
deferreds_map[e].errback(f)

# return the failure, to propagate to our caller.
return f
def errback_all(f: Failure) -> None:
# the wrapped function has failed. Propagate the failure into
# the cache, which will invalidate the entry, and cause the
# relevant cached_deferreds to fail, which will propagate the
# failure to our caller.
for d1 in deferreds_map.values():
d1.errback(f)

args_to_call = dict(arg_dict)
# copy the missing set before sending it to the callee, to guard against
# modification.
args_to_call[self.list_name] = tuple(missing)

cached_defers.append(
defer.maybeDeferred(
preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback)
)
args_to_call[self.list_name] = missing

# dispatch the call, and attach the two handlers
defer.maybeDeferred(
preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback_all)

if cached_defers:
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
Expand Down
10 changes: 5 additions & 5 deletions tests/util/caches/test_descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,14 +673,14 @@ async def list_fn(self, args1, arg2):
self.assertEqual(current_context(), SENTINEL_CONTEXT)
r = yield d1
self.assertEqual(current_context(), c1)
obj.mock.assert_called_once_with((10, 20), 2)
obj.mock.assert_called_once_with({10, 20}, 2)
self.assertEqual(r, {10: "fish", 20: "chips"})
obj.mock.reset_mock()

# a call with different params should call the mock again
obj.mock.return_value = {30: "peas"}
r = yield obj.list_fn([20, 30], 2)
obj.mock.assert_called_once_with((30,), 2)
obj.mock.assert_called_once_with({30}, 2)
self.assertEqual(r, {20: "chips", 30: "peas"})
obj.mock.reset_mock()

Expand All @@ -701,7 +701,7 @@ async def list_fn(self, args1, arg2):
obj.mock.return_value = {40: "gravy"}
iterable = (x for x in [10, 40, 40])
r = yield obj.list_fn(iterable, 2)
obj.mock.assert_called_once_with((40,), 2)
obj.mock.assert_called_once_with({40}, 2)
self.assertEqual(r, {10: "fish", 40: "gravy"})

def test_concurrent_lookups(self):
Expand Down Expand Up @@ -729,7 +729,7 @@ def list_fn(self, args1) -> "Deferred[dict]":
d3 = obj.list_fn([10])

# the mock should have been called exactly once
obj.mock.assert_called_once_with((10,))
obj.mock.assert_called_once_with({10})
obj.mock.reset_mock()

# ... and none of the calls should yet be complete
Expand Down Expand Up @@ -771,7 +771,7 @@ async def list_fn(self, args1, arg2):
# cache miss
obj.mock.return_value = {10: "fish", 20: "chips"}
r1 = yield obj.list_fn([10, 20], 2, on_invalidate=invalidate0)
obj.mock.assert_called_once_with((10, 20), 2)
obj.mock.assert_called_once_with({10, 20}, 2)
self.assertEqual(r1, {10: "fish", 20: "chips"})
obj.mock.reset_mock()

Expand Down

0 comments on commit 5458eb8

Please sign in to comment.