Skip to content

Commit

Permalink
Fix race condition on stream.read
Browse files Browse the repository at this point in the history
  • Loading branch information
florimondmanca committed Nov 17, 2019
1 parent 6045ee2 commit f95688f
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
9 changes: 7 additions & 2 deletions httpx/concurrency/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
self.stream_reader = stream_reader
self.stream_writer = stream_writer
self.timeout = timeout
self.read_lock = asyncio.Lock()

self._inner: typing.Optional[SocketStream] = None

Expand Down Expand Up @@ -111,8 +112,10 @@ async def read(
should_raise = flag is None or flag.raise_on_read_timeout
read_timeout = timeout.read_timeout if should_raise else 0.01
try:
data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
break
async with self.read_lock:
data = await asyncio.wait_for(
self.stream_reader.read(n), read_timeout
)
except asyncio.TimeoutError:
if should_raise:
raise ReadTimeout() from None
Expand All @@ -122,6 +125,8 @@ async def read(
# doesn't seem to allow on 3.6.
# See: https://github.com/encode/httpx/issues/382
await asyncio.sleep(0)
else:
break

return data

Expand Down
4 changes: 3 additions & 1 deletion httpx/concurrency/trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
self.stream = stream
self.timeout = timeout
self.write_buffer = b""
self.read_lock = trio.Lock()
self.write_lock = trio.Lock()

async def start_tls(
Expand Down Expand Up @@ -74,7 +75,8 @@ async def read(
read_timeout = _or_inf(timeout.read_timeout if should_raise else 0.01)

with trio.move_on_after(read_timeout):
return await self.stream.receive_some(max_bytes=n)
async with self.read_lock:
return await self.stream.receive_some(max_bytes=n)

if should_raise:
raise ReadTimeout() from None
Expand Down
30 changes: 21 additions & 9 deletions tests/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

import asyncio
import functools
import trio

from httpx import AsyncioBackend
from httpx.concurrency.trio import TrioBackend


@functools.singledispatch
Expand All @@ -19,13 +21,23 @@ async def _sleep_asyncio(backend, seconds: int):
await asyncio.sleep(seconds)


try:
import trio
from httpx.concurrency.trio import TrioBackend
except ImportError: # pragma: no cover
pass
else:
@sleep.register(TrioBackend)
async def _sleep_trio(backend, seconds: int):
await trio.sleep(seconds)

@sleep.register(TrioBackend)
async def _sleep_trio(backend, seconds: int):
await trio.sleep(seconds)

@functools.singledispatch
async def run_concurrently(backend, *async_fns):
raise NotImplementedError # pragma: no cover


@run_concurrently.register(AsyncioBackend)
async def _run_concurrently_asyncio(backend, *async_fns):
await asyncio.gather(*(fn() for fn in async_fns))


@run_concurrently.register(TrioBackend)
async def _run_concurrently_trio(backend, *async_fns):
async with trio.open_nursery() as nursery:
for fn in async_fns:
nursery.start_soon(fn)
17 changes: 17 additions & 0 deletions tests/test_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from httpx import AsyncioBackend, HTTPVersionConfig, SSLConfig, TimeoutConfig
from httpx.concurrency.trio import TrioBackend
from tests.concurrency import run_concurrently


@pytest.mark.parametrize(
Expand Down Expand Up @@ -68,3 +69,19 @@ async def test_start_tls_on_socket_stream(https_server, backend, get_cipher):

finally:
await stream.close()


async def test_concurrent_read(server, backend):
"""
Regression test for: https://github.com/encode/httpx/issues/527
"""
stream = await backend.open_tcp_stream(
server.url.host, server.url.port, ssl_context=None, timeout=TimeoutConfig(5)
)
try:
await stream.write(b"GET / HTTP/1.1\r\n\r\n")
await run_concurrently(
backend, lambda: stream.read(10), lambda: stream.read(10)
)
finally:
await stream.close()

0 comments on commit f95688f

Please sign in to comment.