Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bugfix: make the HTTP client able to return HTTP chunks when chunked transfer encoding is used #2150

Merged
merged 22 commits into from
Sep 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7a30dc5
implement http chunk parsing in http parser (C version)
jlacoline Jul 19, 2017
ad942af
http chunk decoding: implement chunk signals in Python parser
jlacoline Jul 20, 2017
09b1da5
StreamReader: add tests for [begin|end]_chunk_receiving methods
jlacoline Jul 20, 2017
511a680
update documentation to clarify the difference between iter_any() and…
jlacoline Jul 31, 2017
6d0211f
add tests for http chunks parsing
jlacoline Jul 31, 2017
32e4273
add changelog file for PR 2150
jlacoline Jul 31, 2017
fae40a7
http chunk parsing: readchunk() now returns tuples of (data, end_of_h…
jlacoline Aug 4, 2017
f56b069
Merge remote-tracking branch 'upstream/master' into add_http_chunk_fe…
jlacoline Aug 4, 2017
4563338
http chunk parsing: adapt iterchunks() generator to new return format
jlacoline Aug 4, 2017
513f0ad
streams.py: use parenthesis for line wrapping instead of backslash
jlacoline Aug 11, 2017
85ea0eb
add unit tests for ChunkTupleAsyncStreamIterator
jlacoline Aug 11, 2017
09848db
do not catch EofStream in ChunkTupleAsyncStreamIterator
jlacoline Aug 11, 2017
d69d890
change the behaviour of stream.readchunk when searching for the next …
jlacoline Aug 11, 2017
5e8e25f
add tests to the stream.readchunk() method
jlacoline Aug 11, 2017
939d386
http_parser.py: remove useless blank line
jlacoline Aug 11, 2017
bf31405
update documentation in streams.rst
jlacoline Aug 16, 2017
19b9d14
update documentation in docs/client_reference.rst
jlacoline Aug 16, 2017
5ed76d6
minor change to test_streams.py
jlacoline Aug 16, 2017
e172a23
change formatting in streams.rst
jlacoline Aug 16, 2017
c99e06c
fix spelling errors in documentation
jlacoline Aug 16, 2017
1136c56
stream.rs: replace 'boolean' with :class:
jlacoline Aug 16, 2017
39381ba
Merge branch 'master' into add_http_chunk_feature
asvetlov Sep 1, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions aiohttp/_http_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ cdef class HttpParser:
self._csettings.on_body = cb_on_body
self._csettings.on_message_begin = cb_on_message_begin
self._csettings.on_message_complete = cb_on_message_complete
self._csettings.on_chunk_header = cb_on_chunk_header
self._csettings.on_chunk_complete = cb_on_chunk_complete

self._last_error = None

Expand Down Expand Up @@ -208,6 +210,11 @@ cdef class HttpParser:
self._payload.feed_eof()
self._payload = None

cdef _on_chunk_header(self):
self._payload.begin_http_chunk_receiving()

cdef _on_chunk_complete(self):
self._payload.end_http_chunk_receiving()

### Public API ###

Expand Down Expand Up @@ -436,6 +443,28 @@ cdef int cb_on_message_complete(cparser.http_parser* parser) except -1:
return 0


cdef int cb_on_chunk_header(cparser.http_parser* parser) except -1:
cdef HttpParser pyparser = <HttpParser>parser.data
try:
pyparser._on_chunk_header()
except BaseException as exc:
pyparser._last_error = exc
return -1
else:
return 0


cdef int cb_on_chunk_complete(cparser.http_parser* parser) except -1:
cdef HttpParser pyparser = <HttpParser>parser.data
try:
pyparser._on_chunk_complete()
except BaseException as exc:
pyparser._last_error = exc
return -1
else:
return 0


cdef parser_error_from_errno(cparser.http_errno errno):
cdef bytes desc = cparser.http_errno_description(errno)

Expand Down
13 changes: 9 additions & 4 deletions aiohttp/http_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'):
else:
self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
self._chunk_size = size
self.payload.begin_http_chunk_receiving()
else:
self._chunk_tail = chunk
return False, None
Expand All @@ -547,18 +548,16 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'):
required = self._chunk_size
chunk_len = len(chunk)

if required >= chunk_len:
if required > chunk_len:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain why you replaced >= with >.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If required == chunk_len, then self._chunk_size == 0 and the "else" part does what is needed.
So lines 552 and 553 looked like duplicated code, and deduplicating this allowed me to put self.payload.end_http_chunk_receiving() at only one place in the code.

But the code will continue after the 'else' as opposed to the previous version where (False, None) was returned immediately. So the behaviour is not exactly the same even if the same result will be eventually returned.

So imo it looks better but if you want me to revert to the previous version tell me.

Also I'm going to remove the useless blank line(554)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, fine with the change

self._chunk_size = required - chunk_len
if self._chunk_size == 0:
self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF

self.payload.feed_data(chunk, chunk_len)
return False, None
else:
self._chunk_size = 0
self.payload.feed_data(chunk[:required], required)
chunk = chunk[required:]
self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
self.payload.end_http_chunk_receiving()

# toss the CRLF at the end of the chunk
if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
Expand Down Expand Up @@ -644,6 +643,12 @@ def feed_eof(self):

self.out.feed_eof()

def begin_http_chunk_receiving(self):
self.out.begin_http_chunk_receiving()

def end_http_chunk_receiving(self):
self.out.end_http_chunk_receiving()


HttpRequestParser = HttpRequestParserPy
HttpResponseParser = HttpResponseParserPy
Expand Down
61 changes: 52 additions & 9 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ def __anext__(self):
raise StopAsyncIteration # NOQA
return rv

class ChunkTupleAsyncStreamIterator(AsyncStreamIterator):
@asyncio.coroutine
def __anext__(self):
rv = yield from self.read_func()
if rv == (b'', False):
raise StopAsyncIteration # NOQA
return rv


class AsyncStreamReaderMixin:

Expand All @@ -58,20 +66,21 @@ def iter_chunked(self, n):
return AsyncStreamIterator(lambda: self.read(n))

def iter_any(self):
"""Returns an asynchronous iterator that yields slices of data
as they come.
"""Returns an asynchronous iterator that yields all the available
data as soon as it is received.

Python-3.5 available for Python 3.5+ only
"""
return AsyncStreamIterator(self.readany)

def iter_chunks(self):
"""Returns an asynchronous iterator that yields chunks of the
size as received by the server.
"""Returns an asynchronous iterator that yields chunks of data
as they are received by the server. The yielded objects are tuples
of (bytes, bool) as returned by the StreamReader.readchunk method.

Python-3.5 available for Python 3.5+ only
"""
return AsyncStreamIterator(self.readchunk)
return ChunkTupleAsyncStreamIterator(self.readchunk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test for the call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done



class StreamReader(AsyncStreamReaderMixin):
Expand All @@ -96,6 +105,8 @@ def __init__(self, limit=DEFAULT_LIMIT, timer=None, loop=None):
loop = asyncio.get_event_loop()
self._loop = loop
self._size = 0
self._cursor = 0
self._http_chunk_splits = None
self._buffer = collections.deque()
self._buffer_offset = 0
self._eof = False
Expand Down Expand Up @@ -200,6 +211,7 @@ def unread_data(self, data):
self._buffer[0] = self._buffer[0][self._buffer_offset:]
self._buffer_offset = 0
self._size += len(data)
self._cursor -= len(data)
self._buffer.appendleft(data)

def feed_data(self, data):
Expand All @@ -218,6 +230,18 @@ def feed_data(self, data):
if not waiter.done():
waiter.set_result(False)

def begin_http_chunk_receiving(self):
if self._http_chunk_splits is None:
self._http_chunk_splits = []

def end_http_chunk_receiving(self):
if self._http_chunk_splits is None:
raise RuntimeError("Called end_chunk_receiving without calling "
"begin_chunk_receiving first")
if not self._http_chunk_splits or \
self._http_chunk_splits[-1] != self.total_bytes:
self._http_chunk_splits.append(self.total_bytes)

@asyncio.coroutine
def _wait(self, func_name):
# StreamReader uses a future to link the protocol feed_data() method
Expand Down Expand Up @@ -320,16 +344,34 @@ def readany(self):

@asyncio.coroutine
def readchunk(self):
"""Returns a tuple of (data, end_of_http_chunk). When chunked transfer
encoding is used, end_of_http_chunk is a boolean indicating if the end
of the data corresponds to the end of an HTTP chunk, otherwise it is
always False.
"""
if self._exception is not None:
raise self._exception

if not self._buffer and not self._eof:
if (self._http_chunk_splits and
self._cursor == self._http_chunk_splits[0]):
# end of http chunk without available data
self._http_chunk_splits = self._http_chunk_splits[1:]
return (b"", True)
yield from self._wait('readchunk')

if self._buffer:
return self._read_nowait_chunk(-1)
if not self._buffer:
# end of file
return (b"", False)
elif self._http_chunk_splits is not None:
while self._http_chunk_splits:
pos = self._http_chunk_splits[0]
self._http_chunk_splits = self._http_chunk_splits[1:]
if pos > self._cursor:
return (self._read_nowait(pos-self._cursor), True)
return (self._read_nowait(-1), False)
else:
return b""
return (self._read_nowait_chunk(-1), False)

@asyncio.coroutine
def readexactly(self, n):
Expand Down Expand Up @@ -378,6 +420,7 @@ def _read_nowait_chunk(self, n):
data = self._buffer.popleft()

self._size -= len(data)
self._cursor += len(data)
return data

def _read_nowait(self, n):
Expand Down Expand Up @@ -438,7 +481,7 @@ def readany(self):

@asyncio.coroutine
def readchunk(self):
return b''
return (b'', False)

@asyncio.coroutine
def readexactly(self, n):
Expand Down
1 change: 1 addition & 0 deletions changes/2150.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the HTTP client able to return HTTP chunks when chunked transfer encoding is used.
5 changes: 4 additions & 1 deletion docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,10 @@ Response object

.. attribute:: content

Payload stream, contains response's BODY (:class:`StreamReader`).
Payload stream, which contains the response's BODY (:class:`StreamReader`).
It supports various reading methods depending on the expected format.
When chunked transfer encoding is used by the server, it allows
retrieving the actual HTTP chunks.

Reading from the stream may raise
:exc:`aiohttp.ClientPayloadError` if the response object is
Expand Down
28 changes: 27 additions & 1 deletion docs/streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ Reading Methods
:return bytes: the given line


.. comethod:: StreamReader.readchunk()

Read a chunk of data as it was received by the server.

Returns a tuple of (data, end_of_HTTP_chunk).

When chunked transfer encoding is used, end_of_HTTP_chunk is a :class:`bool`
indicating if the end of the data corresponds to the end of a HTTP chunk,
otherwise it is always ``False``.

:return tuple[bytes, bool]: a chunk of data and a :class:`bool` that is ``True``
when the end of the returned chunk corresponds
to the end of a HTTP chunk.


Asynchronous Iteration Support
------------------------------

Expand Down Expand Up @@ -109,9 +124,20 @@ size limit and over any available data.

Iterates over data chunks as received from the server::

async for data in response.content.iter_chunks():
async for data, _ in response.content.iter_chunks():
print(data)

If chunked transfer encoding is used, the original HTTP chunk formatting
can be retrieved by reading the second element of the returned tuples::

buffer = b""

async for data, end_of_http_chunk in response.content.iter_chunks():
buffer += data
if end_of_http_chunk:
print(buffer)
buffer = b""


Helpers
-------
Expand Down
6 changes: 4 additions & 2 deletions tests/test_flowcontrol_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,18 @@ def test_readany_resume_paused(self):
def test_readchunk(self):
r = self._make_one()
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readchunk())
res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk())
self.assertEqual(res, b'data')
self.assertFalse(end_of_http_chunk)
self.assertFalse(r._protocol.resume_reading.called)

def test_readchunk_resume_paused(self):
r = self._make_one()
r._protocol._reading_paused = True
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readchunk())
res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk())
self.assertEqual(res, b'data')
self.assertFalse(end_of_http_chunk)
self.assertTrue(r._protocol.resume_reading.called)

def test_readexactly(self):
Expand Down
8 changes: 7 additions & 1 deletion tests/test_http_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ def test_http_request_chunked_payload(parser):
parser.feed_data(b'4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()


Expand All @@ -502,6 +503,7 @@ def test_http_request_chunked_payload_and_next_message(parser):
b'transfer-encoding: chunked\r\n\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()

assert len(messages) == 1
Expand All @@ -521,14 +523,17 @@ def test_http_request_chunked_payload_chunks(parser):
parser.feed_data(b'\n4')
parser.feed_data(b'\r')
parser.feed_data(b'\n')
parser.feed_data(b'line\r\n0\r\n')
parser.feed_data(b'li')
parser.feed_data(b'ne\r\n0\r\n')
parser.feed_data(b'test: test\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert not payload.is_eof()

parser.feed_data(b'\r\n')
assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()


Expand All @@ -541,6 +546,7 @@ def test_parse_chunked_payload_chunk_extension(parser):
b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest: test\r\n\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()


Expand Down
21 changes: 21 additions & 0 deletions tests/test_py35/test_streams_35.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,24 @@ async def test_stream_reader_iter(loop):
async for raw in create_stream(loop):
assert raw == next(it)
pytest.raises(StopIteration, next, it)


async def test_stream_reader_iter_chunks_no_chunked_encoding(loop):
it = iter([b'line1\nline2\nline3\n'])
async for data, end_of_chunk in create_stream(loop).iter_chunks():
assert (data, end_of_chunk) == (next(it), False)
pytest.raises(StopIteration, next, it)


async def test_stream_reader_iter_chunks_chunked_encoding(loop):
stream = streams.StreamReader(loop=loop)
for line in DATA.splitlines(keepends=True):
stream.begin_http_chunk_receiving()
stream.feed_data(line)
stream.end_http_chunk_receiving()
stream.feed_eof()

it = iter([b'line1\n', b'line2\n', b'line3\n'])
async for data, end_of_chunk in stream.iter_chunks():
assert (data, end_of_chunk) == (next(it), True)
pytest.raises(StopIteration, next, it)
Loading