Write error on HTTP/2: "AssertionError: assert waiter is None or waiter.cancelled()" #551

Closed
PrimozGodec opened this issue Nov 25, 2019 · 6 comments · Fixed by #699
Labels
bug Something isn't working http/2 Issues and PRs related to HTTP/2

Comments

@PrimozGodec
Contributor

Since #527 is now closed, I am opening a new issue. We already discussed this error in #527, but we could not find a simple test case that reproduces it.

Issue: When I run the code appended below, I get the following error:

Traceback (most recent call last):
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 57, in <module>
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 19, in embedd_batch
    embeddings = await asyncio.gather(*requests)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 43, in _send_to_server
    emb = await self._send_request(client, im, url)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/example.py", line 50, in _send_request
    response = await client.post(url, headers=headers, data=image)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 492, in post
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 634, in request
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 658, in send
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 273, in _get_response
    return await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 228, in get_response
    request, verify=verify, cert=cert, timeout=timeout
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection_pool.py", line 126, in send
    raise exc
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection_pool.py", line 121, in send
    request, verify=verify, cert=cert, timeout=timeout
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/connection.py", line 62, in send
    response = await self.h2_connection.send(request, timeout=timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 49, in send
    stream_id = await self.send_headers(request, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 121, in send_headers
    await self.stream.write(data_to_send, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 149, in write
    self.stream_writer.drain(), timeout.write_timeout
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 348, in drain
    await self._protocol._drain_helper()
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 206, in _drain_helper
    assert waiter is None or waiter.cancelled()
AssertionError

It seems that a lock is required around the write call in httpx/concurrency/asyncio.py:

await asyncio.wait_for(  # type: ignore
    self.stream_writer.drain(), timeout.write_timeout
)

This is the same approach that #535 took for reads.
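A minimal sketch of the locking idea, assuming a wrapper around an object with asyncio.StreamWriter-style methods (a synchronous write(bytes) and a coroutine drain()). `SerializedWriter` and `FakeWriter` are hypothetical names, not httpx's API; an actual fix would live inside httpx's asyncio stream write(). The traceback shows why serializing helps: in the Python 3.7 `_drain_helper`, when the transport is paused for writing and a second coroutine calls drain() while the first one's waiter is still pending, the `assert waiter is None or waiter.cancelled()` check fails.

```python
import asyncio
from typing import Optional


class SerializedWriter:
    """Hypothetical wrapper that serializes write() + drain() with a lock."""

    def __init__(self, writer, write_timeout: Optional[float] = None) -> None:
        self._writer = writer
        self._write_timeout = write_timeout
        # Construct inside a running event loop: on older Python versions
        # asyncio.Lock binds to the loop that is current at construction time.
        self._lock = asyncio.Lock()

    async def write(self, data: bytes) -> None:
        # Only one coroutine at a time gets past this point, so drain() is
        # never awaited concurrently on the same underlying writer.
        async with self._lock:
            self._writer.write(data)
            await asyncio.wait_for(self._writer.drain(), self._write_timeout)


class FakeWriter:
    """Test double that records how many drain() calls overlap."""

    def __init__(self) -> None:
        self.chunks = []
        self.active_drains = 0
        self.max_active_drains = 0

    def write(self, data: bytes) -> None:
        self.chunks.append(data)

    async def drain(self) -> None:
        self.active_drains += 1
        self.max_active_drains = max(self.max_active_drains, self.active_drains)
        await asyncio.sleep(0.01)  # simulate a slow transport
        self.active_drains -= 1


async def demo(n: int = 10) -> FakeWriter:
    fake = FakeWriter()
    writer = SerializedWriter(fake, write_timeout=5)
    # Many concurrent writers, like several HTTP/2 streams sharing one socket.
    await asyncio.gather(*(writer.write(b"frame-%d" % i) for i in range(n)))
    return fake


if __name__ == "__main__":
    fake = asyncio.run(demo())
    print(fake.max_active_drains)  # 1: drains never overlapped
```

Without the lock, the ten drain() calls in the demo would all be pending at once; with it, each write waits for the previous drain to finish.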

Code to reproduce the error:

import asyncio
import pickle

import httpx


class SendRequests:
    num_parallel_requests = 0
    MAX_PARALLEL = 100

    async def embedd_batch(self):
        requests = []
        async with httpx.AsyncClient(
                timeout=httpx.TimeoutConfig(timeout=60),
                base_url="https://api.garaza.io/") as client:
            for i in range(1000):
                requests.append(self._send_to_server(client))

            embeddings = await asyncio.gather(*requests)

        return embeddings

    async def __wait_until_released(self):
        while self.num_parallel_requests >= self.MAX_PARALLEL:
            await asyncio.sleep(0.1)

    async def _send_to_server(self, client):
        await self.__wait_until_released()

        self.num_parallel_requests += 1
        # simplified image loading 
        with open("image.pkl", "rb") as f:
            im = pickle.load(f)
        url = "/image/inception-v3?machine=1&session=1&retry=0"
        emb = await self._send_request(client, im, url)
        self.num_parallel_requests -= 1
        return emb

    async def _send_request(self, client, image, url):
        headers = {'Content-Type': 'image/jpeg',
                   'Content-Length': str(len(image))}
        response = await client.post(url, headers=headers, data=image)
        print(response.content)
        return response


if __name__ == "__main__":
    cl = SendRequests()
    asyncio.get_event_loop().run_until_complete(cl.embedd_batch())

The pickle file required in the code snippet is here:
image.pkl.zip

@yeraydiazdiaz
Contributor

Hi @PrimozGodec, I cannot reproduce this issue with a stripped-down version of your code:

import asyncio
from pathlib import Path

import httpx


async def send_requests(path: Path) -> list:
    requests = []
    async with httpx.Client(
        timeout=httpx.TimeoutConfig(timeout=60), base_url="http://localhost:5000/"
    ) as client:
        for _ in range(1000):
            requests.append(_send_to_server(client, path))

        results = await asyncio.gather(*requests)

    return results


async def _send_to_server(client: httpx.Client, path: Path) -> httpx.Response:
    # this should probably be done elsewhere
    with open(path, "rb") as fd:
        payload = fd.read()
        size = len(payload)
    headers = {"Content-Type": "application/octet-stream", "Content-Length": str(size)}
    response = await client.post("/", headers=headers, data=payload)
    print(response.json())
    return response


if __name__ == "__main__":
    path = Path(__file__).parent / 'image.pkl.zip'
    asyncio.run(send_requests(path))

The client hits a very simple Quart server:
from quart import Quart, request

app = Quart(__name__)


@app.route("/", methods=["POST"])
async def post() -> dict:
    data = await request.get_data()
    return {"size": len(data)}


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

I'm using the same image.pkl.zip file that you posted and encountered no errors on master. I did encounter the PoolTimeout error in 0.8.0 described in #557 and fixed via #563.

Maybe you could try again using the master branch?

@tomchristie
Member

For housekeeping, I'll close this pending confirmation.

@PrimozGodec
Contributor Author

I can confirm that this issue is fixed on the master branch. It is probably a consequence of one of the recent changes.

@PrimozGodec
Contributor Author

I spoke too soon. I hadn't noticed that HTTPX switched to HTTP/1.1 as the default. :( The issue still persists.

Since I had problems setting up a local HTTP/2 server, I changed @yeraydiazdiaz's example so that it communicates with our server (which supports HTTP/2).

import asyncio
import pickle
from pathlib import Path

import httpx


MAX_R = 0

async def send_requests(path: Path) -> list:
    requests = []
    async with httpx.Client(
        timeout=httpx.TimeoutConfig(timeout=60), base_url="https://api.garaza.io/",
        http2=True
    ) as client:
        for _ in range(1000):
            requests.append(_send_to_server(client, path))

        results = await asyncio.gather(*requests)

    return results

async def _send_to_server(client: httpx.Client, path: Path) -> httpx.Response:
    # this should probably be done elsewhere
    with open(path, "rb") as f:
        payload = pickle.load(f)
        size = len(payload)
    headers = {"Content-Type": 'image/jpeg', "Content-Length": str(size)}
    response = await client.post(
        "/image/inception-v3?machine=1&session=1&retry=0", headers=headers, data=payload)
    print(response.http_version)
    print(response.json())
    return response


if __name__ == "__main__":
    path = Path(__file__).parent / 'image.pkl'
    asyncio.run(send_requests(path))

I made these changes:

  • updated the URL addresses
  • unpickled the image
  • added http2=True to the client

After those changes, the code still works correctly. The issue appears when I add my custom waiting strategy:

import asyncio
import pickle
from pathlib import Path

import httpx


MAX_R = 0

async def send_requests(path: Path) -> list:
    requests = []
    async with httpx.Client(
        timeout=httpx.TimeoutConfig(timeout=60), base_url="https://api.garaza.io/",
        http2=True
    ) as client:
        for _ in range(1000):
            requests.append(_send_to_server(client, path))

        results = await asyncio.gather(*requests)

    return results

async def __wait_until_released():
    while MAX_R >= 100:
        await asyncio.sleep(0.1)

async def _send_to_server(client: httpx.Client, path: Path) -> httpx.Response:
    await __wait_until_released()
    global MAX_R
    MAX_R += 1
    with open(path, "rb") as f:
        payload = pickle.load(f)
        size = len(payload)
    headers = {"Content-Type": 'image/jpeg', "Content-Length": str(size)}
    response = await client.post(
        "/image/inception-v3?machine=1&session=1&retry=0", headers=headers, data=payload)
    print(response.http_version)
    print(response.json())
    MAX_R -= 1
    return response


if __name__ == "__main__":
    path = Path(__file__).parent / 'image.pkl'
    asyncio.run(send_requests(path))

This makes sure that the client never has more than 100 requests in flight at once. I know HTTPX doesn't need this, but I need it to prevent the client from loading too many images into memory at once. The image-loading code is omitted here since it is not required to reproduce the error.

It is also possible that my waiting logic is flawed. Do you have any idea how to change it, or what the problem is here?

The error disappears when I add the lock around the write, as described at the top of this issue.
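On the question of the waiting logic: a common asyncio alternative to polling a counter with `asyncio.sleep(0.1)` is `asyncio.Semaphore`, sketched below. This was not suggested in the thread and would not, by itself, fix the HTTP/2 write error; it only replaces the throttling. `fake_send` is a hypothetical stand-in for loading an image and awaiting `client.post()`, so the sketch runs without a server.

```python
import asyncio

MAX_PARALLEL = 100  # same limit as MAX_R >= 100 in the polling loop


async def fake_send(i: int, stats: dict) -> int:
    """Hypothetical stand-in for loading an image and awaiting client.post()."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    try:
        await asyncio.sleep(0.001)  # pretend network round trip
        return i
    finally:
        stats["active"] -= 1


async def bounded_send(sem: asyncio.Semaphore, i: int, stats: dict) -> int:
    # The semaphore is released on exit even if fake_send() raises, unlike a
    # manual counter that is only decremented on the success path.
    async with sem:
        return await fake_send(i, stats)


async def send_all(n: int = 1000) -> tuple:
    sem = asyncio.Semaphore(MAX_PARALLEL)  # created inside the running loop
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(bounded_send(sem, i, stats) for i in range(n)))
    return results, stats


if __name__ == "__main__":
    results, stats = asyncio.run(send_all())
    print(len(results), stats["peak"])  # peak never exceeds MAX_PARALLEL
```

Because the image would be loaded inside the `async with sem:` block, at most MAX_PARALLEL images are held in memory at any time, which is the goal described above.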

@tomchristie tomchristie reopened this Dec 3, 2019
@tomchristie tomchristie added the http/2 Issues and PRs related to HTTP/2 label Dec 3, 2019
@tomchristie tomchristie changed the title from "asyncio: write error" to "write error on HTTP/2" Dec 3, 2019
@tomchristie
Member

I expect this to have been resolved in master via #612, but I'm happy to reopen if I've made an incorrect assessment there.

@PrimozGodec
Contributor Author

Sorry, but it is not fixed by #612; the issue still persists. The example above still fails with the same error as before. I cannot reopen the issue since I did not close it.

@tomchristie tomchristie reopened this Dec 8, 2019
@florimondmanca florimondmanca changed the title from "write error on HTTP/2" to 'Write error on HTTP/2: "AssertionError: assert waiter is None or waiter.cancelled()"' Dec 9, 2019
@tomchristie tomchristie added the bug Something isn't working label Dec 12, 2019