Skip to content

Commit

Permalink
forcely add UV_HANDLE_READABLE on pipe_t
Browse files Browse the repository at this point in the history
* in order to detect peer close on O_WRONLY pipe_t
* refs libuv/libuv#2058
* refs MagicStack#317
  • Loading branch information
fantix committed May 9, 2020
1 parent 9e017e6 commit 5ee11cd
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 67 deletions.
30 changes: 30 additions & 0 deletions tests/test_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def connection_lost(self, exc):

class MyWritePipeProto(asyncio.BaseProtocol):
done = None
paused = False

def __init__(self, loop=None):
self.state = 'INITIAL'
Expand All @@ -61,6 +62,12 @@ def connection_lost(self, exc):
if self.done:
self.done.set_result(None)

def pause_writing(self):
self.paused = True

def resume_writing(self):
self.paused = False


class _BasePipeTest:
def test_read_pipe(self):
Expand Down Expand Up @@ -241,6 +248,29 @@ def reader(data):
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)

def test_write_buffer_full(self):
rpipe, wpipe = os.pipe()
pipeobj = io.open(wpipe, 'wb', 1024)

proto = MyWritePipeProto(loop=self.loop)
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
transport, p = self.loop.run_until_complete(connect)
self.assertIs(p, proto)
self.assertIs(transport, proto.transport)
self.assertEqual('CONNECTED', proto.state)

for i in range(32):
transport.write(b'x' * 32768)
if proto.paused:
transport.write(b'x' * 32768)
break
else:
self.fail("Didn't reach a full buffer")

os.close(rpipe)
self.loop.run_until_complete(asyncio.wait_for(proto.done, 1))
self.assertEqual('CLOSED', proto.state)


class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase):
pass
Expand Down
4 changes: 0 additions & 4 deletions uvloop/handles/pipe.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ cdef class ReadUnixTransport(UVStream):

cdef class WriteUnixTransport(UVStream):

cdef:
uv.uv_poll_t disconnect_listener
bint disconnect_listener_inited

@staticmethod
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
object waiter)
66 changes: 3 additions & 63 deletions uvloop/handles/pipe.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
err = uv.uv_pipe_init(handle._loop.uvloop,
<uv.uv_pipe_t*>handle._handle,
0)
# UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
# even if it is O_WRONLY, see also #317, libuv/libuv#2058
handle._handle.flags |= 0x00004000 # UV_HANDLE_READABLE
if err < 0:
handle._abort_init()
raise convert_error(err)
Expand Down Expand Up @@ -147,10 +150,6 @@ cdef class ReadUnixTransport(UVStream):
@cython.no_gc_clear
cdef class WriteUnixTransport(UVStream):

def __cinit__(self):
self.disconnect_listener_inited = False
self.disconnect_listener.data = NULL

@staticmethod
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
object waiter):
Expand All @@ -167,46 +166,6 @@ cdef class WriteUnixTransport(UVStream):
__pipe_init_uv_handle(<UVStream>handle, loop)
return handle

cdef _start_reading(self):
# A custom implementation for monitoring for EOF:
# libuv since v1.23.1 prohibits using uv_read_start on
# write-only FDs, so we use a throw-away uv_poll_t handle
# for that purpose, as suggested in
# https://github.com/libuv/libuv/issues/2058.

cdef int err

if not self.disconnect_listener_inited:
err = uv.uv_poll_init(self._loop.uvloop,
&self.disconnect_listener,
self._fileno())
if err < 0:
raise convert_error(err)
self.disconnect_listener.data = <void*>self
self.disconnect_listener_inited = True

err = uv.uv_poll_start(&self.disconnect_listener,
uv.UV_READABLE | uv.UV_DISCONNECT,
__on_write_pipe_poll_event)
if err < 0:
raise convert_error(err)

cdef _stop_reading(self):
cdef int err
if not self.disconnect_listener_inited:
return
err = uv.uv_poll_stop(&self.disconnect_listener)
if err < 0:
raise convert_error(err)

cdef _close(self):
if self.disconnect_listener_inited:
self.disconnect_listener.data = NULL
uv.uv_close(<uv.uv_handle_t *>(&self.disconnect_listener), NULL)
self.disconnect_listener_inited = False

UVStream._close(self)

cdef _new_socket(self):
return __pipe_get_socket(<UVSocketHandle>self)

Expand All @@ -220,25 +179,6 @@ cdef class WriteUnixTransport(UVStream):
raise NotImplementedError


cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle,
int status, int events) with gil:
cdef WriteUnixTransport tr

if handle.data is NULL:
return

tr = <WriteUnixTransport>handle.data
if tr._closed:
return

if events & uv.UV_DISCONNECT:
try:
tr._stop_reading()
tr._on_eof()
except BaseException as ex:
tr._fatal_error(ex, False)


cdef class _PipeConnectRequest(UVRequest):
cdef:
UnixTransport transport
Expand Down
1 change: 1 addition & 0 deletions uvloop/includes/uv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ cdef extern from "uv.h" nogil:
ctypedef struct uv_handle_t:
void* data
uv_loop_t* loop
unsigned int flags
# ...

ctypedef struct uv_idle_t:
Expand Down

0 comments on commit 5ee11cd

Please sign in to comment.