Skip to content

Commit

Permalink
Fix logging and close
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 1, 2024
1 parent 539969b commit efb9837
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
3 changes: 1 addition & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5409,8 +5409,7 @@ async def _unregister_worker_plugin(self, name, nanny=None):

for response in responses.values():
if response["status"] == "error":
exc = response["exception"]
tb = response["traceback"]
_, exc, tb = clean_exception(**response)
raise exc.with_traceback(tb)
return responses

Expand Down
30 changes: 29 additions & 1 deletion distributed/diagnostics/tests/test_nanny_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def setup(self, nanny):


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_plugin_with_broken_setup_to_existing_nanny_raises(c, s, a):
async def test_register_plugin_with_broken_setup_to_existing_nannies_raises(c, s, a):
with pytest.raises(RuntimeError, match="test error"):
with captured_logger("distributed.nanny", level=logging.ERROR) as caplog:
await c.register_plugin(BrokenSetupPlugin(), name="TestPlugin1")
Expand All @@ -189,3 +189,31 @@ async def test_plugin_with_broken_setup_on_new_nanny_logs(c, s):
logs = caplog.getvalue()
assert "TestPlugin1 failed to setup" in logs
assert "test error" in logs


class BrokenTeardownPlugin(NannyPlugin):
def teardown(self, nanny):
raise RuntimeError("test error")


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_unregister_nanny_plugin_with_broken_teardown_raises(c, s, a):
await c.register_plugin(BrokenTeardownPlugin(), name="TestPlugin1")
with pytest.raises(RuntimeError, match="test error"):
with captured_logger("distributed.nanny", level=logging.ERROR) as caplog:
await c.unregister_worker_plugin("TestPlugin1", nanny=True)
logs = caplog.getvalue()
assert "TestPlugin1 failed to teardown" in logs
assert "test error" in logs


@gen_cluster(client=True, nthreads=[])
async def test_nanny_plugin_with_broken_teardown_logs_on_close(c, s):
await c.register_plugin(BrokenTeardownPlugin(), name="TestPlugin1")

with captured_logger("distributed.nanny", level=logging.ERROR) as caplog:
async with Nanny(s.address):
pass
logs = caplog.getvalue()
assert "TestPlugin1 failed to teardown" in logs
assert "test error" in logs
11 changes: 3 additions & 8 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ async def plugin_add(
if isawaitable(result):
result = await result
except Exception as e:
logger.exception("Worker plugin %s failed to setup", name)
logger.exception("Nanny plugin %s failed to setup", name)
return error_message(e)
if getattr(plugin, "restart", False):
await self.restart(reason=f"nanny-plugin-{name}-restart")
Expand All @@ -501,6 +501,7 @@ async def plugin_remove(self, name: str) -> ErrorMessage | OKMessage:
if isawaitable(result):
result = await result
except Exception as e:
logger.exception("Nanny plugin %s failed to teardown", name)
msg = error_message(e)
return msg

Expand Down Expand Up @@ -611,13 +612,7 @@ async def close( # type:ignore[override]

await self.preloads.teardown()

teardowns = [
plugin.teardown(self)
for plugin in self.plugins.values()
if hasattr(plugin, "teardown")
]

await asyncio.gather(*(td for td in teardowns if isawaitable(td)))
await asyncio.gather(*(self.plugin_remove(name) for name in self.plugins))

self.stop()
if self.process is not None:
Expand Down

0 comments on commit efb9837

Please sign in to comment.