diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 529f9f0146c..2065ea4f75b 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -22,7 +22,6 @@ from .helpers import PY_36, HeadersMixin, TimerNoop, noop, reify, set_result from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter from .log import client_logger -from .signals import Signal from .streams import StreamReader @@ -483,8 +482,9 @@ async def on_chunk_sent(chunk): await trace.send_request_chunk_sent(chunk) writer = StreamWriter( - conn.protocol, conn.transport, self.loop) - writer.on_chunk_sent.append(on_chunk_sent) + conn.protocol, conn.transport, self.loop, + on_chunk_sent=on_chunk_sent + ) if self.compress: writer.enable_compression(self.compress) @@ -518,17 +518,13 @@ async def on_chunk_sent(chunk): self._writer = self.loop.create_task(self.write_bytes(writer, conn)) - async def on_chunk_received(chunk): - for trace in self.traces: - await trace.send_response_chunk_received(chunk) - self.response = self.response_class( self.method, self.original_url, writer=self._writer, continue100=self._continue, timer=self._timer, request_info=self.request_info, - auto_decompress=self._auto_decompress) - self.response.on_chunk_received.append(on_chunk_received) - + auto_decompress=self._auto_decompress, + traces=self.traces, + ) self.response._post_init(self.loop, self._session) return self.response @@ -569,7 +565,8 @@ class ClientResponse(HeadersMixin): def __init__(self, method, url, *, writer=None, continue100=None, timer=None, - request_info=None, auto_decompress=True): + request_info=None, auto_decompress=True, + traces=[]): assert isinstance(url, URL) self.method = method @@ -586,9 +583,7 @@ def __init__(self, method, url, *, self._timer = timer if timer is not None else TimerNoop() self._auto_decompress = auto_decompress self._cache = {} # reqired for @reify method decorator - - # avoid circular reference so that __del__ works - self.on_chunk_received = Signal(owner=None) + self._traces = traces @property def url(self): @@ -813,8 +808,8 @@ async def read(self): if self._content is None: try: self._content = await self.content.read() - self.on_chunk_received.freeze() - await self.on_chunk_received.send(self._content) + for trace in self._traces: + await trace.send_response_chunk_received(self._content) except BaseException: self.close() raise diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 46a93d19900..abf33a42281 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -2,11 +2,10 @@ import asyncio import collections +import inspect import zlib from .abc import AbstractStreamWriter -from .helpers import noop -from .signals import Signal __all__ = ('StreamWriter', 'HttpVersion', 'HttpVersion10', 'HttpVersion11') @@ -18,7 +17,12 @@ class StreamWriter(AbstractStreamWriter): - def __init__(self, protocol, transport, loop): + def __init__(self, protocol, transport, loop, on_chunk_sent=None): + assert ( + on_chunk_sent is None or + inspect.iscoroutinefunction(on_chunk_sent) + ) + self._protocol = protocol self._transport = transport @@ -32,7 +36,7 @@ def __init__(self, protocol, transport, loop): self._compress = None self._drain_waiter = None - self.on_chunk_sent = Signal(self) + self._on_chunk_sent = on_chunk_sent @property def transport(self): @@ -66,10 +70,8 @@ async def write(self, chunk, *, drain=True, LIMIT=64*1024): writer can't be used after write_eof() method being called. write() return drain future. """ - self.on_chunk_sent.freeze() - self.loop.create_task( - self.on_chunk_sent.send(chunk) - ) + if self._on_chunk_sent: + await self._on_chunk_sent(chunk) if self._compress is not None: chunk = self._compress.compress(chunk)