diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index c201ee4c..26a8bcfe 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -401,6 +401,9 @@ async def _receive_remote_settings_change(self, event: h2.events.Event) -> None: await self._max_streams_semaphore.acquire() self._max_streams -= 1 + async def _reset_steam(self, stream_id: int, error_code: int) -> None: + self._h2_state.reset_stream(stream_id=stream_id, error_code=error_code) + async def _response_closed(self, stream_id: int) -> None: await self._max_streams_semaphore.release() del self._events[stream_id] @@ -578,12 +581,18 @@ async def __aiter__(self) -> typing.AsyncIterator[bytes]: # we want to close the response (and possibly the connection) # before raising that exception. with AsyncShieldCancellation(): - await self.aclose() + # close the stream with cancel + await self.aclose(cancel_stream=True) raise exc - async def aclose(self) -> None: + async def aclose(self, cancel_stream: bool = False) -> None: if not self._closed: self._closed = True kwargs = {"stream_id": self._stream_id} async with Trace("response_closed", logger, self._request, kwargs): + if cancel_stream: + await self._connection._reset_steam( + stream_id=self._stream_id, + error_code=h2.settings.ErrorCodes.CANCEL, + ) await self._connection._response_closed(stream_id=self._stream_id) diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 1ee4bbb3..1a7d2868 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -401,6 +401,9 @@ def _receive_remote_settings_change(self, event: h2.events.Event) -> None: self._max_streams_semaphore.acquire() self._max_streams -= 1 + def _reset_steam(self, stream_id: int, error_code: int) -> None: + self._h2_state.reset_stream(stream_id=stream_id, error_code=error_code) + def _response_closed(self, stream_id: int) -> None: self._max_streams_semaphore.release() del self._events[stream_id] @@ -578,12 +581,18 @@ def __iter__(self) -> typing.Iterator[bytes]: # we want to close the response (and possibly the connection) # before raising that exception. with ShieldCancellation(): - self.close() + # close the stream with cancel + self.close(cancel_stream=True) raise exc - def close(self) -> None: + def close(self, cancel_stream: bool = False) -> None: if not self._closed: self._closed = True kwargs = {"stream_id": self._stream_id} with Trace("response_closed", logger, self._request, kwargs): + if cancel_stream: + self._connection._reset_steam( + stream_id=self._stream_id, + error_code=h2.settings.ErrorCodes.CANCEL, + ) self._connection._response_closed(stream_id=self._stream_id)