From 6f4de81b82d211e65a2c45ed2b0e99fdd47bf55f Mon Sep 17 00:00:00 2001 From: Itamar Oren Date: Wed, 3 Jul 2024 08:51:21 -0700 Subject: [PATCH] Backport gh-107803: Double linked list implementation for asyncio tasks Summary: upstream issue: https://github.com/python/cpython/issues/107803 backported commits: - https://github.com/python/cpython/commit/4717aaa1a72d1964f1531a7c613f37ce3d9056d9 - https://github.com/python/cpython/commit/82235449b85165add62c1b200299456a50a1d097 Differential Revision: D59280319 fbshipit-source-id: 9fb14b7f5b6662ff5093ed27c56841b8de8c5a2c --- .../pycore_global_objects_fini_generated.h | 1 + Include/internal/pycore_global_strings.h | 1 + Include/internal/pycore_object.h | 4 +- .../internal/pycore_runtime_init_generated.h | 1 + .../internal/pycore_unicodeobject_generated.h | 4 + Lib/asyncio/tasks.py | 5 +- Lib/test/test_asyncio/test_tasks.py | 34 +- Modules/_asynciomodule.c | 369 ++++++++++++++---- Modules/clinic/_asynciomodule.c.h | 62 ++- 9 files changed, 382 insertions(+), 99 deletions(-) diff --git a/Include/internal/pycore_global_objects_fini_generated.h b/Include/internal/pycore_global_objects_fini_generated.h index 16cb4793ad1..7ebe8e7665b 100644 --- a/Include/internal/pycore_global_objects_fini_generated.h +++ b/Include/internal/pycore_global_objects_fini_generated.h @@ -898,6 +898,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) { _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(displayhook)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dklen)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(doc)); + _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(done)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dont_inherit)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dst)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dst_dir_fd)); diff --git a/Include/internal/pycore_global_strings.h b/Include/internal/pycore_global_strings.h index 1e768532971..260b0e387a3 100644 --- a/Include/internal/pycore_global_strings.h +++ b/Include/internal/pycore_global_strings.h @@ -387,6 +387,7 @@ struct _Py_global_strings { STRUCT_FOR_ID(displayhook) STRUCT_FOR_ID(dklen) STRUCT_FOR_ID(doc) + STRUCT_FOR_ID(done) STRUCT_FOR_ID(dont_inherit) STRUCT_FOR_ID(dst) STRUCT_FOR_ID(dst_dir_fd) diff --git a/Include/internal/pycore_object.h b/Include/internal/pycore_object.h index d1e2773a247..6f10f10d173 100644 --- a/Include/internal/pycore_object.h +++ b/Include/internal/pycore_object.h @@ -126,8 +126,8 @@ static inline void _Py_RefcntAdd(PyObject* op, Py_ssize_t n) } #define _Py_RefcntAdd(op, n) _Py_RefcntAdd(_PyObject_CAST(op), n) -extern void _Py_SetImmortal(PyObject *op); -extern void _Py_SetImmortalUntracked(PyObject *op); +PyAPI_FUNC(void) _Py_SetImmortal(PyObject *op); +PyAPI_FUNC(void) _Py_SetImmortalUntracked(PyObject *op); // Makes an immortal object mortal again with the specified refcnt. Should only // be used during runtime finalization. diff --git a/Include/internal/pycore_runtime_init_generated.h b/Include/internal/pycore_runtime_init_generated.h index f4d0ee4122e..650fe7ff317 100644 --- a/Include/internal/pycore_runtime_init_generated.h +++ b/Include/internal/pycore_runtime_init_generated.h @@ -896,6 +896,7 @@ extern "C" { INIT_ID(displayhook), \ INIT_ID(dklen), \ INIT_ID(doc), \ + INIT_ID(done), \ INIT_ID(dont_inherit), \ INIT_ID(dst), \ INIT_ID(dst_dir_fd), \ diff --git a/Include/internal/pycore_unicodeobject_generated.h b/Include/internal/pycore_unicodeobject_generated.h index e1bc9cf1c7b..fa09dd855e0 100644 --- a/Include/internal/pycore_unicodeobject_generated.h +++ b/Include/internal/pycore_unicodeobject_generated.h @@ -1352,6 +1352,10 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) { _PyUnicode_InternStatic(interp, &string); assert(_PyUnicode_CheckConsistency(string, 1)); assert(PyUnicode_GET_LENGTH(string) != 1); + string = &_Py_ID(done); + _PyUnicode_InternStatic(interp, &string); + assert(_PyUnicode_CheckConsistency(string, 1)); + assert(PyUnicode_GET_LENGTH(string) != 1); string = &_Py_ID(dont_inherit); _PyUnicode_InternStatic(interp, &string); assert(_PyUnicode_CheckConsistency(string, 1)); diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index dadcb5b5f36..cd869931e01 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -1097,14 +1097,14 @@ def _unregister_eager_task(task): _py_enter_task = _enter_task _py_leave_task = _leave_task _py_swap_current_task = _swap_current_task - +_py_all_tasks = all_tasks try: from _asyncio import (_register_task, _register_eager_task, _unregister_task, _unregister_eager_task, _enter_task, _leave_task, _swap_current_task, _scheduled_tasks, _eager_tasks, _current_tasks, - current_task) + current_task, all_tasks) except ImportError: pass else: @@ -1116,3 +1116,4 @@ def _unregister_eager_task(task): _c_enter_task = _enter_task _c_leave_task = _leave_task _c_swap_current_task = _swap_current_task + _c_all_tasks = all_tasks diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 5b09c81faef..da72765413d 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -86,6 +86,7 @@ class BaseTaskTests: Task = None Future = None + all_tasks = None def new_task(self, loop, coro, name='TestTask', context=None): return self.__class__.Task(coro, loop=loop, name=name, context=context) @@ -2267,7 +2268,7 @@ async def kill_me(loop): coro = kill_me(self.loop) task = asyncio.ensure_future(coro, loop=self.loop) - self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) + self.assertEqual(self.all_tasks(loop=self.loop), {task}) asyncio.set_event_loop(None) @@ -2282,7 +2283,7 @@ async def kill_me(loop): # no more reference to kill_me() task: the task is destroyed by the GC support.gc_collect() - self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) + self.assertEqual(self.all_tasks(loop=self.loop), set()) mock_handler.assert_called_with(self.loop, { 'message': 'Task was destroyed but it is pending!', @@ -2431,7 +2432,7 @@ async def coro(): message = m_log.error.call_args[0][0] self.assertIn('Task was destroyed but it is pending', message) - self.assertEqual(asyncio.all_tasks(self.loop), set()) + self.assertEqual(self.all_tasks(self.loop), set()) def test_create_task_with_noncoroutine(self): with self.assertRaisesRegex(TypeError, @@ -2731,6 +2732,7 @@ async def func(): # Add patched Task & Future back to the test case cls.Task = Task cls.Future = Future + cls.all_tasks = tasks.all_tasks # Add an extra unit-test cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture @@ -2804,6 +2806,7 @@ class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, Task = getattr(tasks, '_CTask', None) Future = getattr(futures, '_CFuture', None) + all_tasks = getattr(tasks, '_c_all_tasks', None) @support.refcount_test def test_refleaks_in_task___init__(self): @@ -2835,6 +2838,7 @@ class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) Future = getattr(futures, '_CFuture', None) + all_tasks = getattr(tasks, '_c_all_tasks', None) @unittest.skipUnless(hasattr(tasks, '_CTask'), @@ -2844,6 +2848,7 @@ class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) Future = futures._PyFuture + all_tasks = getattr(tasks, '_c_all_tasks', None) @unittest.skipUnless(hasattr(futures, '_CFuture'), @@ -2853,6 +2858,7 @@ class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): Future = getattr(futures, '_CFuture', None) Task = tasks._PyTask + all_tasks = tasks._py_all_tasks @unittest.skipUnless(hasattr(tasks, '_CTask'), @@ -2861,6 +2867,7 @@ class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) Future = futures._PyFuture + all_tasks = getattr(tasks, '_c_all_tasks', None) @unittest.skipUnless(hasattr(futures, '_CFuture'), @@ -2869,6 +2876,7 @@ class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = tasks._PyTask Future = getattr(futures, '_CFuture', None) + all_tasks = staticmethod(tasks._py_all_tasks) class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, @@ -2876,6 +2884,7 @@ class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, Task = tasks._PyTask Future = futures._PyFuture + all_tasks = staticmethod(tasks._py_all_tasks) @add_subclass_tests @@ -2915,6 +2924,7 @@ class BaseTaskIntrospectionTests: _unregister_task = None _enter_task = None _leave_task = None + all_tasks = None def test__register_task_1(self): class TaskLike: @@ -2928,9 +2938,9 @@ def done(self): task = TaskLike() loop = mock.Mock() - self.assertEqual(asyncio.all_tasks(loop), set()) + self.assertEqual(self.all_tasks(loop), set()) self._register_task(task) - self.assertEqual(asyncio.all_tasks(loop), {task}) + self.assertEqual(self.all_tasks(loop), {task}) self._unregister_task(task) def test__register_task_2(self): @@ -2944,9 +2954,9 @@ def done(self): task = TaskLike() loop = mock.Mock() - self.assertEqual(asyncio.all_tasks(loop), set()) + self.assertEqual(self.all_tasks(loop), set()) self._register_task(task) - self.assertEqual(asyncio.all_tasks(loop), {task}) + self.assertEqual(self.all_tasks(loop), {task}) self._unregister_task(task) def test__register_task_3(self): @@ -2960,9 +2970,9 @@ def done(self): task = TaskLike() loop = mock.Mock() - self.assertEqual(asyncio.all_tasks(loop), set()) + self.assertEqual(self.all_tasks(loop), set()) self._register_task(task) - self.assertEqual(asyncio.all_tasks(loop), set()) + self.assertEqual(self.all_tasks(loop), set()) self._unregister_task(task) def test__enter_task(self): @@ -3013,13 +3023,13 @@ def test__unregister_task(self): task.get_loop = lambda: loop self._register_task(task) self._unregister_task(task) - self.assertEqual(asyncio.all_tasks(loop), set()) + self.assertEqual(self.all_tasks(loop), set()) def test__unregister_task_not_registered(self): task = mock.Mock() loop = mock.Mock() self._unregister_task(task) - self.assertEqual(asyncio.all_tasks(loop), set()) + self.assertEqual(self.all_tasks(loop), set()) class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): @@ -3027,6 +3037,7 @@ class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): _unregister_task = staticmethod(tasks._py_unregister_task) _enter_task = staticmethod(tasks._py_enter_task) _leave_task = staticmethod(tasks._py_leave_task) + all_tasks = staticmethod(tasks._py_all_tasks) @unittest.skipUnless(hasattr(tasks, '_c_register_task'), @@ -3037,6 +3048,7 @@ class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): _unregister_task = staticmethod(tasks._c_unregister_task) _enter_task = staticmethod(tasks._c_enter_task) _leave_task = staticmethod(tasks._c_leave_task) + all_tasks = staticmethod(tasks._c_all_tasks) else: _register_task = _unregister_task = _enter_task = _leave_task = None diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index a26714f9755..87ad236cdbb 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -6,6 +6,7 @@ #include "pycore_dict.h" // _PyDict_GetItem_KnownHash() #include "pycore_modsupport.h" // _PyArg_CheckPositional() #include "pycore_moduleobject.h" // _PyModule_GetState() +#include "pycore_object.h" // _Py_SetImmortalUntracked #include "pycore_pyerrors.h" // _PyErr_ClearExcState() #include "pycore_pylifecycle.h" // _Py_IsInterpreterFinalizing() #include "pycore_pystate.h" // _PyThreadState_GET() @@ -19,6 +20,60 @@ module _asyncio [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/ +typedef enum { + STATE_PENDING, + STATE_CANCELLED, + STATE_FINISHED +} fut_state; + +#define FutureObj_HEAD(prefix) \ + PyObject_HEAD \ + PyObject *prefix##_loop; \ + PyObject *prefix##_callback0; \ + PyObject *prefix##_context0; \ + PyObject *prefix##_callbacks; \ + PyObject *prefix##_exception; \ + PyObject *prefix##_exception_tb; \ + PyObject *prefix##_result; \ + PyObject *prefix##_source_tb; \ + PyObject *prefix##_cancel_msg; \ + PyObject *prefix##_cancelled_exc; \ + fut_state prefix##_state; \ + /* These bitfields need to be at the end of the struct + so that these and bitfields from TaskObj are contiguous. + */ \ + unsigned prefix##_log_tb: 1; \ + unsigned prefix##_blocking: 1; + +typedef struct { + FutureObj_HEAD(fut) +} FutureObj; + +typedef struct TaskObj { + FutureObj_HEAD(task) + unsigned task_must_cancel: 1; + unsigned task_log_destroy_pending: 1; + int task_num_cancels_requested; + PyObject *task_fut_waiter; + PyObject *task_coro; + PyObject *task_name; + PyObject *task_context; + struct TaskObj *next; + struct TaskObj *prev; +} TaskObj; + +typedef struct { + PyObject_HEAD + TaskObj *sw_task; + PyObject *sw_arg; +} TaskStepMethWrapper; + + +#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType) +#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType) + +#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType) +#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType) #define FI_FREELIST_MAXLEN 255 @@ -38,8 +93,9 @@ typedef struct { all running event loops. {EventLoop: Task} */ PyObject *current_tasks; - /* WeakSet containing all tasks scheduled to run on event loops. */ - PyObject *scheduled_tasks; + /* WeakSet containing scheduled 3rd party tasks which don't + inherit from native asyncio.Task */ + PyObject *non_asyncio_tasks; /* Set containing all eagerly executing tasks. */ PyObject *eager_tasks; @@ -76,6 +132,51 @@ typedef struct { futureiterobject *fi_freelist; Py_ssize_t fi_freelist_len; + + /* Linked-list of all tasks which are instances of asyncio.Task or subclasses + of it. Third party tasks implementations which don't inherit from + asyncio.Task are tracked separately using the 'non_asyncio_tasks' WeakSet. + `tail` is used as a sentinel to mark the end of the linked-list. It avoids one + branch in checking for empty list when adding a new task, the list is + initialized with `head` pointing to `tail` to mark an empty list. + + Invariants: + * When the list is empty: + - asyncio_tasks.head == &asyncio_tasks.tail + - asyncio_tasks.head->prev == NULL + - asyncio_tasks.head->next == NULL + + * After adding the first task 'task1': + - asyncio_tasks.head == task1 + - task1->next == &asyncio_tasks.tail + - task1->prev == NULL + - asyncio_tasks.tail.prev == task1 + + * After adding a second task 'task2': + - asyncio_tasks.head == task2 + - task2->next == task1 + - task2->prev == NULL + - task1->prev == task2 + - asyncio_tasks.tail.prev == task1 + + * After removing task 'task1': + - asyncio_tasks.head == task2 + - task2->next == &asyncio_tasks.tail + - task2->prev == NULL + - asyncio_tasks.tail.prev == task2 + + * After removing task 'task2', the list is empty: + - asyncio_tasks.head == &asyncio_tasks.tail + - asyncio_tasks.head->prev == NULL + - asyncio_tasks.tail.prev == NULL + - asyncio_tasks.tail.next == NULL + */ + + struct { + TaskObj tail; + TaskObj *head; + } asyncio_tasks; + } asyncio_state; static inline asyncio_state * @@ -105,59 +206,6 @@ get_asyncio_state_by_def(PyObject *self) return get_asyncio_state(mod); } -typedef enum { - STATE_PENDING, - STATE_CANCELLED, - STATE_FINISHED -} fut_state; - -#define FutureObj_HEAD(prefix) \ - PyObject_HEAD \ - PyObject *prefix##_loop; \ - PyObject *prefix##_callback0; \ - PyObject *prefix##_context0; \ - PyObject *prefix##_callbacks; \ - PyObject *prefix##_exception; \ - PyObject *prefix##_exception_tb; \ - PyObject *prefix##_result; \ - PyObject *prefix##_source_tb; \ - PyObject *prefix##_cancel_msg; \ - PyObject *prefix##_cancelled_exc; \ - fut_state prefix##_state; \ - /* These bitfields need to be at the end of the struct - so that these and bitfields from TaskObj are contiguous. - */ \ - unsigned prefix##_log_tb: 1; \ - unsigned prefix##_blocking: 1; - -typedef struct { - FutureObj_HEAD(fut) -} FutureObj; - -typedef struct { - FutureObj_HEAD(task) - unsigned task_must_cancel: 1; - unsigned task_log_destroy_pending: 1; - int task_num_cancels_requested; - PyObject *task_fut_waiter; - PyObject *task_coro; - PyObject *task_name; - PyObject *task_context; -} TaskObj; - -typedef struct { - PyObject_HEAD - TaskObj *sw_task; - PyObject *sw_arg; -} TaskStepMethWrapper; - - -#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType) -#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType) - -#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType) -#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType) - #include "clinic/_asynciomodule.c.h" @@ -1967,16 +2015,21 @@ static PyMethodDef TaskWakeupDef = { /* ----- Task introspection helpers */ -static int -register_task(asyncio_state *state, PyObject *task) +static void +register_task(asyncio_state *state, TaskObj *task) { - PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks, - &_Py_ID(add), task); - if (res == NULL) { - return -1; + assert(Task_Check(state, task)); + assert(task != &state->asyncio_tasks.tail); + if (task->next != NULL) { + // already registered + return; } - Py_DECREF(res); - return 0; + assert(task->prev == NULL); + assert(state->asyncio_tasks.head != NULL); + + task->next = state->asyncio_tasks.head; + state->asyncio_tasks.head->prev = task; + state->asyncio_tasks.head = task; } static int @@ -1985,16 +2038,27 @@ register_eager_task(asyncio_state *state, PyObject *task) return PySet_Add(state->eager_tasks, task); } -static int -unregister_task(asyncio_state *state, PyObject *task) -{ - PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks, - &_Py_ID(discard), task); - if (res == NULL) { - return -1; +static void +unregister_task(asyncio_state *state, TaskObj *task) +{ + assert(Task_Check(state, task)); + assert(task != &state->asyncio_tasks.tail); + if (task->next == NULL) { + // not registered + assert(task->prev == NULL); + assert(state->asyncio_tasks.head != task); + return; } - Py_DECREF(res); - return 0; + task->next->prev = task->prev; + if (task->prev == NULL) { + assert(state->asyncio_tasks.head == task); + state->asyncio_tasks.head = task->next; + } else { + task->prev->next = task->next; + } + task->next = NULL; + task->prev = NULL; + assert(state->asyncio_tasks.head != task); } static int @@ -2178,7 +2242,8 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, if (task_call_step_soon(state, self, NULL)) { return -1; } - return register_task(state, (PyObject*)self); + register_task(state, self); + return 0; } static int @@ -2586,6 +2651,15 @@ _asyncio_Task_set_name(TaskObj *self, PyObject *value) static void TaskObj_finalize(TaskObj *task) { + asyncio_state *state = get_asyncio_state_by_def((PyObject *)task); + // Unregister the task from the linked list of tasks. + // Since task is a native task, we directly call the + // unregister_task function. Third party event loops + // should use the asyncio._unregister_task function. + // See https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support + + unregister_task(state, task); + PyObject *context; PyObject *message = NULL; PyObject *func; @@ -3197,9 +3271,7 @@ task_eager_start(asyncio_state *state, TaskObj *task) } if (task->task_state == STATE_PENDING) { - if (register_task(state, (PyObject *)task) == -1) { - retval = -1; - } + register_task(state, task); } else { // This seems to really help performance on pyperformance benchmarks Py_CLEAR(task->task_coro); @@ -3365,9 +3437,20 @@ _asyncio__register_task_impl(PyObject *module, PyObject *task) /*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/ { asyncio_state *state = get_asyncio_state(module); - if (register_task(state, task) < 0) { + if (Task_Check(state, task)) { + // task is an asyncio.Task instance or subclass, use efficient + // linked-list implementation. + register_task(state, (TaskObj *)task); + Py_RETURN_NONE; + } + // As task does not inherit from asyncio.Task, fallback to less efficient + // weakset implementation. + PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks, + &_Py_ID(add), task); + if (res == NULL) { return NULL; } + Py_DECREF(res); Py_RETURN_NONE; } @@ -3408,9 +3491,16 @@ _asyncio__unregister_task_impl(PyObject *module, PyObject *task) /*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/ { asyncio_state *state = get_asyncio_state(module); - if (unregister_task(state, task) < 0) { + if (Task_Check(state, task)) { + unregister_task(state, (TaskObj *)task); + Py_RETURN_NONE; + } + PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks, + &_Py_ID(discard), task); + if (res == NULL) { return NULL; } + Py_DECREF(res); Py_RETURN_NONE; } @@ -3541,8 +3631,117 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop) } +static inline int +add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop) +{ + PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done)); + if (done == NULL) { + return -1; + } + if (Py_IsTrue(done)) { + return 0; + } + Py_DECREF(done); + PyObject *task_loop = get_future_loop(state, task); + if (task_loop == NULL) { + return -1; + } + if (task_loop == loop) { + if (PySet_Add(tasks, task) < 0) { + Py_DECREF(task_loop); + return -1; + } + } + Py_DECREF(task_loop); + return 0; +} + /*********************** Module **************************/ +/*[clinic input] +_asyncio.all_tasks + + loop: object = None + +Return a set of all tasks for the loop. + +[clinic start generated code]*/ + +static PyObject * +_asyncio_all_tasks_impl(PyObject *module, PyObject *loop) +/*[clinic end generated code: output=0e107cbb7f72aa7b input=43a1b423c2d95bfa]*/ +{ + + asyncio_state *state = get_asyncio_state(module); + PyObject *tasks = PySet_New(NULL); + if (tasks == NULL) { + return NULL; + } + if (loop == Py_None) { + loop = _asyncio_get_running_loop_impl(module); + if (loop == NULL) { + Py_DECREF(tasks); + return NULL; + } + } else { + Py_INCREF(loop); + } + // First add eager tasks to the set so that we don't miss + // any tasks which graduates from eager to non-eager + PyObject *eager_iter = PyObject_GetIter(state->eager_tasks); + if (eager_iter == NULL) { + Py_DECREF(tasks); + Py_DECREF(loop); + return NULL; + } + PyObject *item; + while ((item = PyIter_Next(eager_iter)) != NULL) { + if (add_one_task(state, tasks, item, loop) < 0) { + Py_DECREF(tasks); + Py_DECREF(loop); + Py_DECREF(item); + Py_DECREF(eager_iter); + return NULL; + } + Py_DECREF(item); + } + Py_DECREF(eager_iter); + TaskObj *head = state->asyncio_tasks.head; + Py_INCREF(head); + assert(head != NULL); + assert(head->prev == NULL); + TaskObj *tail = &state->asyncio_tasks.tail; + while (head != tail) + { + if (add_one_task(state, tasks, (PyObject *)head, loop) < 0) { + Py_DECREF(tasks); + Py_DECREF(loop); + Py_DECREF(head); + return NULL; + } + Py_INCREF(head->next); + Py_SETREF(head, head->next); + } + PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks); + if (scheduled_iter == NULL) { + Py_DECREF(tasks); + Py_DECREF(loop); + return NULL; + } + while ((item = PyIter_Next(scheduled_iter)) != NULL) { + if (add_one_task(state, tasks, item, loop) < 0) { + Py_DECREF(tasks); + Py_DECREF(loop); + Py_DECREF(item); + Py_DECREF(scheduled_iter); + return NULL; + } + Py_DECREF(item); + } + Py_DECREF(scheduled_iter); + Py_DECREF(loop); + return tasks; +} static void module_free_freelists(asyncio_state *state) @@ -3584,7 +3783,7 @@ module_traverse(PyObject *mod, visitproc visit, void *arg) Py_VISIT(state->asyncio_InvalidStateError); Py_VISIT(state->asyncio_CancelledError); - Py_VISIT(state->scheduled_tasks); + Py_VISIT(state->non_asyncio_tasks); Py_VISIT(state->eager_tasks); Py_VISIT(state->current_tasks); Py_VISIT(state->iscoroutine_typecache); @@ -3622,7 +3821,7 @@ module_clear(PyObject *mod) Py_CLEAR(state->asyncio_InvalidStateError); Py_CLEAR(state->asyncio_CancelledError); - Py_CLEAR(state->scheduled_tasks); + Py_CLEAR(state->non_asyncio_tasks); Py_CLEAR(state->eager_tasks); Py_CLEAR(state->current_tasks); Py_CLEAR(state->iscoroutine_typecache); @@ -3703,9 +3902,9 @@ module_init(asyncio_state *state) PyObject *weak_set; WITH_MOD("weakref") GET_MOD_ATTR(weak_set, "WeakSet"); - state->scheduled_tasks = PyObject_CallNoArgs(weak_set); + state->non_asyncio_tasks = PyObject_CallNoArgs(weak_set); Py_CLEAR(weak_set); - if (state->scheduled_tasks == NULL) { + if (state->non_asyncio_tasks == NULL) { goto fail; } @@ -3740,6 +3939,7 @@ static PyMethodDef asyncio_methods[] = { _ASYNCIO__ENTER_TASK_METHODDEF _ASYNCIO__LEAVE_TASK_METHODDEF _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF + _ASYNCIO_ALL_TASKS_METHODDEF {NULL, NULL} }; @@ -3747,6 +3947,9 @@ static int module_exec(PyObject *mod) { asyncio_state *state = get_asyncio_state(mod); + Py_SET_TYPE(&state->asyncio_tasks.tail, state->TaskType); + _Py_SetImmortalUntracked((PyObject *)&state->asyncio_tasks.tail); + state->asyncio_tasks.head = &state->asyncio_tasks.tail; #define CREATE_TYPE(m, tp, spec, base) \ do { \ @@ -3776,7 +3979,7 @@ module_exec(PyObject *mod) return -1; } - if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) { + if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->non_asyncio_tasks) < 0) { return -1; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 6a9c8ff6d8f..d619a124cce 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -1487,4 +1487,64 @@ _asyncio_current_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, exit: return return_value; } -/*[clinic end generated code: output=b26155080c82c472 input=a9049054013a1b77]*/ + +PyDoc_STRVAR(_asyncio_all_tasks__doc__, +"all_tasks($module, /, loop=None)\n" +"--\n" +"\n" +"Return a set of all tasks for the loop."); + +#define _ASYNCIO_ALL_TASKS_METHODDEF \ + {"all_tasks", _PyCFunction_CAST(_asyncio_all_tasks), METH_FASTCALL|METH_KEYWORDS, _asyncio_all_tasks__doc__}, + +static PyObject * +_asyncio_all_tasks_impl(PyObject *module, PyObject *loop); + +static PyObject * +_asyncio_all_tasks(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 1 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(loop), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"loop", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "all_tasks", + .kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[1]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0; + PyObject *loop = Py_None; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!noptargs) { + goto skip_optional_pos; + } + loop = args[0]; +skip_optional_pos: + return_value = _asyncio_all_tasks_impl(module, loop); + +exit: + return return_value; +} +/*[clinic end generated code: output=ffe9b71bc65888b3 input=a9049054013a1b77]*/