diff --git a/distributed/core.py b/distributed/core.py index 28467594b5..d5eacb7d3c 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -1174,11 +1174,11 @@ async def send_recv_from_rpc(**kwargs): except (RPCClosed, CommClosedError) as e: if comm: raise type(e)( - f"Exception while trying to call remote method {key!r} before comm was established." + f"Exception while trying to call remote method {key!r} using comm {comm!r}." ) from e else: raise type(e)( - f"Exception while trying to call remote method {key!r} using comm {comm!r}." + f"Exception while trying to call remote method {key!r} before comm was established." ) from e self.comms[comm] = True # mark as open diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index af25353bf2..0e94dc96a5 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -22,6 +22,7 @@ from distributed.comm.tcp import TCPBackend, TCPListener from distributed.core import ( ConnectionPool, + RPCClosed, Server, Status, _expects_comm, @@ -923,6 +924,20 @@ async def test_rpc_serialization(): assert result == {"result": inc} +@gen_test() +async def test_rpc_closed_exception(): + async with Server({"echo": echo_serialize}) as server: + await server.listen("tcp://") + + async with rpc(server.address, serializers=["msgpack"]) as r: + r.status = Status.closed + with pytest.raises( + RPCClosed, + match="Exception while trying to call remote method .* before comm was established.", + ): + await r.__getattr__("foo")() + + @gen_cluster() async def test_thread_id(s, a, b): assert s.thread_id == a.thread_id == b.thread_id == threading.get_ident()