Skip to content

Commit

Permalink
Fix resolve_host "Task was destroyed but it is pending" errors (#8967)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamsorcerer authored Sep 2, 2024
1 parent 8f3b1f4 commit cd761a3
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES/8967.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed resolve_host() 'Task was destroyed but is pending' errors -- by :user:`Dreamsorcerer`.
20 changes: 15 additions & 5 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from itertools import cycle, islice
from time import monotonic
from types import TracebackType
from typing import ( # noqa
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Expand Down Expand Up @@ -404,8 +404,8 @@ async def close(self) -> None:
err_msg = "Error while closing connector: " + repr(res)
logging.error(err_msg)

def _close_immediately(self) -> List["asyncio.Future[None]"]:
waiters: List["asyncio.Future[None]"] = []
def _close_immediately(self) -> List[Awaitable[object]]:
waiters: List[Awaitable[object]] = []

if self._closed:
return waiters
Expand Down Expand Up @@ -805,11 +805,19 @@ def __init__(
self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
self._happy_eyeballs_delay = happy_eyeballs_delay
self._interleave = interleave
self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()

def _close_immediately(self) -> List["asyncio.Future[None]"]:
def _close_immediately(self) -> List[Awaitable[object]]:
for ev in self._throttle_dns_events.values():
ev.cancel()
return super()._close_immediately()

waiters = super()._close_immediately()

for t in self._resolve_host_tasks:
t.cancel()
waiters.append(t)

return waiters

@property
def family(self) -> int:
Expand Down Expand Up @@ -885,6 +893,8 @@ async def _resolve_host(
resolved_host_task = asyncio.create_task(
self._resolve_host_with_throttle(key, host, port, traces)
)
self._resolve_host_tasks.add(resolved_host_task)
resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)
try:
return await asyncio.shield(resolved_host_task)
except asyncio.CancelledError:
Expand Down
35 changes: 34 additions & 1 deletion tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
import uuid
from collections import deque
from contextlib import closing
from contextlib import closing, suppress
from typing import (
Awaitable,
Callable,
Expand Down Expand Up @@ -1839,6 +1839,39 @@ async def test_close_cancels_cleanup_handle(
assert conn._cleanup_handle is None


async def test_close_cancels_resolve_host(loop: asyncio.AbstractEventLoop) -> None:
cancelled = False

async def delay_resolve_host(*args: object) -> None:
"""Delay _resolve_host() task in order to test cancellation."""
nonlocal cancelled
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
cancelled = True
raise

conn = aiohttp.TCPConnector()
req = ClientRequest(
"GET", URL("http://localhost:80"), loop=loop, response_class=mock.Mock()
)
with mock.patch.object(conn, "_resolve_host_with_throttle", delay_resolve_host):
t = asyncio.create_task(conn.connect(req, [], ClientTimeout()))
# Let it create the internal task
await asyncio.sleep(0)
# Let that task start running
await asyncio.sleep(0)

# We now have a task being tracked and can ensure that .close() cancels it.
assert len(conn._resolve_host_tasks) == 1
await conn.close()
assert cancelled
assert len(conn._resolve_host_tasks) == 0

with suppress(asyncio.CancelledError):
await t


async def test_close_abort_closed_transports(loop: asyncio.AbstractEventLoop) -> None:
tr = mock.Mock()

Expand Down

0 comments on commit cd761a3

Please sign in to comment.