Python 3.7: RuntimeError: read() called while another coroutine is already waiting for incoming data #527

Closed
PrimozGodec opened this issue Nov 14, 2019 · 16 comments · Fixed by #535
Labels: bug, concurrency, http/2

Comments

@PrimozGodec
Contributor

PrimozGodec commented Nov 14, 2019

As we already discussed with @florimondmanca in #382, I am reporting this issue here.

I get the following error when sending images to the server: RuntimeError: read() called while another coroutine is already waiting for incoming data. As you can see from the code below, at most 100 requests are sent to the server at the same time. I know lazy loading could be used instead, but the limit is implemented by hand because, before sending an image to the server, we check whether it is already in the local cache. The limit of 100 ensures that not all images are loaded into memory at the same time (there can be as many as 10,000 images).

Here is the trace:

Traceback (most recent call last):
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 63, in <module>
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 18, in embedd_batch
    embeddings = await asyncio.gather(*requests)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 41, in _send_to_server
    emb = await self._send_request(client, im, url)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 56, in _send_request
    data=image
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 484, in post
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 626, in request
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 650, in send
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 265, in _get_response
    return await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 226, in get_response
    request, verify=verify, cert=cert, timeout=timeout
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection_pool.py", line 126, in send
    raise exc
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection_pool.py", line 121, in send
    request, verify=verify, cert=cert, timeout=timeout
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection.py", line 62, in send
    response = await self.h2_connection.send(request, timeout=timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 52, in send
    status_code, headers = await self.receive_response(stream_id, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 171, in receive_response
    event = await self.receive_event(stream_id, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 206, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 114, in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 640, in read
    await self._wait_for_data('read')
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 460, in _wait_for_data
    f'{func_name}() called while another coroutine is '
RuntimeError: read() called while another coroutine is already waiting for incoming data

A minimal example that reproduces the error:

import asyncio
import pickle

import httpx


class SendRequests:
    num_parallel_requests = 0
    MAX_PARALLEL = 100

    async def embedd_batch(self):
        requests = []
        async with httpx.AsyncClient(
                timeout=httpx.TimeoutConfig(timeout=60),
                base_url="https://api.garaza.io/") as client:
            for i in range(1000):
                requests.append(self._send_to_server(client))

            embeddings = await asyncio.gather(*requests)

        return embeddings

    async def __wait_until_released(self):
        while self.num_parallel_requests >= self.MAX_PARALLEL:
            await asyncio.sleep(0.1)

    async def _send_to_server(self, client):
        await self.__wait_until_released()

        self.num_parallel_requests += 1
        # simplified image loading 
        with open("image.pkl", "rb") as f:
            im = pickle.load(f)
        url = "/image/inception-v3?machine=1&session=1&retry=0"
        emb = await self._send_request(client, im, url)
        self.num_parallel_requests -= 1
        return emb

    async def _send_request(self, client, image, url):
        headers = {'Content-Type': 'image/jpeg',
                   'Content-Length': str(len(image))}
        response = await client.post(url, headers=headers, data=image)
        print(response.content)
        return response


if __name__ == "__main__":
    cl = SendRequests()
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())

Any advice about this error is welcome. I tried to debug it myself, without success.

The pickle file required in the code snippet is here:
image.pkl.zip
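
For reference, the polling throttle in the example above (__wait_until_released with MAX_PARALLEL) could also be expressed with asyncio.Semaphore. The following is only a minimal sketch under assumptions: fake_request and run_batch are hypothetical names, and asyncio.sleep(0) stands in for the real client.post call, so nothing here touches httpx or the network.

```python
import asyncio

MAX_PARALLEL = 100


async def fake_request(i, semaphore, state):
    # At most `max_parallel` coroutines can be inside this block at once.
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0)  # stand-in for: await client.post(...)
        state["active"] -= 1
        return i


async def run_batch(n, max_parallel):
    semaphore = asyncio.Semaphore(max_parallel)
    state = {"active": 0, "peak": 0}  # only used to observe concurrency
    results = await asyncio.gather(
        *(fake_request(i, semaphore, state) for i in range(n))
    )
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_batch(1000, MAX_PARALLEL))
    print(len(results), "requests done, peak concurrency:", peak)
```

Coroutines that cannot enter the block wait on the semaphore instead of waking up every 0.1 s to re-check a counter. This changes only the throttling; it is not expected to affect the read() error itself.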

@florimondmanca added the bug and concurrency labels Nov 16, 2019
@florimondmanca
Member

@iluxonchik posted another example in #382, which I'm reposting here in a runnable form:

import asyncio
import httpx

async def print_content_from_url(url, client):
    res = await client.get(url)
    print(res, url)

async def main():
    urls = ['https://example.com', 'https://google.com']*100
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*(print_content_from_url(url, client) for url in urls))

asyncio.run(main())

This program runs fine on my side, on 3.7.3 and 3.8.0. @iluxonchik Which Python version were you using?

@florimondmanca
Member

@PrimozGodec I was able to run your code and reproduce the error you listed (on 3.8.0).

From the traceback I noticed that the requests are made using HTTP/2. I forced usage of HTTP/1.1 by passing http_versions=["HTTP/1.1"] and the requests went through just fine. Can you confirm this dirty-fixes the issue for you as well?

This is probably somewhat related to #514.

@florimondmanca
Member

florimondmanca commented Nov 17, 2019

Interestingly, if I remove the HTTP/1.1 hot fix, but then set MAX_PARALLEL = 2 and only make 4 requests, the issue occurs too. It doesn't occur with anything other than (2, 4): for example, (1, 2) goes through just fine, and so does (2, 3).

So I turned on TRACE logs and here's what we get:

HTTPX_LOG_LEVEL=trace python debug/read_error_prizmo.py
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - acquire_connection origin=Origin(scheme='https' host='api.garaza.io' port=443)
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - acquire_connection origin=Origin(scheme='https' host='api.garaza.io' port=443)
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - new_connection connection=HTTPConnection(origin=Origin(scheme='https' host='api.garaza.io' port=443))
TRACE [2019-11-17 12:11:58] httpx.config - load_ssl_context verify=True cert=None trust_env=True http_versions=HTTPVersionConfig(['HTTP/1.1', 'HTTP/2'])
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - new_connection connection=HTTPConnection(origin=Origin(scheme='https' host='api.garaza.io' port=443))
TRACE [2019-11-17 12:11:58] httpx.config - load_ssl_context verify=True cert=None trust_env=True http_versions=HTTPVersionConfig(['HTTP/1.1', 'HTTP/2'])
TRACE [2019-11-17 12:11:58] httpx.config - load_verify_locations cafile=/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/certifi/cacert.pem
TRACE [2019-11-17 12:11:58] httpx.config - load_verify_locations cafile=/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/certifi/cacert.pem
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection - start_connect host='api.garaza.io' port=443 timeout=TimeoutConfig(timeout=60)
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection - start_connect host='api.garaza.io' port=443 timeout=TimeoutConfig(timeout=60)
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection - connected http_version='HTTP/2'
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - send_headers stream_id=1 method='POST' target='/image/inception-v3?machine=1&session=1&retry=0' headers=[(b':method', b'POST'), (b':authority', b'api.garaza.io'), (b':scheme', b'https'), (b':path', b'/image/inception-v3?machine=1&session=1&retry=0'), (b'user-agent', b'python-httpx/0.7.7'), (b'accept', b'*/*'), (b'accept-encoding', b'gzip, deflate'), (b'connection', b'keep-alive'), (b'content-type', b'image/jpeg'), (b'content-length', b'30959')]
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=0 event=<RemoteSettingsChanged changed_settings:{ChangedSetting(setting=SettingCodes.MAX_CONCURRENT_STREAMS, original_value=None, new_value=128), ChangedSetting(setting=SettingCodes.INITIAL_WINDOW_SIZE, original_value=65535, new_value=65536), ChangedSetting(setting=SettingCodes.MAX_FRAME_SIZE, original_value=16384, new_value=16777215)}>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=0 event=<WindowUpdated stream_id:0, delta:2147418112>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - end_stream stream_id=1
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection - connected http_version='HTTP/2'
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - send_headers stream_id=1 method='POST' target='/image/inception-v3?machine=1&session=1&retry=0' headers=[(b':method', b'POST'), (b':authority', b'api.garaza.io'), (b':scheme', b'https'), (b':path', b'/image/inception-v3?machine=1&session=1&retry=0'), (b'user-agent', b'python-httpx/0.7.7'), (b'accept', b'*/*'), (b'accept-encoding', b'gzip, deflate'), (b'connection', b'keep-alive'), (b'content-type', b'image/jpeg'), (b'content-length', b'30959')]
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=0 event=<RemoteSettingsChanged changed_settings:{ChangedSetting(setting=SettingCodes.MAX_CONCURRENT_STREAMS, original_value=None, new_value=128), ChangedSetting(setting=SettingCodes.INITIAL_WINDOW_SIZE, original_value=65535, new_value=65536), ChangedSetting(setting=SettingCodes.MAX_FRAME_SIZE, original_value=16384, new_value=16777215)}>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=0 event=<WindowUpdated stream_id:0, delta:2147418112>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - end_stream stream_id=1
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=0 event=<SettingsAcknowledged changed_settings:{}>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<WindowUpdated stream_id:1, delta:2147418111>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=0 event=<SettingsAcknowledged changed_settings:{}>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<WindowUpdated stream_id:1, delta:2147418111>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<ResponseReceived stream_id:1, headers:[(b':status', b'200'), (b'server', b'nginx/1.13.5'), (b'date', b'Sun, 17 Nov 2019 11:11:58 GMT'), (b'content-type', b'application/json'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains;'), (b'content-encoding', b'gzip')]>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:2006, data:1f8b08000000000000034c974d8e64450c84efc2>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1940, data:54996d92eb2610453734492101022d28fbdf42ce>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1894, data:54da4996a4381004d00bf502100271ff8bb57d97>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1849, data:64db4992e3380c05d00bf5c2a2e6fb5facf13e28>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1861, data:5cdc59d69c2a1405e00965dda52828f39fd8e5db>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<ResponseReceived stream_id:1, headers:[(b':status', b'200'), (b'server', b'nginx/1.13.5'), (b'date', b'Sun, 17 Nov 2019 11:11:58 GMT'), (b'content-type', b'application/json'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains;'), (b'content-encoding', b'gzip')]>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:2006, data:1f8b08000000000000034c974d8e64450c84efc2>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1940, data:54996d92eb2610453734492101022d28fbdf42ce>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1894, data:54da4996a4381004d00bf502100271ff8bb57d97>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1849, data:64db4992e3380c05d00bf5c2a2e6fb5facf13e28>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1861, data:5cdc59d69c2a1405e00965dda52828f39fd8e5db>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1861, data:54dc59b2a4380c05d00d755430d80cfbdf58fb5c>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1880, data:54dd5992e3380c04d00b755458bb74ff8b0d5f02>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1862, data:5cdd5996e3361044d11df99000c7fd6fccba9109>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1259, data:4cdd4992e3380c05d07d9fa516d44c1da68e5277>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:10, data:0300b2290cf3398a0000>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<StreamEnded stream_id:1>
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - release_connection connection=HTTPConnection(origin=Origin(scheme='https' host='api.garaza.io' port=443))
DEBUG [2019-11-17 12:11:58] httpx.client - HTTP Request: POST https://api.garaza.io/image/inception-v3?machine=1&session=1&retry=0 "HTTP/2 200 OK"
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1861, data:54dc59b2a4380c05d00d755430d80cfbdf58fb5c>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1880, data:54dd5992e3380c04d00b755458bb74ff8b0d5f02>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1862, data:5cdd5996e3361044d11df99000c7fd6fccba9109>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:1259, data:4cdd4992e3380c05d07d9fa516d44c1da68e5277>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<DataReceived stream_id:1, flow_controlled_length:10, data:0300b2290cf3398a0000>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=1 event=<StreamEnded stream_id:1>
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - release_connection connection=HTTPConnection(origin=Origin(scheme='https' host='api.garaza.io' port=443))
DEBUG [2019-11-17 12:11:58] httpx.client - HTTP Request: POST https://api.garaza.io/image/inception-v3?machine=1&session=1&retry=0 "HTTP/2 200 OK"
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - acquire_connection origin=Origin(scheme='https' host='api.garaza.io' port=443)
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - reuse_connection connection=HTTPConnection(origin=Origin(scheme='https' host='api.garaza.io' port=443))
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - send_headers stream_id=3 method='POST' target='/image/inception-v3?machine=1&session=1&retry=0' headers=[(b':method', b'POST'), (b':authority', b'api.garaza.io'), (b':scheme', b'https'), (b':path', b'/image/inception-v3?machine=1&session=1&retry=0'), (b'user-agent', b'python-httpx/0.7.7'), (b'accept', b'*/*'), (b'accept-encoding', b'gzip, deflate'), (b'connection', b'keep-alive'), (b'content-type', b'image/jpeg'), (b'content-length', b'30959')]
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - acquire_connection origin=Origin(scheme='https' host='api.garaza.io' port=443)
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection_pool - reuse_connection connection=HTTPConnection(origin=Origin(scheme='https' host='api.garaza.io' port=443))
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - send_headers stream_id=5 method='POST' target='/image/inception-v3?machine=1&session=1&retry=0' headers=[(b':method', b'POST'), (b':authority', b'api.garaza.io'), (b':scheme', b'https'), (b':path', b'/image/inception-v3?machine=1&session=1&retry=0'), (b'user-agent', b'python-httpx/0.7.7'), (b'accept', b'*/*'), (b'accept-encoding', b'gzip, deflate'), (b'connection', b'keep-alive'), (b'content-type', b'image/jpeg'), (b'content-length', b'30959')]
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - end_stream stream_id=3
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - end_stream stream_id=5
TRACE [2019-11-17 12:11:58] httpx.dispatch.connection - close_connection
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=3 event=<WindowUpdated stream_id:3, delta:2147418111>
TRACE [2019-11-17 12:11:58] httpx.dispatch.http2 - receive_event stream_id=5 event=<WindowUpdated stream_id:5, delta:2147418111>
Traceback (most recent call last):
  File "debug/read_error_prizmo.py", line 53, in <module>
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "debug/read_error_prizmo.py", line 24, in embedd_batch
    embeddings = await asyncio.gather(*requests)
  File "debug/read_error_prizmo.py", line 40, in _send_to_server
    emb = await self._send_request(client, im, url)
  File "debug/read_error_prizmo.py", line 46, in _send_request
    response = await client.post(url, headers=headers, data=image)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/client.py", line 477, in post
    return await self.request(
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/client.py", line 626, in request
    response = await self.send(
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/client.py", line 650, in send
    return await self._get_response(
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/client.py", line 273, in _get_response
    return await get_response(request)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/client.py", line 227, in get_response
    response = await dispatch.send(
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 126, in send
    raise exc
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 120, in send
    response = await connection.send(
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/dispatch/connection.py", line 62, in send
    response = await self.h2_connection.send(request, timeout=timeout)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 57, in send
    status_code, headers = await self.receive_response(stream_id, timeout)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 176, in receive_response
    event = await self.receive_event(stream_id, timeout)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 211, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/site-packages/httpx/concurrency/asyncio.py", line 114, in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/asyncio/tasks.py", line 483, in wait_for
    return fut.result()
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/asyncio/streams.py", line 701, in read
    await self._wait_for_data('read')
  File "/Users/florimond/.pyenv/versions/3.8.0/lib/python3.8/asyncio/streams.py", line 520, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

It's definitely a race condition. I looked at the code, and roughly what we're doing is this:

while not has_event():
    data = await stream.read(...)  # (A)
    ...
    data_to_send = ...
    await stream.write(data_to_send) # (B)

From the logs, it seems that streams (as in HTTP/2 streams) 3 and 5 both got to the same point: they received an event and are about to reach point (B). Here's what I think the event loop is doing:

  • Stream 3 reaches (B), then suspends.
  • Stream 5 reaches (B), then suspends.
  • Stream 3 wakes up and reaches (A), then suspends.
  • Stream 5 wakes up and reaches (A). At that point, it tries to .read() from the asyncio stream, but stream 3, which is accessing the same asyncio stream, is already waiting to read data from it. => 💥

So, I guess the solution would be to put locks around all .write() and .read() calls within SocketStream?
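
To illustrate the last step and the lock idea in isolation, here is a minimal sketch that uses only asyncio, with no httpx and no network. It is an assumption-laden illustration, not the actual SocketStream code and not necessarily what the eventual fix does: LockedReader and two_concurrent_reads are hypothetical names, and the two tasks merely play the role of HTTP/2 streams 3 and 5 sharing one asyncio stream.

```python
import asyncio


class LockedReader:
    """Hypothetical wrapper that serializes read() calls on one StreamReader."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self._reader = reader
        self._lock = asyncio.Lock()

    async def read(self, n: int) -> bytes:
        # Only one coroutine at a time may wait inside StreamReader.read().
        async with self._lock:
            return await self._reader.read(n)


async def two_concurrent_reads(use_lock: bool) -> list:
    reader = asyncio.StreamReader()
    source = LockedReader(reader) if use_lock else reader

    # Both tasks read from the same underlying asyncio stream.
    tasks = [asyncio.create_task(source.read(100)) for _ in range(2)]
    await asyncio.sleep(0)  # let both tasks run up to their first suspension

    reader.feed_data(b"hello")  # simulate incoming bytes
    reader.feed_eof()           # then end of stream
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    print("without lock:", asyncio.run(two_concurrent_reads(use_lock=False)))
    print("with lock:   ", asyncio.run(two_concurrent_reads(use_lock=True)))
```

Without the lock, the first task receives the data and the second fails with the same RuntimeError as in the traceback above. With the lock, the second read simply waits for its turn and both complete. A real fix would also have to decide what to do about .write() calls and about bytes that belong to another HTTP/2 stream, which this sketch does not cover.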

@florimondmanca added the http/2 label Nov 17, 2019
@iluxonchik

@florimondmanca I'm using Python 3.8

@iluxonchik

Live Demo Demonstration

Here is the full stack trace:

unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-115' coro=<print_content_from_url() done, defined at demo.py:4> exception=KeyError(HTTPConnection(origin=Origin(scheme='https' host='www.google.com' port=443)))>
Traceback (most recent call last):
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 120, in send
    response = await connection.send(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection.py", line 62, in send
    response = await self.h2_connection.send(request, timeout=timeout)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 57, in send
    status_code, headers = await self.receive_response(stream_id, timeout)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 176, in receive_response
    event = await self.receive_event(stream_id, timeout)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 211, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/concurrency/asyncio.py", line 114, in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/tasks.py", line 489, in wait_for
    await _cancel_and_wait(fut, loop=loop)
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/tasks.py", line 550, in _cancel_and_wait
    await waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "demo.py", line 5, in print_content_from_url
    res = await client.get(url)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 384, in get
    return await self.request(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 626, in request
    response = await self.send(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 650, in send
    return await self._get_response(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 273, in _get_response
    return await get_response(request)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/middleware/redirect.py", line 41, in __call__
    return await self(next_request, get_response)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 227, in get_response
    response = await dispatch.send(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 124, in send
    self.active_connections.remove(connection)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 65, in remove
    del self.all[connection]
KeyError: HTTPConnection(origin=Origin(scheme='https' host='www.google.com' port=443))
Traceback (most recent call last):
  File "demo.py", line 13, in <module>
    asyncio.run(main())
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "demo.py", line 11, in main
    await asyncio.gather(*(print_content_from_url(url, client) for url in urls))
  File "demo.py", line 5, in print_content_from_url
    res = await client.get(url)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 384, in get
    return await self.request(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 626, in request
    response = await self.send(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 650, in send
    return await self._get_response(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 273, in _get_response
    return await get_response(request)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/middleware/redirect.py", line 41, in __call__
    return await self(next_request, get_response)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/client.py", line 227, in get_response
    response = await dispatch.send(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 126, in send
    raise exc
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection_pool.py", line 120, in send
    response = await connection.send(
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/connection.py", line 62, in send
    response = await self.h2_connection.send(request, timeout=timeout)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 57, in send
    status_code, headers = await self.receive_response(stream_id, timeout)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 176, in receive_response
    event = await self.receive_event(stream_id, timeout)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/dispatch/http2.py", line 211, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.8/site-packages/httpx/concurrency/asyncio.py", line 114, in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/tasks.py", line 483, in wait_for
    return fut.result()
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/streams.py", line 701, in read
    await self._wait_for_data('read')
  File "/Users/iluxonchik/.pyenv/versions/3.8.0/lib/python3.8/asyncio/streams.py", line 520, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

So the origin of the issue seems to be here, since the read_timeout gets set to 0.01, although setting httpx.TimeoutConfig's fields to 100 does not solve the issue. I don't have time to investigate it further now. If you run the same example with a smaller list of URLs (e.g. 3), the issue does not happen.

Perhaps this is some sort of a race condition? Or something OS dependent? I'm running macOS Catalina.

@florimondmanca
Member

florimondmanca commented Nov 17, 2019

@iluxonchik Thanks for the detailed debugging material. As I wrote in a previous comment this does seem to be a race condition caused by HTTP/2 streams accessing the same asyncio stream. I pushed a fix in #535; if you'd like to check, you can install the patched version:

pip install "git+https://github.com/encode/httpx.git@fix/concurrent-read"
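
For readers following along, here is a minimal sketch of the kind of race described above. It is not the code from #535: `concurrent_reads` and `read_once` are hypothetical names, and the reader is a bare `asyncio.StreamReader` fed by hand rather than a real connection. It only shows that two coroutines waiting in `read()` on the same reader trigger the `RuntimeError`, and that an `asyncio.Lock` around the read serializes them.

```python
import asyncio


async def concurrent_reads(use_lock):
    # A bare StreamReader with no transport; data is fed manually below.
    reader = asyncio.StreamReader()
    lock = asyncio.Lock()

    async def read_once():
        if use_lock:
            # Only one coroutine at a time may wait inside read().
            async with lock:
                return await reader.read(5)
        return await reader.read(5)

    tasks = [asyncio.ensure_future(read_once()) for _ in range(2)]
    await asyncio.sleep(0)  # let both tasks run up to their first await
    reader.feed_data(b"helloworld")
    reader.feed_eof()
    # Collect exceptions instead of raising, so both outcomes are visible.
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    print("without lock:", asyncio.run(concurrent_reads(False)))
    print("with lock:   ", asyncio.run(concurrent_reads(True)))
```

Without the lock, the second task fails as soon as it calls `read()` while the first is still waiting; with the lock, each task gets its own chunk.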

@iluxonchik

@florimondmanca I didn't read everything above 😅 Yes, I can confirm that it's fixed!

However, now I have a different issue (which also occurred randomly in the previous version). If you specify an httpx.TimeoutConfig() with a high value, such as httpx.TimeoutConfig(timeout=100), it raises:

  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 778, in unwrap
    return self._sslobj.shutdown()
ssl.SSLError: [SSL: KRB5_S_INIT] application data after close notify (_ssl.c:2629)

However, you seem to already be aware of this issue, so I won't be discussing this further here.

By the way, if I reduce the timeout to something like 10, I get the following exception:

  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 211, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/iluxonchik/.local/share/virtualenvs/tmp-agwWamBd/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 121, in read
    raise ReadTimeout() from None
httpx.exceptions.ReadTimeout

Again, I don't have time to investigate this further now, otherwise I would have posted something more complete.
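
For context, the last two frames of that traceback correspond to a pattern like the sketch below: a read wrapped in `asyncio.wait_for`, with the timeout translated into a library-specific exception. This is an illustration only. `ReadTimeout` here is a local stand-in class, `read_with_timeout` is a hypothetical helper, and the reader is a bare `asyncio.StreamReader` that never receives data.

```python
import asyncio


class ReadTimeout(Exception):
    """Local stand-in for httpx.exceptions.ReadTimeout."""


async def read_with_timeout(reader, n, timeout):
    try:
        # wait_for cancels the pending read() once the timeout elapses.
        return await asyncio.wait_for(reader.read(n), timeout)
    except asyncio.TimeoutError:
        # Hide the asyncio-specific exception from the caller.
        raise ReadTimeout() from None


async def demo():
    reader = asyncio.StreamReader()  # never fed, so read() keeps waiting
    try:
        await read_with_timeout(reader, 4096, timeout=0.05)
    except ReadTimeout:
        return "timed out"
    return "got data"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In other words, a `ReadTimeout` from this code path only says that no bytes arrived on the connection within the read timeout; it does not say why.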

@PrimozGodec
Contributor Author

PrimozGodec commented Nov 18, 2019

The read() error seems to be solved. I already get more responses when I run the code from my example above. With the same script (the code in the issue description) I now get (Python 3.7):

Traceback (most recent call last):
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 57, in <module>
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 19, in embedd_batch
    embeddings = await asyncio.gather(*requests)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 43, in _send_to_server
    emb = await self._send_request(client, im, url)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 50, in _send_request
    response = await client.post(url, headers=headers, data=image)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 492, in post
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 634, in request
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 658, in send
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 273, in _get_response
    return await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 228, in get_response
    request, verify=verify, cert=cert, timeout=timeout
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection_pool.py", line 126, in send
    raise exc
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection_pool.py", line 121, in send
    request, verify=verify, cert=cert, timeout=timeout
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection.py", line 62, in send
    response = await self.h2_connection.send(request, timeout=timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 49, in send
    stream_id = await self.send_headers(request, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 121, in send_headers
    await self.stream.write(data_to_send, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 149, in write
    self.stream_writer.drain(), timeout.write_timeout
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 348, in drain
    await self._protocol._drain_helper()
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 206, in _drain_helper
    assert waiter is None or waiter.cancelled()
AssertionError

@florimondmanca
Member

Hmm, probably a similar problem but on the write side. I wasn’t able to naively reproduce it in a test for #535, but we could see if a write lock solves it.
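
A rough sketch of what such a write lock could look like, under stated assumptions: `FakeWriter` is a hypothetical stand-in for `asyncio.StreamWriter` whose `drain()` imitates the "only one waiter" assertion seen in the traceback above, and `send_all` is an illustrative helper, not httpx code. A real fix would wrap the actual `stream_writer.write()` and `drain()` calls.

```python
import asyncio


class FakeWriter:
    """Hypothetical stand-in for asyncio.StreamWriter.

    Like the assertion in the traceback, drain() refuses to run while
    another coroutine is already waiting in it.
    """

    def __init__(self):
        self.sent = bytearray()
        self._waiter = None

    def write(self, data):
        self.sent.extend(data)

    async def drain(self):
        assert self._waiter is None  # a second concurrent drainer fails here
        loop = asyncio.get_running_loop()
        self._waiter = loop.create_future()
        # Pretend the transport flushes on the next loop iteration.
        loop.call_soon(self._waiter.set_result, None)
        try:
            await self._waiter
        finally:
            self._waiter = None


async def send_all(use_lock):
    writer = FakeWriter()
    lock = asyncio.Lock()

    async def write(data):
        if use_lock:
            # Serialize write() + drain() across concurrent requests.
            async with lock:
                writer.write(data)
                await writer.drain()
        else:
            writer.write(data)
            await writer.drain()

    results = await asyncio.gather(
        *(write(b"x") for _ in range(3)), return_exceptions=True
    )
    return results, bytes(writer.sent)


if __name__ == "__main__":
    print("without lock:", asyncio.run(send_all(False)))
    print("with lock:   ", asyncio.run(send_all(True)))
```

Without the lock, every coroutine after the first hits the assertion; with it, the drains run one after another.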

@PrimozGodec
Contributor Author

@florimondmanca I tested adding the lock around write and it solves the issue. :) Can you add that in #535?

I also discovered another issue that is not directly connected with this one. So I prepared a pull request with a fix #540.

@PrimozGodec
Contributor Author

With both bugs fixed I am able to get at least 100 responses (which is much more than before), but then everything stops. There are no new receive_events for about a minute and then ReadTimeout is triggered. I set the timeout to 60 seconds. I tried to track down the source of the error but couldn't find the cause. The error appears when I run the script from the issue description. Maybe this bug belongs in a separate issue, since it does not seem connected.

Traceback (most recent call last):
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 60, in <module>
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 20, in embedd_batch
    embeddings = await asyncio.gather(*requests)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 45, in _send_to_server
    emb = await self._send_request(client, im, url)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 52, in _send_request
    response = await client.post(url, headers=headers, data=image)
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/client.py", line 492, in post
    trust_env=trust_env,
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/client.py", line 634, in request
    trust_env=trust_env,
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/client.py", line 658, in send
    trust_env=trust_env,
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/client.py", line 273, in _get_response
    return await get_response(request)
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/client.py", line 242, in get_response
    await response.read()
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/models.py", line 996, in read
    self._content = b"".join([part async for part in self.stream()])
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/models.py", line 996, in <listcomp>
    self._content = b"".join([part async for part in self.stream()])
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/models.py", line 1007, in stream
    async for chunk in self.raw():
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/models.py", line 1035, in raw
    async for part in self._raw_stream:
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 197, in body_iter
    event = await self.receive_event(stream_id, timeout)
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 211, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/primoz/miniconda3/envs/orange/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 122, in read
    raise ReadTimeout() from None
httpx.exceptions.ReadTimeout

@yeraydiazdiaz
Contributor

Using @florimondmanca's example, most of the time I hit the SSL error, which seems to be the same as the one fixed by @JayH5. This happens on Python 3.8.0 and 3.7.4, both of which are mentioned as fixed upstream in a similar aiohttp issue.

Sometimes I do get the ReadTimeout error described above, but it's hard to debug because the SSLError is a lot more common.

@florimondmanca
Member

@PrimozGodec I managed to reproduce the ReadTimeout error. At first it happened quite often, but then it happened on very rare occasions (less than 1 in 10), and now almost never anymore. 😕 Same behavior on 3.7.5 and 3.8.0. Could it be a server-side problem (Google or example.com not correctly handling the concurrency)?

Also, I don't have the SSL errors @yeraydiazdiaz is experiencing.

@florimondmanca
Member

florimondmanca commented Nov 19, 2019

@PrimozGodec

> Can you add that in #535?

I still haven't managed to reproduce the issue locally, and I'm not sure in what situation exactly it happens (which means I don't know yet how to add a test for it). So I suggest treating the write issue separate from #535.

@PrimozGodec
Contributor Author

> I still haven't managed to reproduce the issue locally, and I'm not sure in what situation exactly it happens (which means I don't know yet how to add a test for it). So I suggest treating the write issue separate from #535.

Should I open another issue about this? I also cannot reproduce this issue other than with my script on images.

@PrimozGodec
Contributor Author

> @PrimozGodec I managed to reproduce the ReadTimeout error. At first it happened quite often, but then it happened on very rare occasions (less than 1 in 10), and now almost never anymore. 😕 Same behavior on 3.7.5 and 3.8.0. Could it be a server-side problem (Google or example.com not correctly handling the concurrency)?

I will check whether the server reports anything.
