diff --git a/distributed/client.py b/distributed/client.py index ad283a352a..0601b0db5f 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -5409,8 +5409,7 @@ async def _unregister_worker_plugin(self, name, nanny=None): for response in responses.values(): if response["status"] == "error": - exc = response["exception"] - tb = response["traceback"] + _, exc, tb = clean_exception(**response) raise exc.with_traceback(tb) return responses diff --git a/distributed/diagnostics/tests/test_nanny_plugin.py b/distributed/diagnostics/tests/test_nanny_plugin.py index 7906e9a066..3c481dce26 100644 --- a/distributed/diagnostics/tests/test_nanny_plugin.py +++ b/distributed/diagnostics/tests/test_nanny_plugin.py @@ -170,7 +170,7 @@ def setup(self, nanny): @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_plugin_with_broken_setup_to_existing_nanny_raises(c, s, a): +async def test_register_plugin_with_broken_setup_to_existing_nannies_raises(c, s, a): with pytest.raises(RuntimeError, match="test error"): with captured_logger("distributed.nanny", level=logging.ERROR) as caplog: await c.register_plugin(BrokenSetupPlugin(), name="TestPlugin1") @@ -189,3 +189,31 @@ async def test_plugin_with_broken_setup_on_new_nanny_logs(c, s): logs = caplog.getvalue() assert "TestPlugin1 failed to setup" in logs assert "test error" in logs + + +class BrokenTeardownPlugin(NannyPlugin): + def teardown(self, nanny): + raise RuntimeError("test error") + + +@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) +async def test_unregister_nanny_plugin_with_broken_teardown_raises(c, s, a): + await c.register_plugin(BrokenTeardownPlugin(), name="TestPlugin1") + with pytest.raises(RuntimeError, match="test error"): + with captured_logger("distributed.nanny", level=logging.ERROR) as caplog: + await c.unregister_worker_plugin("TestPlugin1", nanny=True) + logs = caplog.getvalue() + assert "TestPlugin1 failed to teardown" in logs + assert "test error" in logs + + +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_plugin_with_broken_teardown_logs_on_close(c, s): + await c.register_plugin(BrokenTeardownPlugin(), name="TestPlugin1") + + with captured_logger("distributed.nanny", level=logging.ERROR) as caplog: + async with Nanny(s.address): + pass + logs = caplog.getvalue() + assert "TestPlugin1 failed to teardown" in logs + assert "test error" in logs diff --git a/distributed/nanny.py b/distributed/nanny.py index 711dd804e9..7a14ee6576 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -484,7 +484,7 @@ async def plugin_add( if isawaitable(result): result = await result except Exception as e: - logger.exception("Worker plugin %s failed to setup", name) + logger.exception("Nanny plugin %s failed to setup", name) return error_message(e) if getattr(plugin, "restart", False): await self.restart(reason=f"nanny-plugin-{name}-restart") @@ -501,6 +501,7 @@ async def plugin_remove(self, name: str) -> ErrorMessage | OKMessage: if isawaitable(result): result = await result except Exception as e: + logger.exception("Nanny plugin %s failed to teardown", name) msg = error_message(e) return msg @@ -611,13 +612,7 @@ async def close( # type:ignore[override] await self.preloads.teardown() - teardowns = [ - plugin.teardown(self) - for plugin in self.plugins.values() - if hasattr(plugin, "teardown") - ] - - await asyncio.gather(*(td for td in teardowns if isawaitable(td))) + await asyncio.gather(*(self.plugin_remove(name) for name in self.plugins)) self.stop() if self.process is not None: