From 20b5df90372ac97d817d2e3666773dd9561f057f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 30 Apr 2020 15:19:17 -0700 Subject: [PATCH] task: fix LocalSet having a single shared task budget (#2462) ## Motivation Currently, an issue exists where a `LocalSet` has a single cooperative task budget that's shared across all futures spawned on the `LocalSet` _and_ any future passed to `LocalSet::run_until` or `LocalSet::block_on`. Because these methods will poll the `run_until` future before polling spawned tasks, it is possible for that future to _always_ deterministically starve the entire `LocalSet` so that no local tasks can proceed. When the completion of that future _itself_ depends on other tasks on the `LocalSet`, this will then result in a deadlock, as in issue #2460. A detailed description of why this is the case, taken from [this comment][1]: `LocalSet` wraps each poll of a local task in `budget`: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L406 This is identical to what tokio's other schedulers do when running tasks, and in theory should give each task its own budget every time it's polled. _However_, `LocalSet` is different from other schedulers. Unlike the runtime schedulers, a `LocalSet` is itself a future that's run on another scheduler, in `block_on`. `block_on` _also_ sets a budget: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/runtime/basic_scheduler.rs#L131 The docs for `budget` state that: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/coop.rs#L73 This means that inside of a `LocalSet`, the calls to `budget` are no-ops. Instead, each future polled by the `LocalSet` is subtracting from a single global budget. 
`LocalSet`'s `RunUntil` future polls the provided future before polling any other tasks spawned on the local set: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L525-L535 In this case, the provided future is `JoinAll`. Unfortunately, every time a `JoinAll` is polled, it polls _every_ joined future that has not yet completed. When the number of futures in the `JoinAll` is >= 128, this means that the `JoinAll` immediately exhausts the task budget. This would, in theory, be a _good_ thing --- if the `JoinAll` had a huge number of `JoinHandle`s in it and none of them were ready, it would limit the time we spend polling those join handles. However, because the `LocalSet` _actually_ has a single shared task budget, this means polling the `JoinAll` _always_ exhausts the entire budget. There is now no budget remaining to poll any other tasks spawned on the `LocalSet`, and they are never able to complete. [1]: https://github.com/tokio-rs/tokio/issues/2460#issuecomment-621403122 ## Solution This branch solves this issue by resetting the task budget when polling a `LocalSet`. I've added a new function to `coop` for resetting the task budget to `UNCONSTRAINED` for the duration of a closure, and thus allowing the `budget` calls in `LocalSet` to _actually_ create a new budget for each spawned local task. Additionally, I've changed `LocalSet` to _also_ ensure that a separate task budget is applied to any future passed to `block_on`/`run_until`. Finally, I've added a test reproducing the issue described in #2460. This test fails prior to this change, and passes after it. 
Fixes #2460 Signed-off-by: Eliza Weisman --- tokio/src/coop.rs | 49 +++++++++++++++++++---- tokio/src/macros/cfg.rs | 17 ++++++++ tokio/src/task/local.rs | 61 ++++++++++++++++------------ tokio/tests/task_local_set.rs | 75 +++++++++++++++++++++++++---------- 4 files changed, 147 insertions(+), 55 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 1d624591667..606ba3a7395 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -85,15 +85,11 @@ where return f(); } - struct Guard<'a>(&'a Cell); - impl<'a> Drop for Guard<'a> { - fn drop(&mut self) { - self.0.set(UNCONSTRAINED); - } - } - hits.set(BUDGET); - let _guard = Guard(hits); + let _guard = ResetGuard { + hits, + prev: UNCONSTRAINED, + }; f() }) } @@ -114,6 +110,32 @@ cfg_blocking_impl! { } } +cfg_rt_core! { + cfg_rt_util! { + /// Run the given closure with a new task budget, resetting the previous + /// budget when the closure finishes. + /// + /// This is intended for internal use by `LocalSet` and (potentially) other + /// similar schedulers which are themselves futures, and need a fresh budget + /// for each of their children. + #[inline(always)] + pub(crate) fn reset(f: F) -> R + where + F: FnOnce() -> R, + { + HITS.with(move |hits| { + let prev = hits.get(); + hits.set(UNCONSTRAINED); + let _guard = ResetGuard { + hits, + prev, + }; + f() + }) + } + } +} + /// Invoke `f` with a subset of the remaining budget. /// /// This is useful if you have sub-futures that you need to poll, but that you want to restrict @@ -289,6 +311,11 @@ pin_project_lite::pin_project! { } } +struct ResetGuard<'a> { + hits: &'a Cell, + prev: usize, +} + impl Future for CoopFuture { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -327,6 +354,12 @@ cfg_sync! 
{ impl CoopFutureExt for F where F: Future {} } +impl<'a> Drop for ResetGuard<'a> { + fn drop(&mut self) { + self.hits.set(self.prev); + } +} + #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 0679aa73744..85f95cbd3d4 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -35,6 +35,23 @@ macro_rules! cfg_blocking_impl { } } +/// Enables blocking API internals +macro_rules! cfg_blocking_impl_or_task { + ($($item:item)*) => { + $( + #[cfg(any( + feature = "blocking", + feature = "fs", + feature = "dns", + feature = "io-std", + feature = "rt-threaded", + feature = "task", + ))] + $item + )* + } +} + /// Enables enter::block_on macro_rules! cfg_block_on { ($($item:item)*) => { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 9af50cee4e6..346fe437f45 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -454,20 +454,24 @@ impl Future for LocalSet { // Register the waker before starting to work self.context.shared.waker.register_by_ref(cx.waker()); - if self.with(|| self.tick()) { - // If `tick` returns true, we need to notify the local future again: - // there are still tasks remaining in the run queue. - cx.waker().wake_by_ref(); - Poll::Pending - } else if self.context.tasks.borrow().owned.is_empty() { - // If the scheduler has no remaining futures, we're done! - Poll::Ready(()) - } else { - // There are still futures in the local set, but we've polled all the - // futures in the run queue. Therefore, we can just return Pending - // since the remaining futures will be woken from somewhere else. - Poll::Pending - } + // Reset any previous task budget while polling tasks spawned on the + // `LocalSet`, ensuring that each has its own separate budget. + crate::coop::reset(|| { + if self.with(|| self.tick()) { + // If `tick` returns true, we need to notify the local future again: + // there are still tasks remaining in the run queue. 
+ cx.waker().wake_by_ref(); + Poll::Pending + } else if self.context.tasks.borrow().owned.is_empty() { + // If the scheduler has no remaining futures, we're done! + Poll::Ready(()) + } else { + // There are still futures in the local set, but we've polled all the + // futures in the run queue. Therefore, we can just return Pending + // since the remaining futures will be woken from somewhere else. + Poll::Pending + } + }) } } @@ -521,18 +525,23 @@ impl Future for RunUntil<'_, T> { .register_by_ref(cx.waker()); let _no_blocking = crate::runtime::enter::disallow_blocking(); - - if let Poll::Ready(output) = me.future.poll(cx) { - return Poll::Ready(output); - } - - if me.local_set.tick() { - // If `tick` returns `true`, we need to notify the local future again: - // there are still tasks remaining in the run queue. - cx.waker().wake_by_ref(); - } - - Poll::Pending + // Reset any previous task budget so that the future passed to + // `run_until` and any tasks spawned on the `LocalSet` have their + // own budgets. + crate::coop::reset(|| { + let f = me.future; + if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { + return Poll::Ready(output); + } + + if me.local_set.tick() { + // If `tick` returns `true`, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + } + + Poll::Pending + }) }) } } diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index 1a10fefa68e..38c7c939238 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -312,28 +312,17 @@ fn drop_cancels_tasks() { assert_eq!(1, Rc::strong_count(&rc1)); } -#[test] -fn drop_cancels_remote_tasks() { - // This test reproduces issue #1885. +/// Runs a test function in a separate thread, and panics if the test does not +/// complete within the specified timeout, or if the test function panics. 
+/// +/// This is intended for running tests whose failure mode is a hang or infinite +/// loop that cannot be detected otherwise. +fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) { use std::sync::mpsc::RecvTimeoutError; let (done_tx, done_rx) = std::sync::mpsc::channel(); let thread = std::thread::spawn(move || { - let (tx, mut rx) = mpsc::channel::<()>(1024); - - let mut rt = rt(); - - let local = LocalSet::new(); - local.spawn_local(async move { while let Some(_) = rx.recv().await {} }); - local.block_on(&mut rt, async { - time::delay_for(Duration::from_millis(1)).await; - }); - - drop(tx); - - // This enters an infinite loop if the remote notified tasks are not - // properly cancelled. - drop(local); + f(); // Send a message on the channel so that the test thread can // determine if we have entered an infinite loop: @@ -349,10 +338,11 @@ fn drop_cancels_remote_tasks() { // // Note that it should definitely complete in under a minute, but just // in case CI is slow, we'll give it a long timeout. - match done_rx.recv_timeout(Duration::from_secs(60)) { + match done_rx.recv_timeout(timeout) { Err(RecvTimeoutError::Timeout) => panic!( - "test did not complete within 60 seconds, \ - we have (probably) entered an infinite loop!" + "test did not complete within {:?} seconds, \ + we have (probably) entered an infinite loop!", + timeout, ), // Did the test thread panic? We'll find out for sure when we `join` // with it. @@ -366,6 +356,49 @@ fn drop_cancels_remote_tasks() { thread.join().expect("test thread should not panic!") } +#[test] +fn drop_cancels_remote_tasks() { + // This test reproduces issue #1885. 
+ with_timeout(Duration::from_secs(60), || { + let (tx, mut rx) = mpsc::channel::<()>(1024); + + let mut rt = rt(); + + let local = LocalSet::new(); + local.spawn_local(async move { while let Some(_) = rx.recv().await {} }); + local.block_on(&mut rt, async { + time::delay_for(Duration::from_millis(1)).await; + }); + + drop(tx); + + // This enters an infinite loop if the remote notified tasks are not + // properly cancelled. + drop(local); + }); +} + +#[test] +fn local_tasks_wake_join_all() { + // This test reproduces issue #2460. + with_timeout(Duration::from_secs(60), || { + use futures::future::join_all; + use tokio::task::LocalSet; + + let mut rt = rt(); + let set = LocalSet::new(); + let mut handles = Vec::new(); + + for _ in 1..=128 { + handles.push(set.spawn_local(async move { + tokio::task::spawn_local(async move {}).await.unwrap(); + })); + } + + rt.block_on(set.run_until(join_all(handles))); + }); +} + #[tokio::test] async fn local_tasks_are_polled_after_tick() { // Reproduces issues #1899 and #1900