Skip to content

Commit

Permalink
Fix incorrect main thread id value forking from a thread (#453)
Browse files Browse the repository at this point in the history
* Fix incorrect main thread id value in mp.Process
* Make MAIN_THREAD_ID a lazy value and add test

Co-authored-by: Александр Менщиков <menshchikov@zvonok.com>
Co-authored-by: Fantix King <fantix.king@gmail.com>
  • Loading branch information
3 people authored Aug 31, 2022
1 parent 73d7253 commit e7934c8
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 10 deletions.
38 changes: 37 additions & 1 deletion tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def coro():
with self.assertRaisesRegex(TypeError, 'coroutines cannot be used'):
self.loop.add_signal_handler(signal.SIGHUP, coro)

def test_wakeup_fd_unchanged(self):
def test_signals_wakeup_fd_unchanged(self):
async def runner():
PROG = R"""\
import uvloop
Expand Down Expand Up @@ -349,6 +349,42 @@ async def f(): pass

self.loop.run_until_complete(runner())

def test_signals_fork_in_thread(self):
# Refs #452, when forked from a thread, the main-thread-only signal
# operations failed thread ID checks because we didn't update
# MAIN_THREAD_ID after fork. It's now a lazy value set when needed and
# cleared after fork.
PROG = R"""\
import asyncio
import multiprocessing
import signal
import sys
import threading
import uvloop
multiprocessing.set_start_method('fork')
def subprocess():
loop = """ + self.NEW_LOOP + """
loop.add_signal_handler(signal.SIGINT, lambda *a: None)
def run():
loop = """ + self.NEW_LOOP + """
loop.add_signal_handler(signal.SIGINT, lambda *a: None)
p = multiprocessing.Process(target=subprocess)
t = threading.Thread(target=p.start)
t.start()
t.join()
p.join()
sys.exit(p.exitcode)
run()
"""

subprocess.check_call([
sys.executable, b'-W', b'ignore', b'-c', PROG,
])


class Test_UV_Signals(_TestSignal, tb.UVTestCase):
NEW_LOOP = 'uvloop.new_event_loop()'
Expand Down
11 changes: 11 additions & 0 deletions uvloop/includes/fork_handler.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
volatile uint64_t MAIN_THREAD_ID = 0;
volatile int8_t MAIN_THREAD_ID_SET = 0;

typedef void (*OnForkHandler)();

Expand All @@ -9,6 +11,10 @@ Note: Fork handler needs to be in C (not cython) otherwise it would require
GIL to be present, but some forks can exec non-python processes.
*/
void handleAtFork(void) {
// Reset the MAIN_THREAD_ID on fork, because the main thread ID is not
// always the same after fork, especially when forked from within a thread.
MAIN_THREAD_ID_SET = 0;

if (__forkHandler != NULL) {
__forkHandler();
}
Expand All @@ -25,3 +31,8 @@ void resetForkHandler(void)
{
__forkHandler = NULL;
}

void setMainThreadID(uint64_t id) {
MAIN_THREAD_ID = id;
MAIN_THREAD_ID_SET = 1;
}
2 changes: 1 addition & 1 deletion uvloop/includes/stdlib.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ cdef int ssl_SSL_ERROR_WANT_READ = ssl.SSL_ERROR_WANT_READ
cdef int ssl_SSL_ERROR_WANT_WRITE = ssl.SSL_ERROR_WANT_WRITE
cdef int ssl_SSL_ERROR_SYSCALL = ssl.SSL_ERROR_SYSCALL

cdef uint64_t MAIN_THREAD_ID = <uint64_t><int64_t>threading.main_thread().ident
cdef threading_Thread = threading.Thread
cdef threading_main_thread = threading.main_thread

cdef int subprocess_PIPE = subprocess.PIPE
cdef int subprocess_STDOUT = subprocess.STDOUT
Expand Down
5 changes: 5 additions & 0 deletions uvloop/includes/system.pxd
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from libc.stdint cimport int8_t, uint64_t

cdef extern from "arpa/inet.h" nogil:

int ntohl(int)
Expand Down Expand Up @@ -85,7 +87,10 @@ cdef extern from "includes/compat.h" nogil:

cdef extern from "includes/fork_handler.h":

uint64_t MAIN_THREAD_ID
int8_t MAIN_THREAD_ID_SET
ctypedef void (*OnForkHandler)()
void handleAtFork()
void setForkHandler(OnForkHandler handler)
void resetForkHandler()
void setMainThreadID(uint64_t id)
1 change: 0 additions & 1 deletion uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ cdef class Loop:
bint _stopping

uint64_t _thread_id
bint _thread_is_main

object _task_factory
object _exception_handler
Expand Down
13 changes: 6 additions & 7 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ cdef class Loop:

self._closed = 0
self._debug = 0
self._thread_is_main = 0
self._thread_id = 0
self._running = 0
self._stopping = 0
Expand Down Expand Up @@ -216,7 +215,11 @@ cdef class Loop:
self._servers = set()

cdef inline _is_main_thread(self):
return MAIN_THREAD_ID == PyThread_get_thread_ident()
cdef uint64_t main_thread_id = system.MAIN_THREAD_ID
if system.MAIN_THREAD_ID_SET == 0:
main_thread_id = <uint64_t><int64_t>threading_main_thread().ident
system.setMainThreadID(main_thread_id)
return main_thread_id == PyThread_get_thread_ident()

def __init__(self):
self.set_debug((not sys_ignore_environment
Expand Down Expand Up @@ -520,7 +523,6 @@ cdef class Loop:
self._last_error = None

self._thread_id = PyThread_get_thread_ident()
self._thread_is_main = MAIN_THREAD_ID == self._thread_id
self._running = 1

self.handler_check__exec_writes.start()
Expand All @@ -541,7 +543,6 @@ cdef class Loop:

self._pause_signals()

self._thread_is_main = 0
self._thread_id = 0
self._running = 0
self._stopping = 0
Expand Down Expand Up @@ -3287,16 +3288,14 @@ cdef Loop __forking_loop = None


cdef void __get_fork_handler() nogil:
global __forking
global __forking_loop

with gil:
if (__forking and __forking_loop is not None and
__forking_loop.active_process_handler is not None):
__forking_loop.active_process_handler._after_fork()

cdef __install_atfork():
global __atfork_installed

if __atfork_installed:
return
__atfork_installed = 1
Expand Down

0 comments on commit e7934c8

Please sign in to comment.