task: fix LocalSet having a single shared task budget (#2462)
## Motivation

Currently, an issue exists where a `LocalSet` has a single cooperative
task budget that's shared across all futures spawned on the `LocalSet`
_and_ any future passed to `LocalSet::run_until` or
`LocalSet::block_on`. Because these methods poll the provided future
before polling any spawned tasks, that future can deterministically
starve the entire `LocalSet`, so that no local tasks can proceed. When
the completion of that future _itself_ depends on other tasks on the
`LocalSet`, this results in a deadlock, as in issue #2460.

A detailed description of why this is the case, taken from [this 
comment][1]:

`LocalSet` wraps each poll of a local task in `budget`:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L406

This is identical to what tokio's other schedulers do when running
tasks, and in theory should give each task its own budget every time
it's polled. 

_However_, `LocalSet` is different from other schedulers. Unlike the
runtime schedulers, a `LocalSet` is itself a future that's run on
another scheduler, in `block_on`.  `block_on` _also_ sets a budget:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/runtime/basic_scheduler.rs#L131

The docs for `budget` state that:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/coop.rs#L73

This means that inside of a `LocalSet`, the calls to `budget` are
no-ops. Instead, each future polled by the `LocalSet` is subtracting
from a single global budget.
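
The sketch below illustrates this behavior. It is a self-contained toy model, not Tokio's actual `coop` module; the `HITS`, `BUDGET`, and `UNCONSTRAINED` names are borrowed purely for illustration. Because the thread-local budget is only installed when none is already present, the nested `budget` call is a no-op and the inner work draws down the outer budget:

```rust
use std::cell::Cell;

const UNCONSTRAINED: usize = usize::MAX;
const BUDGET: usize = 128;

thread_local! {
    // One budget per thread, shared by everything that runs on it.
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}

// Install a budget only if none is present; otherwise run `f` as-is.
fn budget<R>(f: impl FnOnce() -> R) -> R {
    HITS.with(|hits| {
        if hits.get() != UNCONSTRAINED {
            // Nested call: no new budget, `f` charges the outer one.
            return f();
        }
        hits.set(BUDGET);
        let result = f();
        hits.set(UNCONSTRAINED);
        result
    })
}

fn main() {
    // Outer `budget` call, as made by `block_on`.
    budget(|| {
        // Inner `budget` call, as made by `LocalSet` around a task poll.
        budget(|| {
            // The "task" spends one unit of budget...
            HITS.with(|hits| hits.set(hits.get() - 1));
        });
        // ...and it came out of the *outer* budget, not a fresh one.
        HITS.with(|hits| assert_eq!(hits.get(), BUDGET - 1));
        println!("inner poll charged the outer budget");
    });
}
```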

`LocalSet`'s `RunUntil` future polls the provided future before polling
any other tasks spawned on the local set:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L525-L535

In this case, the provided future is `JoinAll`. Unfortunately, every
time a `JoinAll` is polled, it polls _every_ joined future that has not
yet completed. When the number of futures in the `JoinAll` is >= 128,
this means that the `JoinAll` immediately exhausts the task budget. This
would, in theory, be a _good_ thing --- if the `JoinAll` had a huge
number of `JoinHandle`s in it and none of them are ready, it would limit
the time we spend polling those join handles. 

However, because the `LocalSet` _actually_ has a single shared task
budget, this means polling the `JoinAll` _always_ exhausts the entire
budget. There is now no budget remaining to poll any other tasks spawned
on the `LocalSet`, and they are never able to complete.
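
The resulting starvation can be modeled with another self-contained sketch (again a toy model rather than real Tokio code; `try_poll` and the loop bounds are made up for illustration). The greedy future is polled first on every pass and drains the single shared budget, so the task it is waiting for is never polled:

```rust
use std::cell::Cell;

const BUDGET: usize = 128;

thread_local! {
    // The single budget shared by everything polled from the `LocalSet`.
    static HITS: Cell<usize> = Cell::new(BUDGET);
}

// Poll a "task": returns false if the shared budget is already exhausted.
fn try_poll() -> bool {
    HITS.with(|hits| {
        let left = hits.get();
        if left == 0 {
            return false;
        }
        hits.set(left - 1);
        true
    })
}

fn main() {
    let mut local_task_polls = 0;

    // Each iteration models one poll of `RunUntil`.
    for _ in 0..10 {
        // The outer scheduler refills the budget once per poll...
        HITS.with(|hits| hits.set(BUDGET));

        // ...but the `join_all`-style future runs first and polls all 128
        // of its pending handles, draining the whole shared budget.
        for _ in 0..128 {
            try_poll();
        }

        // By the time the spawned local task gets a turn, nothing is left.
        if try_poll() {
            local_task_polls += 1;
        }
    }

    // The local task is never polled; in the real system this is the deadlock.
    assert_eq!(local_task_polls, 0);
    println!("local task was polled {} times", local_task_polls);
}
```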

[1]: #2460 (comment)

## Solution

This branch solves the issue by resetting the task budget when polling
a `LocalSet`. I've added a new function to `coop` that resets the task
budget to `UNCONSTRAINED` for the duration of a closure, allowing the
`budget` calls in `LocalSet` to _actually_ create a new budget for each
spawned local task. I've also changed `LocalSet` to ensure that a
separate task budget is applied to any future passed to
`block_on`/`run_until`.
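
As a rough sketch of how the two pieces compose (the same toy model as above, not the real `coop` module; the actual implementation below uses a `ResetGuard` drop guard so that the previous budget is restored even if the closure panics), `reset` temporarily clears the installed budget, so the nested `budget` calls made while polling each local task really do install fresh budgets:

```rust
use std::cell::Cell;

const UNCONSTRAINED: usize = usize::MAX;
const BUDGET: usize = 128;

thread_local! {
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}

// Same toy `budget` as before: only installs a budget if none is present.
fn budget<R>(f: impl FnOnce() -> R) -> R {
    HITS.with(|hits| {
        if hits.get() != UNCONSTRAINED {
            return f();
        }
        hits.set(BUDGET);
        let result = f();
        hits.set(UNCONSTRAINED);
        result
    })
}

// The new piece: forget the caller's budget while `f` runs, restore it after.
fn reset<R>(f: impl FnOnce() -> R) -> R {
    HITS.with(|hits| {
        let prev = hits.get();
        hits.set(UNCONSTRAINED);
        let result = f();
        hits.set(prev);
        result
    })
}

fn main() {
    // `block_on` installs the outer budget...
    budget(|| {
        HITS.with(|hits| hits.set(hits.get() - 1)); // block_on spends a bit of it
        // ...and the `LocalSet` wraps its tick in `reset`, so each task it
        // polls now gets a genuinely fresh budget.
        reset(|| {
            budget(|| HITS.with(|hits| assert_eq!(hits.get(), BUDGET)));
            budget(|| HITS.with(|hits| assert_eq!(hits.get(), BUDGET)));
        });
        // When the tick is over, the outer budget is back where it was.
        HITS.with(|hits| assert_eq!(hits.get(), BUDGET - 1));
    });
}
```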

Additionally, I've added a test reproducing the issue described in
#2460. This test fails prior to this change, and passes after it.

Fixes #2460

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw authored Apr 30, 2020
1 parent fa9743f commit 20b5df9
Showing 4 changed files with 147 additions and 55 deletions.
49 changes: 41 additions & 8 deletions tokio/src/coop.rs
@@ -85,15 +85,11 @@ where
return f();
}

struct Guard<'a>(&'a Cell<usize>);
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
self.0.set(UNCONSTRAINED);
}
}

hits.set(BUDGET);
let _guard = Guard(hits);
let _guard = ResetGuard {
hits,
prev: UNCONSTRAINED,
};
f()
})
}
@@ -114,6 +110,32 @@ cfg_blocking_impl! {
}
}

cfg_rt_core! {
cfg_rt_util! {
/// Run the given closure with a new task budget, resetting the previous
/// budget when the closure finishes.
///
/// This is intended for internal use by `LocalSet` and (potentially) other
/// similar schedulers which are themselves futures, and need a fresh budget
/// for each of their children.
#[inline(always)]
pub(crate) fn reset<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
HITS.with(move |hits| {
let prev = hits.get();
hits.set(UNCONSTRAINED);
let _guard = ResetGuard {
hits,
prev,
};
f()
})
}
}
}

/// Invoke `f` with a subset of the remaining budget.
///
/// This is useful if you have sub-futures that you need to poll, but that you want to restrict
@@ -289,6 +311,11 @@ pin_project_lite::pin_project! {
}
}

struct ResetGuard<'a> {
hits: &'a Cell<usize>,
prev: usize,
}

impl<F: Future> Future for CoopFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -327,6 +354,12 @@ cfg_sync! {
impl<F> CoopFutureExt for F where F: Future {}
}

impl<'a> Drop for ResetGuard<'a> {
fn drop(&mut self) {
self.hits.set(self.prev);
}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::*;
17 changes: 17 additions & 0 deletions tokio/src/macros/cfg.rs
@@ -35,6 +35,23 @@ macro_rules! cfg_blocking_impl {
}
}

/// Enables blocking API internals
macro_rules! cfg_blocking_impl_or_task {
($($item:item)*) => {
$(
#[cfg(any(
feature = "blocking",
feature = "fs",
feature = "dns",
feature = "io-std",
feature = "rt-threaded",
feature = "task",
))]
$item
)*
}
}

/// Enables enter::block_on
macro_rules! cfg_block_on {
($($item:item)*) => {
61 changes: 35 additions & 26 deletions tokio/src/task/local.rs
@@ -454,20 +454,24 @@ impl Future for LocalSet {
// Register the waker before starting to work
self.context.shared.waker.register_by_ref(cx.waker());

if self.with(|| self.tick()) {
// If `tick` returns true, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
Poll::Pending
} else if self.context.tasks.borrow().owned.is_empty() {
// If the scheduler has no remaining futures, we're done!
Poll::Ready(())
} else {
// There are still futures in the local set, but we've polled all the
// futures in the run queue. Therefore, we can just return Pending
// since the remaining futures will be woken from somewhere else.
Poll::Pending
}
// Reset any previous task budget while polling tasks spawned on the
// `LocalSet`, ensuring that each has its own separate budget.
crate::coop::reset(|| {
if self.with(|| self.tick()) {
// If `tick` returns true, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
Poll::Pending
} else if self.context.tasks.borrow().owned.is_empty() {
// If the scheduler has no remaining futures, we're done!
Poll::Ready(())
} else {
// There are still futures in the local set, but we've polled all the
// futures in the run queue. Therefore, we can just return Pending
// since the remaining futures will be woken from somewhere else.
Poll::Pending
}
})
}
}

@@ -521,18 +525,23 @@ impl<T: Future> Future for RunUntil<'_, T> {
.register_by_ref(cx.waker());

let _no_blocking = crate::runtime::enter::disallow_blocking();

if let Poll::Ready(output) = me.future.poll(cx) {
return Poll::Ready(output);
}

if me.local_set.tick() {
// If `tick` returns `true`, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
}

Poll::Pending
// Reset any previous task budget so that the future passed to
// `run_until` and any tasks spawned on the `LocalSet` have their
// own budgets.
crate::coop::reset(|| {
let f = me.future;
if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
return Poll::Ready(output);
}

if me.local_set.tick() {
// If `tick` returns `true`, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
}

Poll::Pending
})
})
}
}
75 changes: 54 additions & 21 deletions tokio/tests/task_local_set.rs
@@ -312,28 +312,17 @@ fn drop_cancels_tasks() {
assert_eq!(1, Rc::strong_count(&rc1));
}

#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
use std::sync::mpsc::RecvTimeoutError;

let (done_tx, done_rx) = std::sync::mpsc::channel();
let thread = std::thread::spawn(move || {
let (tx, mut rx) = mpsc::channel::<()>(1024);

let mut rt = rt();

let local = LocalSet::new();
local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
local.block_on(&mut rt, async {
time::delay_for(Duration::from_millis(1)).await;
});

drop(tx);

// This enters an infinite loop if the remote notified tasks are not
// properly cancelled.
drop(local);
f();

// Send a message on the channel so that the test thread can
// determine if we have entered an infinite loop:
@@ -349,10 +338,11 @@ fn drop_cancels_remote_tasks() {
//
// Note that it should definitely complete in under a minute, but just
// in case CI is slow, we'll give it a long timeout.
match done_rx.recv_timeout(Duration::from_secs(60)) {
match done_rx.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => panic!(
"test did not complete within 60 seconds, \
we have (probably) entered an infinite loop!"
"test did not complete within {:?} seconds, \
we have (probably) entered an infinite loop!",
timeout,
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
Expand All @@ -366,6 +356,49 @@ fn drop_cancels_remote_tasks() {
thread.join().expect("test thread should not panic!")
}

#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
with_timeout(Duration::from_secs(60), || {
let (tx, mut rx) = mpsc::channel::<()>(1024);

let mut rt = rt();

let local = LocalSet::new();
local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
local.block_on(&mut rt, async {
time::delay_for(Duration::from_millis(1)).await;
});

drop(tx);

// This enters an infinite loop if the remote notified tasks are not
// properly cancelled.
drop(local);
});
}

#[test]
fn local_tasks_wake_join_all() {
// This test reproduces issue #2460.
with_timeout(Duration::from_secs(60), || {
use futures::future::join_all;
use tokio::task::LocalSet;

let mut rt = rt();
let set = LocalSet::new();
let mut handles = Vec::new();

for _ in 1..=128 {
handles.push(set.spawn_local(async move {
tokio::task::spawn_local(async move {}).await.unwrap();
}));
}

rt.block_on(set.run_until(join_all(handles)));
});
}

#[tokio::test]
async fn local_tasks_are_polled_after_tick() {
// Reproduces issues #1899 and #1900