
Asyncio client creates RuntimeError on parallel requests #245

Open
hvraven opened this issue Mar 1, 2024 · 7 comments
Labels
enhancement New feature or request

Comments

@hvraven

hvraven commented Mar 1, 2024

I am working on an app where I want to make multiple parallel requests to a server. As some of these requests require time for processing, the client waits for multiple replies in parallel. In this situation the asyncio code (at least) crashes with a RuntimeError: read() called while another coroutine is already waiting for incoming data.

The error can easily be replicated by changing the asyncio_echo example slightly: instead of the single call to client.echo, parallel calls can be made using asyncio.gather. The server already introduces a slight delay, which triggers the bug.

import asyncio

import thriftpy2
from thriftpy2.rpc import make_aio_client

echo_thrift = thriftpy2.load("echo.thrift", module_name="echo_thrift")


async def main():
    client = await make_aio_client(
        echo_thrift.EchoService, '127.0.0.1', 6000)

    # prints hello, world
    print(await client.echo('hello, world'))
    # raises RuntimeError
    print(await asyncio.gather(
        client.echo('hello, world'), client.echo('hello, world')
    ))
    client.close()


asyncio.run(main())

The code at least needs a lock somewhere to serialize the parallel reads. This would, however, prevent parallelized calls to the server. I am happy to look into a solution for this, but would like some pointers on where to start.
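For anyone hitting this before a proper fix lands: a minimal workaround is to serialize whole request/response cycles with an asyncio.Lock. This is my own sketch around a stand-in client object, not a thriftpy2 API; it avoids the crash but, as noted, gives up actual parallelism on the connection.

```python
import asyncio


class SerializedClient:
    """Sketch: serialize full round trips on one shared connection.

    ``client`` stands in for a TAsyncClient-like object whose methods
    are coroutines performing a complete request/response cycle
    (hypothetical wrapper, not part of thriftpy2).
    """

    def __init__(self, client):
        self._client = client
        self._lock = asyncio.Lock()

    async def call(self, method, *args, **kwargs):
        # Only one coroutine is inside a round trip at a time; the rest
        # queue on the lock instead of racing read() on the socket.
        async with self._lock:
            return await getattr(self._client, method)(*args, **kwargs)


class FakeEcho:
    """Stand-in for the echo client, with a small server-side delay."""

    async def echo(self, text):
        await asyncio.sleep(0.01)
        return text


async def demo():
    client = SerializedClient(FakeEcho())
    # With the lock, gather no longer triggers concurrent reads.
    return await asyncio.gather(
        client.call("echo", "hello"), client.call("echo", "world")
    )


results = asyncio.run(demo())
```

The same pattern would work inside `_req` itself if the lock were held across `_send` and `_recv`.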

Full error message:

hello, world
Traceback (most recent call last):
  File "thriftpy2/examples/asyncio_echo/client.py", line 22, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 685, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "thriftpy2/examples/asyncio_echo/client.py", line 15, in main
    print(await asyncio.gather(
          ^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/client.py", line 40, in _req
    return await self._recv(_api)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/client.py", line 54, in _recv
    fname, mtype, rseqid = await self._iprot.read_message_begin()
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/protocol/binary.py", line 258, in read_message_begin
    api, ttype, seqid = await read_message_begin(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/protocol/binary.py", line 27, in read_message_begin
    sz = unpack_i32(await inbuf.read(4))
                    ^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/transport/base.py", line 41, in read
    return await readall(self._read, sz)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/transport/base.py", line 12, in readall
    chunk = await read_fn(sz - have)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/transport/buffered.py", line 37, in _read
    buf = await self._trans.read(max(rest_len, self._buf_size))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "thriftpy2/thriftpy2/contrib/aio/socket.py", line 169, in read
    buff = await asyncio.wait_for(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/streams.py", line 713, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.12/asyncio/streams.py", line 531, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

@aisk
Member

aisk commented Mar 1, 2024

I think this is similar to the non-async client: if you create a non-async client and call it from multiple threads, you will see similar exceptions.

For both the async and non-async clients, I think users should access them through a high-level wrapper, like a connection pool. Maybe such a wrapper should be built into thriftpy2.

Finally, we should mention this in the documentation.
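To make the connection-pool idea concrete, here is a toy sketch. The `ClientPool` class and its `factory` parameter are hypothetical, not a proposed thriftpy2 API; for thriftpy2 the factory could be a coroutine wrapping `make_aio_client(...)`. A real pool would also need health checks, idle timeouts, and reconnect policy.

```python
import asyncio


class ClientPool:
    """Toy asyncio connection pool (hypothetical API, for illustration)."""

    def __init__(self, factory, size=4):
        self._factory = factory          # coroutine creating a fresh client
        self._pool = asyncio.Queue()     # idle clients
        self._size = size                # max clients ever created
        self._created = 0

    async def acquire(self):
        if self._pool.empty() and self._created < self._size:
            # Reserve the slot before awaiting, so concurrent acquirers
            # cannot overshoot the size limit.
            self._created += 1
            return await self._factory()
        return await self._pool.get()    # wait for a released client

    async def release(self, client):
        await self._pool.put(client)


class FakeClient:
    """Stand-in for a connected Thrift client."""

    async def echo(self, text):
        return text


async def demo():
    async def make_client():
        return FakeClient()

    pool = ClientPool(make_client, size=2)

    async def one(text):
        client = await pool.acquire()
        try:
            return await client.echo(text)
        finally:
            await pool.release(client)

    # Three concurrent calls share at most two connections.
    return await asyncio.gather(one("a"), one("b"), one("c"))


results = asyncio.run(demo())
```

Each connection is used by only one coroutine at a time, which sidesteps the concurrent-read RuntimeError entirely.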

@aisk aisk added the enhancement New feature or request label Mar 1, 2024
@hvraven
Author

hvraven commented Mar 1, 2024

A note in the documentation would be great, yes.

I am currently not quite sure what the connection pool should look like, and I am also not very familiar with the Thrift message format. Every packet appears to have a seqid, which I assume can be used to track which reply belongs to which request; is this correct?

In this case I would extend the TAsyncClient to have a single reader thread, assigning replies to the open requests as they come in. At least for the asyncio implementation it would be great to have this code in the library. I will start building my own client extension on top of this and then open a PR once I am happy with it.

@aisk
Member

aisk commented Mar 1, 2024

Every packet appears to have a seqid, which I assume can be used to track which reply belongs to which request, is this correct?

This is not a thrift protocol issue, but a TCP issue. TCP is stream-oriented, not packet-oriented. When you try to send or receive a thrift packet, TCP does not ensure that it will process the whole packet, so you may send or receive part of a packet sequentially. Thus, the seqid of this packet can't be used here.
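As an illustration of the stream point: reading a fixed-size frame from a raw stream must loop, because a single read() may return fewer bytes than requested. This is roughly what the `readall` helper in thriftpy2's aio transport does (the traceback above passes through it); the demo below is my own sketch over a bare `asyncio.StreamReader`.

```python
import asyncio


async def read_exactly(reader: asyncio.StreamReader, n: int) -> bytes:
    """Accumulate exactly n bytes; a single read() may return fewer."""
    chunks = []
    remaining = n
    while remaining:
        chunk = await reader.read(remaining)
        if not chunk:
            raise ConnectionError("peer closed mid-frame")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b"\x00\x00")  # first half of a 4-byte size prefix
    loop = asyncio.get_running_loop()
    # The rest of the frame "arrives" later, as with real TCP segments.
    loop.call_later(0.01, reader.feed_data, b"\x00\x2a")
    return await read_exactly(reader, 4)


frame = asyncio.run(demo())
```

Interleaving two such loops on one reader is exactly what produces the RuntimeError in the issue.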

In this case I would extend the TAsyncClient to have a single reader thread, assigning replies to the open requests as they come in.

It's more complicated than that. If you create a single reader, the remote server may close the underlying TCP connection immediately after processing one request, or after some idle time, or for any other reason. This depends on how the server's code is implemented.

I suggest that, if performance is not a concern, you had better create a new client every time before sending a request, and close it after receiving the response. This is the simplest way to avoid the complexity.
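That per-request pattern can be wrapped in an async context manager so the client is always closed, even on errors. This is a sketch with a hypothetical `factory` coroutine; for thriftpy2 it would wrap a `make_aio_client(...)` call.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def fresh_client(factory):
    """Open a new client for a single request and always close it.

    ``factory`` is any coroutine that creates a connected client
    (hypothetical stand-in used in the demo below).
    """
    client = await factory()
    try:
        yield client
    finally:
        client.close()


class FakeClient:
    """Stand-in for a connected Thrift client."""

    def __init__(self):
        self.closed = False

    async def echo(self, text):
        return text

    def close(self):
        self.closed = True


async def demo():
    made = []

    async def factory():
        client = FakeClient()
        made.append(client)
        return client

    async with fresh_client(factory) as client:
        reply = await client.echo("hello")
    return reply, made[0].closed


reply, closed = asyncio.run(demo())
```

Since each request gets its own connection, there is never more than one coroutine reading from any socket.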

Or, you should Google or ask ChatGPT how to implement a connection pool, as it may involve some complex policies to adapt to common usage. So I advise updating the documentation, or implementing an example connection pool in the documentation, before we add it to the code base.

@hvraven
Author

hvraven commented Mar 1, 2024

Every packet appears to have a seqid, which I assume can be used to track which reply belongs to which request, is this correct?

This is not a thrift protocol issue, but a TCP issue. TCP is stream-oriented, not packet-oriented. When you try to send or receive a thrift packet, TCP does not ensure that it will process the whole packet, so you may send or receive part of a packet sequentially. Thus, the seqid of this packet can't be used here.

I am not saying anywhere that it is a protocol issue; I am asking whether the seqid can reliably be used to track requests (as I found out by testing myself: yes).

This is not a TCP problem but an asyncio problem. Asyncio always operates on streams and handles TCP fragmentation internally. In fact, I built a custom protocol layer over a serial link and get the same error behaviour. Especially in asyncio environments it is expected that parallel requests just work (worst case serialized using locks), as that is the whole point of the asyncio abstraction layer.

In this case I would extend the TAsyncClient to have a single reader thread, assigning replies to the open requests as they come in.

It's more complicated than that. If you create a single reader, the remote server may close the underlying TCP connection immediately after processing one request, or after some idle time, or for any other reason. This depends on how the server's code is implemented.

Fair point, but in that case the current client will also behave pretty badly and require constant manual reconnects.

I suggest that, if performance is not a concern, you had better create a new client every time before sending a request, and close it after receiving the response. This is the simplest way to avoid the complexity.

Or, you should Google or ask ChatGPT how to implement a connection pool, as it may involve some complex policies to adapt to common usage. So I advise updating the documentation, or implementing an example connection pool in the documentation, before we add it to the code base.

I would recommend being a bit more open to the bug reports you receive and not assuming the worst about the people reporting and trying to fix problems in your project.

I have now implemented my own parallelism by extending TAsyncClient with a single reader that assigns replies to the open requests using the seqid and Futures. Below is my code, in case anyone else hits the same problem in the future. It involves quite a bit of copy & paste from the default TAsyncClient.

import asyncio
import logging

from thriftpy2.contrib.aio.client import TAsyncClient
from thriftpy2.thrift import TApplicationException, TMessageType, args_to_kwargs

log = logging.getLogger(__name__)


class TParallelAsyncClient(TAsyncClient):
    def __init__(self, service, iprot, oprot=None):
        super().__init__(service, iprot, oprot)
        self._open_requests: dict[int, asyncio.Future] = {}
        self._message_processor: asyncio.Task | None = None

    async def _req(self, _api, *args, **kwargs):
        try:
            service_args = getattr(self._service, _api + "_args")
            kwargs = args_to_kwargs(service_args.thrift_spec, *args, **kwargs)
        except ValueError as e:
            raise TApplicationException(
                TApplicationException.UNKNOWN_METHOD,
                "missing required argument {arg} for {service}.{api}".format(
                    arg=e.args[0], service=self._service.__name__, api=_api
                ),
            )

        fut = await self._send(_api, **kwargs)
        if fut is not None:
            self._process_messages()
            return await fut

    async def _send(self, _api, **kwargs) -> asyncio.Future | None:
        oneway = getattr(getattr(self._service, _api + "_result"), "oneway")
        msg_type = TMessageType.ONEWAY if oneway else TMessageType.CALL
        seq_id = self._get_seqid()
        self._oprot.write_message_begin(_api, msg_type, seq_id)
        args = getattr(self._service, _api + "_args")()
        for k, v in kwargs.items():
            setattr(args, k, v)
        self._oprot.write_struct(args)
        self._oprot.write_message_end()
        await self._oprot.trans.flush()
        log.debug("Sent seqid %d: %s", seq_id, _api)

        if oneway:
            return None
        else:
            self._open_requests[seq_id] = asyncio.Future()
            return self._open_requests[seq_id]

    def _process_messages(self):
        if self._message_processor is None or self._message_processor.done():
            self._message_processor = asyncio.create_task(self._message_handler())

    async def _message_handler(self):
        while self._open_requests:
            fname, mtype, rseqid = await self._iprot.read_message_begin()
            log.debug("Reply for seqid %d: %s %s", rseqid, fname, mtype)
            fut = self._open_requests.pop(rseqid, None)
            if fut is None:
                log.error("Received message with unknown seqid %d", rseqid)
                continue  # NB: leaves the unread reply body on the stream

            try:
                fut.set_result(await self._process_message(fname, mtype))
            except Exception as e:
                fut.set_exception(e)

    async def _process_message(self, fname, mtype):
        """process a single message"""
        if mtype == TMessageType.EXCEPTION:
            x = TApplicationException()
            await self._iprot.read_struct(x)
            await self._iprot.read_message_end()
            raise x
        result = getattr(self._service, fname + "_result")()
        await self._iprot.read_struct(result)
        await self._iprot.read_message_end()

        if hasattr(result, "success") and result.success is not None:
            return result.success

        # void api without throws
        if len(result.thrift_spec) == 0:
            return

        # check throws
        for k, v in result.__dict__.items():
            if k != "success" and v:
                raise v

        # no throws & not void api
        if hasattr(result, "success"):
            raise TApplicationException(TApplicationException.MISSING_RESULT)

    def _get_seqid(self) -> int:
        seq_id = self._seqid
        self._seqid += 1
        return seq_id

    def close(self):
        if self._message_processor is not None and not self._message_processor.done():
            self._message_processor.cancel()
            self._message_processor = None
        super().close()

@aisk
Member

aisk commented Mar 1, 2024

I would recommend being a bit more open to bug reports you receive and not assuming the worst about the people informing about and trying to fix problems in your project.

I'm not good at English, and all the comments I made above were translated by ChatGPT. I'm reading your comments via ChatGPT too, so there may be some contextual misunderstandings, but I did not mean to assume anything about anyone. The suggestions above (use short connections or write your own connection pool) are the same ones I have given to my colleagues.

@hvraven
Author

hvraven commented Mar 1, 2024

Ok, sorry for my harsh response. For me ChatGPT is a useful tool, but not for programming; I got the general impression of a "you should know that, don't ask me about the basics" attitude. But I might have read too much into that, and translations can easily lead to these kinds of problems.

Regarding the actual problem: the asyncio code is in a contrib folder, so I assume it was provided by someone else? I can create a PR adding the changes above to the TAsyncClient class. This allows multiplexing multiple requests over a single connection (as long as that connection is alive; no changes there). There is one part I don't like, and that is the spawning of an extra task to handle the messages. It should always be shut down properly, but errors within this task cannot be handled properly, as there is no caller. If you (or anyone else) have a suggestion on how to deal with this situation I would be glad.
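One common pattern for the "no caller" problem is attaching a done callback to the reader task so its exception is at least observed. This is a generic asyncio sketch, not thriftpy2 code; in the client above, the callback could instead fail every future still in `_open_requests`.

```python
import asyncio

errors = []


def on_reader_done(task: asyncio.Task) -> None:
    """Observe failures of a fire-and-forget task via a done callback."""
    if task.cancelled():
        return  # deliberate shutdown, not an error
    exc = task.exception()
    if exc is not None:
        # A real client would fail all open request futures here
        # and/or mark the connection as broken.
        errors.append(exc)


async def demo():
    async def reader():
        raise ConnectionError("connection lost")

    task = asyncio.create_task(reader())
    task.add_done_callback(on_reader_done)
    await asyncio.sleep(0.01)  # let the task fail and the callback run


asyncio.run(demo())
```

This way the exception is never silently swallowed, even though no coroutine ever awaits the reader task directly.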

@aisk
Member

aisk commented Mar 1, 2024

The whole project is a community contribution, and I'll check the code you provided above tomorrow or in a few days. Thanks for reporting this!
