diff --git a/distributed/utils_test.py b/distributed/utils_test.py index dba42c36fc..bc9515535e 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -170,16 +170,29 @@ def start(): @pytest.fixture def loop_in_thread(cleanup): - with pristine_loop() as loop: - thread = threading.Thread(target=loop.start, name="test IOLoop") - thread.daemon = True - thread.start() - loop_started = threading.Event() - loop.add_callback(loop_started.set) - loop_started.wait() - yield loop - loop.add_callback(loop.stop) - thread.join(timeout=5) + loop_started = concurrent.futures.Future() + with concurrent.futures.ThreadPoolExecutor( + 1, thread_name_prefix="test IOLoop" + ) as tpe: + + async def run(): + io_loop = IOLoop.current() + stop_event = asyncio.Event() + loop_started.set_result((io_loop, stop_event)) + await stop_event.wait() + + ran = tpe.submit(_run_and_close_tornado, run) + for f in concurrent.futures.as_completed((loop_started, ran)): + if f is loop_started: + io_loop, stop_event = loop_started.result() + try: + yield io_loop + finally: + io_loop.add_callback(stop_event.set) + + elif f is ran: + ran.result() + return @pytest.fixture