
"Assertion `loop->watchers[w->fd] == w' failed." with multiprocessing and OS pipes #317

Closed
momocow opened this issue Feb 20, 2020 · 16 comments

@momocow

momocow commented Feb 20, 2020

Hi, I encountered a libuv assertion error while using multiprocessing and communicating with child processes via OS pipes.

I've prepared a minimal reproducible case.

I'm using kchmck/aiopipe to create pipes. It wraps the file descriptors from os.pipe() into asyncio StreamReaders/StreamWriters via loop.connect_read_pipe() and loop.connect_write_pipe().

Here, I have unwrapped the aiopipe wrapper classes into plain functions in aiopipe_decap.py for easier code tracing.

# aiopipe_decap.py

import asyncio
import os
from asyncio import (BaseTransport, StreamReader, StreamReaderProtocol,
                     StreamWriter, get_running_loop)
from contextlib import asynccontextmanager, contextmanager


async def _open_reader(fd):
    rx = StreamReader()
    transport, _ = await get_running_loop().connect_read_pipe(
        lambda: StreamReaderProtocol(rx),
        os.fdopen(fd))
    return transport, rx


async def _open_writer(fd):
    rx = StreamReader()
    transport, proto = await get_running_loop().connect_write_pipe(
        lambda: StreamReaderProtocol(rx),
        os.fdopen(fd, "w"))
    tx = StreamWriter(transport, proto, rx, get_running_loop())
    return transport, tx


@asynccontextmanager
async def open_stream(fd, mode):
    transport, stream = await _open_reader(fd) if mode == "r" \
        else await _open_writer(fd)

    try:
        yield stream
    finally:
        try:
            transport.close()
        except OSError:
            # The transport/protocol sometimes closes the fd before this is reached.
            pass

        # Allow event loop callbacks to run and handle closed transport.
        await asyncio.sleep(0)

@contextmanager
def detach(fd):
    os.set_inheritable(fd, True)
    try:
        yield
    finally:
        os.close(fd)

And here is the reproducible snippet, which creates a pipe between a child process and the master process. Once the child starts up, it loops, sending messages back to the master.

You can use the following environment variables to control some of the behaviors in this snippet.

  • U: enable uvloop if this variable is set (default: False)
  • R: number of messages to send (default: 1)
  • M: content of messages (default: "a")
# uv.py

import asyncio
import os
from multiprocessing import Process

from aiopipe_decap import *
import uvloop


async def child_task(fd, message, repeat):
    async with open_stream(fd, "w") as tx:
        for i in range(repeat):
            tx.write(message)
            await tx.drain()
        tx.write_eof()


def child_main(fd, message, repeat):
    asyncio.run(child_task(fd, message, repeat))


async def main(*, message=b"a", repeat=1):
    rfd, tfd = os.pipe()

    with detach(tfd):
        proc = Process(target=child_main, args=(tfd, message, repeat))
        proc.start()

    count = 0
    async with open_stream(rfd, "r") as rx:
        while True:
            msg = await rx.read(1)
            if not msg:
                break
            count += 1
            assert msg == message
    assert count == repeat


if __name__ == "__main__":
    if os.getenv("U", ""):
        uvloop.install()
    rp = int(os.getenv("R", "1"))
    msg = os.getenv("M", "a").encode()
    asyncio.run(main(message=msg, repeat=rp))

Here are my results on my NAS (in an Ubuntu 16.04 container). The assertion error seems related to the repeat count: the larger the count, the more likely the error is triggered. The log shows 9216 as a magic number, but I suspect it depends on the environment.

(venv) $ U=1 R=9210 python uv.py
(venv) $ U=1 R=9211 python uv.py
(venv) $ U=1 R=9212 python uv.py
(venv) $ U=1 R=9213 python uv.py
(venv) $ U=1 R=9214 python uv.py
(venv) $ U=1 R=9215 python uv.py
(venv) $ U=1 R=9216 python uv.py
(venv) $ U=1 R=9217 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError
(venv) $ U=1 R=9218 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError
(venv) $ U=1 R=9219 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError
(venv) $ U=1 R=9220 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError

It works like a charm with vanilla asyncio.

(venv) $ R=9210 python uv.py
(venv) $ R=9211 python uv.py
(venv) $ R=9212 python uv.py
(venv) $ R=9213 python uv.py
(venv) $ R=9214 python uv.py
(venv) $ R=9215 python uv.py
(venv) $ R=9216 python uv.py
(venv) $ R=9217 python uv.py
(venv) $ R=9218 python uv.py
(venv) $ R=9219 python uv.py
(venv) $ R=9220 python uv.py

With even larger repeat counts:

  • R=100K
(venv) $ R=100000 python uv.py
(venv) $ U=1 R=100000 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 44, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 36, in main
    assert count == repeat
AssertionError
  • R=1M
(venv) $ R=1000000 python uv.py
(venv) $ U=1 R=1000000 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 44, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 36, in main
    assert count == repeat
AssertionError
  • uvloop version: 0.14.0
  • Python version: Python 3.7.3
  • Platform: Linux-4.2.8-x86_64-with-debian-stretch-sid
  • Can you reproduce the bug with PYTHONASYNCIODEBUG in env?: Yes
(venv) $ PYTHONASYNCIODEBUG=1 U=1 R=100000 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 44, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 36, in main
    assert count == repeat
AssertionError
  • Does uvloop behave differently from vanilla asyncio? How?: Vanilla asyncio works like a charm, while uvloop raises the assertion error.
@1st1
Member

1st1 commented Feb 20, 2020

Thanks so much! I'll take a look in a few weeks when it's time for uvloop 0.15. If anyone wants to debug further and work on a fix, please go ahead!

@fantix
Member

fantix commented Apr 19, 2020

This seems to be a pretty serious issue: it is 100% reproducible when writing to a pipe whose buffer is full:

import asyncio
from asyncio.subprocess import PIPE

import uvloop


async def main():
    proc = await asyncio.create_subprocess_shell('sleep 3600', stdin=PIPE)
    while True:
        proc.stdin.write(b'x' * 32768)
        await proc.stdin.drain()


if __name__ == '__main__':
    uvloop.install()
    asyncio.run(main())

The error is the same:

Assertion failed: (loop->watchers[w->fd] == w), function uv__io_stop, file src/unix/core.c, line 932.
Abort trap: 6
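In the reproducer above, `sleep 3600` never reads its stdin, so after a few 32768-byte writes the pipe buffer is full and `drain()` has to wait on the event loop for the pipe to become writable again. The stdlib-only sketch below (not from the thread; `pipe_capacity` is a hypothetical helper) shows what "buffer full" means for a raw pipe: non-blocking writes into a pipe nobody reads succeed until the kernel buffer is exhausted, then fail with `BlockingIOError`. The exact capacity is OS-dependent (commonly 64 KiB on Linux), so no output is asserted beyond it being positive.

```python
import os


def pipe_capacity() -> int:
    """Fill an unread pipe with non-blocking writes and count how many bytes fit."""
    r, w = os.pipe()
    try:
        os.set_blocking(w, False)
        chunk = b"x" * 1024
        total = 0
        while True:
            try:
                total += os.write(w, chunk)
            except BlockingIOError:
                # Buffer full: a blocking writer would sleep here, and an
                # asyncio writer has to wait until the pipe becomes writable.
                return total
    finally:
        os.close(r)
        os.close(w)


if __name__ == "__main__":
    print(pipe_capacity())
```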

Until there is a proper fix in libuv (I'll create a libuv PR when I have time; refs libuv/libuv#2058), we can easily fix uvloop by partially reverting d8fe153 and applying this patch:

diff --git a/uvloop/handles/pipe.pyx b/uvloop/handles/pipe.pyx
index 581554f..7a2c8ec 100644
--- a/uvloop/handles/pipe.pyx
+++ b/uvloop/handles/pipe.pyx
@@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
     err = uv.uv_pipe_init(handle._loop.uvloop,
                           <uv.uv_pipe_t*>handle._handle,
                           0)
+    # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
+    # even if it is O_WRONLY, see also #317, libuv/libuv#2058
+    handle._handle.flags |= 0x00004000
     if err < 0:
         handle._abort_init()
         raise convert_error(err)
diff --git a/uvloop/includes/uv.pxd b/uvloop/includes/uv.pxd
index 5f034b3..9efdab1 100644
--- a/uvloop/includes/uv.pxd
+++ b/uvloop/includes/uv.pxd
@@ -82,6 +82,7 @@ cdef extern from "uv.h" nogil:
     ctypedef struct uv_handle_t:
         void* data
         uv_loop_t* loop
+        unsigned int flags
         # ...

     ctypedef struct uv_idle_t:

However, UV_HANDLE_READABLE is an internal flag defined in uv-common.h, so hard-coding its value as a magic number here is a bit dirty. @1st1, shall I create a PR for this patch in uvloop with proper tests, or would you prefer to wait for a new libuv release?

fantix added a commit to fantix/uvloop that referenced this issue May 9, 2020
* in order to detect peer close on O_WRONLY pipe_t
* refs libuv/libuv#2058
* refs MagicStack#317
fantix added a commit to fantix/uvloop that referenced this issue May 9, 2020
* in order to detect peer close on O_WRONLY pipe_t
* partially reverted d8fe153
* refs libuv/libuv#2058
* refs MagicStack#317
* fixes MagicStack#311, fixes MagicStack#312
fantix added a commit to fantix/uvloop that referenced this issue May 13, 2020
* in order to detect peer close on O_WRONLY pipe_t
* partially reverted d8fe153
* refs libuv/libuv#2058
* refs MagicStack#317
* fixes MagicStack#311, fixes MagicStack#312
fantix added a commit that referenced this issue May 14, 2020
* in order to detect peer close on O_WRONLY pipe_t
* partially reverted d8fe153
* refs libuv/libuv#2058
* refs #317
* fixes #311, fixes #312
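The commits say the flag is needed "in order to detect peer close on O_WRONLY pipe_t": the loop watches the write end of a pipe for input events only so that it learns when the reader goes away (vanilla asyncio's Unix write-pipe transport uses a similar "reader trick" by registering a reader on the write fd). Below is a minimal stdlib sketch of that idea, not uvloop or libuv code. It assumes Linux semantics, where `poll()` reports `POLLERR` on a pipe's write end once no read end is open, even if only `POLLIN` was requested; `write_end_reports_peer_close` is a hypothetical helper.

```python
import os
import select


def write_end_reports_peer_close(timeout_ms: int = 1000) -> bool:
    """Return True if polling a pipe's write end reveals that the reader closed.

    Assumes Linux: poll() sets POLLERR on the write end of a pipe once no
    read end remains open, even though only POLLIN was registered.
    """
    r, w = os.pipe()
    reader_open = True
    try:
        poller = select.poll()
        # Watch the *write* end for input events, as a loop would when it
        # "reads" an O_WRONLY pipe just to be told about peer close.
        poller.register(w, select.POLLIN)
        quiet_before = poller.poll(0) == []  # reader still open: nothing reported
        os.close(r)
        reader_open = False
        events = poller.poll(timeout_ms)  # reader gone: should wake at once
        return quiet_before and any(ev & select.POLLERR for _, ev in events)
    finally:
        if reader_open:
            os.close(r)
        os.close(w)


if __name__ == "__main__":
    print(write_end_reports_peer_close())
```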
@mikepurvis

mikepurvis commented Sep 10, 2020

I also hit this error. I have a fairly generic asyncio application that downloads a bunch of tarballs in parallel with httpx.stream and pipes the content into tar subprocesses, which unpack them onto the filesystem:

        async with self.stream_repo_tarball() as tarball_stream:
            tar_proc = await asyncio.create_subprocess_exec('tar', '--extract', '--verbose',
                    '--gzip', '--strip-components=1', cwd=path, stdin=asyncio.subprocess.PIPE)

            async for chunk in tarball_stream.aiter_bytes():
                tar_proc.stdin.write(chunk)
                await tar_proc.stdin.drain()
            tar_proc.stdin.close()
            await tar_proc.wait()

This works under asyncio, but uvloop crashes pretty reliably with:

python3: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.

In case it's relevant, this runs inside a Sanic web server, so I believe a second worker thread is running the loop. For now I have disabled uvloop, and with that everything works again:

asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
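A self-contained sketch of the same streaming pattern, for anyone trying to reproduce it without httpx or tar: a Python child process (via `sys.executable`, standing in for `tar`) counts the bytes it receives on stdin, and the parent writes chunks with `drain()` for back-pressure, then closes stdin and awaits `wait_closed()` before waiting for the process. `pipe_chunks` and `CHILD` are hypothetical names; this assumes Python 3.8+ (where `wait_closed()` works on a subprocess's stdin) and runs on whichever event loop is installed.

```python
import asyncio
import sys

# Child program: read all of stdin and print how many bytes arrived.
CHILD = "import sys; print(len(sys.stdin.buffer.read()))"


async def pipe_chunks(chunks):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    for chunk in chunks:
        proc.stdin.write(chunk)
        await proc.stdin.drain()  # waits while the pipe buffer is full
    proc.stdin.close()  # send EOF so the child stops reading
    await proc.stdin.wait_closed()
    out = await proc.stdout.read()  # reads until the child exits
    await proc.wait()
    return int(out)


if __name__ == "__main__":
    print(asyncio.run(pipe_chunks([b"x" * 32768] * 8)))  # 8 * 32768 = 262144
```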

@1st1
Member

1st1 commented Sep 10, 2020

@fantix have we fixed this one in master?

@enumag

enumag commented Oct 19, 2020

Just ran into this as well... What's the current status? Is it already fixed in master? If so, when can we expect a release?

@dmoklaf

dmoklaf commented Nov 16, 2020

Critical issue as well

@1st1
Member

1st1 commented Nov 16, 2020

@fantix It seems that we now just need to bump libuv to resolve this?

@fantix
Member

fantix commented Nov 25, 2020

@1st1 sorry for the late reply! Let me try the latest libuv, 1.39.0.

@fantix
Member

fantix commented Nov 26, 2020

#342 should already have fixed the crash (using libuv internal flags as a workaround), though there is no proper fix upstream yet.

However, I think this issue has revealed another pipe bug in uvloop (master): the pipe is closed before all data is transmitted. I've simplified @momocow's script so that it also works on Python 3.8+:

import asyncio
import os
from asyncio import StreamReader, StreamReaderProtocol, StreamWriter, get_running_loop
from contextlib import asynccontextmanager


async def _open_reader(fd):
    rx = StreamReader()
    transport, _ = await get_running_loop().connect_read_pipe(
        lambda: StreamReaderProtocol(rx), os.fdopen(fd)
    )
    return transport, rx


async def _open_writer(fd):
    rx = StreamReader()
    transport, proto = await get_running_loop().connect_write_pipe(
        lambda: StreamReaderProtocol(rx), os.fdopen(fd, "w")
    )
    tx = StreamWriter(transport, proto, rx, get_running_loop())
    return transport, tx


@asynccontextmanager
async def open_stream(fd, mode):
    transport, stream = (
        await _open_reader(fd) if mode == "r" else await _open_writer(fd)
    )

    try:
        yield stream
    finally:
        try:
            transport.close()
        except OSError:
            # The transport/protocol sometimes closes the fd before this is reached.
            pass

        # Allow event loop callbacks to run and handle closed transport.
        await asyncio.sleep(0)


async def child_task(fd, message, repeat):
    async with open_stream(fd, "w") as tx:
        for i in range(repeat):
            tx.write(message)
            await tx.drain()
        tx.write_eof()


async def parent_task(rfd, message, repeat):
    count = 0
    async with open_stream(rfd, "r") as rx:
        while True:
            msg = await rx.read(1)
            if not msg:
                break
            count += 1
            assert msg == message
    assert count == repeat, f"{count} != {repeat}"


def main():
    repeat = int(os.getenv("R", "1"))
    message = os.getenv("M", "a").encode()
    rfd, tfd = os.pipe()
    pid = os.fork()
    if os.getenv("U", ""):
        import uvloop

        uvloop.install()

    if pid:
        os.close(rfd)
        asyncio.run(child_task(tfd, message, repeat))
    else:
        os.close(tfd)
        asyncio.run(parent_task(rfd, message, repeat))


if __name__ == "__main__":
    main()

And it fails like this:

% U=1 R=9217 python uv.py
Traceback (most recent call last):
  File "uv.py", line 82, in <module>
    main()
  File "uv.py", line 78, in main
    asyncio.run(parent_task(rfd, message, repeat))
  File "asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1477, in uvloop.loop.Loop.run_until_complete
    return future.result()
  File "uv.py", line 60, in parent_task
    assert count == repeat, f"{count} != {repeat}"
AssertionError: 9216 != 9217

It is reproducible for me (macOS 11 on an Intel CPU) on Python 3.7, 3.8 and 3.9, with libuv 1.33.1 or 1.39.0. I'll investigate a bit first.

@fantix
Member

fantix commented Nov 26, 2020

Ah okay, this is not a uvloop bug but rather a difference from asyncio: you must await tx.wait_closed() to wait for all the data to be transmitted, whereas with asyncio it may still succeed even if you don't await anything (which is certainly not recommended). Therefore, the following child_task should work with both asyncio and uvloop (master):

async def child_task(fd, message, repeat):
    _, tx = await _open_writer(fd)
    for i in range(repeat):
        tx.write(message)
        await tx.drain()
    tx.write_eof()
    tx.close()
    await tx.wait_closed()

I think we can now call this issue (#317) fixed.
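To check this without forking, here is a single-process sketch (not from the thread) that runs both ends of one os.pipe() on the same event loop. It reuses the connect_write_pipe/connect_read_pipe plus StreamReaderProtocol wiring from aiopipe_decap.py and the write_eof() / close() / await wait_closed() shutdown sequence of the child_task above. `write_all`, `count_all` and `round_trip` are hypothetical names. It runs on vanilla asyncio; calling uvloop.install() first should exercise uvloop instead, assuming a release that includes the fix.

```python
import asyncio
import os


async def write_all(fd, message, repeat):
    loop = asyncio.get_running_loop()
    rx = asyncio.StreamReader()
    transport, proto = await loop.connect_write_pipe(
        lambda: asyncio.StreamReaderProtocol(rx), os.fdopen(fd, "wb", buffering=0))
    tx = asyncio.StreamWriter(transport, proto, rx, loop)
    for _ in range(repeat):
        tx.write(message)
        await tx.drain()
    tx.write_eof()  # close the pipe once the transport buffer is flushed
    tx.close()
    # Returns only after connection_lost(), i.e. after buffered data was written.
    await tx.wait_closed()


async def count_all(fd):
    loop = asyncio.get_running_loop()
    rx = asyncio.StreamReader()
    transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(rx), os.fdopen(fd, "rb", buffering=0))
    total = 0
    try:
        while True:
            chunk = await rx.read(65536)
            if not chunk:  # EOF: the write end was closed
                break
            total += len(chunk)
    finally:
        transport.close()
    return total


async def round_trip(message=b"a", repeat=9217):
    r, w = os.pipe()
    reader = asyncio.create_task(count_all(r))
    await write_all(w, message, repeat)
    return await reader


if __name__ == "__main__":
    print(asyncio.run(round_trip()))  # 9217 bytes sent, 9217 received
```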

@1st1
Member

1st1 commented Dec 1, 2020

> Ah okay, this is not a uvloop bug, but rather a difference to asyncio: you must await tx.wait_closed() in order to wait for all the data to be transmitted,

Maybe we can raise our own error somehow instead of letting libuv crash with an assertion error?

@fantix
Member

fantix commented Dec 1, 2020

Oh, we've already fixed the crash; the missing await tx.wait_closed() only causes the data to not be fully received.

@fantix
Member

fantix commented Dec 1, 2020

I've also created an upstream PR so that we don't have to hack in the private constant in uvloop.

@PJovy

PJovy commented Apr 21, 2021

So is this bug fixed now? If we use the latest version, 0.15.2, does that mean we won't hit this issue again? Thx. @fantix

@fantix
Member

fantix commented Apr 21, 2021

@PJovy right, the temporary fix is included in 0.15. An ideal fix would depend on something from upstream, but we should be fine without it.

@fantix
Member

fantix commented Sep 9, 2022

We'll probably live with this temporary fix for now, as the upstream doesn't have a decision yet. I'm closing this issue to avoid confusion.


7 participants