Skip to content

Commit

Permalink
Remove internal usage of Signal in favor of simple callbacks.
Browse files Browse the repository at this point in the history
  • Loading branch information
kowalski committed Feb 27, 2018
1 parent 329f89a commit 24e1db9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 24 deletions.
27 changes: 11 additions & 16 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from .helpers import PY_36, HeadersMixin, TimerNoop, noop, reify, set_result
from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter
from .log import client_logger
from .signals import Signal
from .streams import StreamReader


Expand Down Expand Up @@ -483,8 +482,9 @@ async def on_chunk_sent(chunk):
await trace.send_request_chunk_sent(chunk)

writer = StreamWriter(
conn.protocol, conn.transport, self.loop)
writer.on_chunk_sent.append(on_chunk_sent)
conn.protocol, conn.transport, self.loop,
on_chunk_sent=on_chunk_sent
)

if self.compress:
writer.enable_compression(self.compress)
Expand Down Expand Up @@ -518,17 +518,13 @@ async def on_chunk_sent(chunk):

self._writer = self.loop.create_task(self.write_bytes(writer, conn))

async def on_chunk_received(chunk):
for trace in self.traces:
await trace.send_response_chunk_received(chunk)

self.response = self.response_class(
self.method, self.original_url,
writer=self._writer, continue100=self._continue, timer=self._timer,
request_info=self.request_info,
auto_decompress=self._auto_decompress)
self.response.on_chunk_received.append(on_chunk_received)

auto_decompress=self._auto_decompress,
traces=self.traces,
)
self.response._post_init(self.loop, self._session)
return self.response

Expand Down Expand Up @@ -569,7 +565,8 @@ class ClientResponse(HeadersMixin):

def __init__(self, method, url, *,
writer=None, continue100=None, timer=None,
request_info=None, auto_decompress=True):
request_info=None, auto_decompress=True,
traces=[]):
assert isinstance(url, URL)

self.method = method
Expand All @@ -586,9 +583,7 @@ def __init__(self, method, url, *,
self._timer = timer if timer is not None else TimerNoop()
self._auto_decompress = auto_decompress
self._cache = {} # reqired for @reify method decorator

# avoid circular reference so that __del__ works
self.on_chunk_received = Signal(owner=None)
self._traces = traces

@property
def url(self):
Expand Down Expand Up @@ -813,8 +808,8 @@ async def read(self):
if self._content is None:
try:
self._content = await self.content.read()
self.on_chunk_received.freeze()
await self.on_chunk_received.send(self._content)
for trace in self._traces:
await trace.send_response_chunk_received(self._content)
except BaseException:
self.close()
raise
Expand Down
18 changes: 10 additions & 8 deletions aiohttp/http_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import asyncio
import collections
import inspect
import zlib

from .abc import AbstractStreamWriter
from .helpers import noop
from .signals import Signal


__all__ = ('StreamWriter', 'HttpVersion', 'HttpVersion10', 'HttpVersion11')
Expand All @@ -18,7 +17,12 @@

class StreamWriter(AbstractStreamWriter):

def __init__(self, protocol, transport, loop):
def __init__(self, protocol, transport, loop, on_chunk_sent=None):
assert (
on_chunk_sent is None or
inspect.iscoroutinefunction(on_chunk_sent)
)

self._protocol = protocol
self._transport = transport

Expand All @@ -32,7 +36,7 @@ def __init__(self, protocol, transport, loop):
self._compress = None
self._drain_waiter = None

self.on_chunk_sent = Signal(self)
self._on_chunk_sent = on_chunk_sent

@property
def transport(self):
Expand Down Expand Up @@ -66,10 +70,8 @@ async def write(self, chunk, *, drain=True, LIMIT=64*1024):
writer can't be used after write_eof() method being called.
write() return drain future.
"""
self.on_chunk_sent.freeze()
self.loop.create_task(
self.on_chunk_sent.send(chunk)
)
if self._on_chunk_sent:
await self._on_chunk_sent(chunk)

if self._compress is not None:
chunk = self._compress.compress(chunk)
Expand Down

0 comments on commit 24e1db9

Please sign in to comment.