Skip to content

Commit

Permalink
preemption: Add ability to sub-budget
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhoo committed Jan 24, 2020
1 parent b33ce77 commit 9d10726
Showing 1 changed file with 108 additions and 0 deletions.
108 changes: 108 additions & 0 deletions tokio/src/league/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,114 @@ pub(crate) fn opt_in() {
});
}

/// Invoke `f` with a subset of the remaining budget.
///
/// This is useful if you have sub-futures that you need to poll, but that you want to restrict
/// from using up your entire budget. For example, imagine the following future:
///
/// ```rust
/// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
/// use futures::stream::FuturesUnordered;
/// struct MyFuture<F1, F2> {
/// big: FuturesUnordered<F1>,
/// small: F2,
/// }
///
/// use tokio::stream::Stream;
/// impl<F1, F2> Future for MyFuture<F1, F2>
/// where F1: Future, F2: Future
/// # , F1: Unpin, F2: Unpin
/// {
/// type Output = F2::Output;
///
/// // fn poll(...)
/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
/// # let this = &mut *self;
/// let mut big = // something to pin self.big
/// # Pin::new(&mut this.big);
/// let small = // something to pin self.small
/// # Pin::new(&mut this.small);
///
/// // see if any of the big futures have finished
/// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) {
/// // do something with e
/// # let _ = e;
/// }
///
/// // see if the small future has finished
/// small.poll(cx)
/// }
/// # }
/// ```
///
/// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and
/// `small` never gets polled. That would be sad. If you want to stick up for the little future,
/// that's what `grant` is for. It lets you portion out a smaller part of the yield budget to a
/// particular segment of your code. In the code above, you would write
///
/// ```rust,ignore
/// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
/// # use futures::stream::FuturesUnordered;
/// # struct MyFuture<F1, F2> {
/// # big: FuturesUnordered<F1>,
/// # small: F2,
/// # }
/// #
/// # use tokio::stream::Stream;
/// # impl<F1, F2> Future for MyFuture<F1, F2>
/// # where F1: Future, F2: Future
/// # , F1: Unpin, F2: Unpin
/// # {
/// # type Output = F2::Output;
/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
/// # let this = &mut *self;
/// # let mut big = Pin::new(&mut this.big);
/// # let small = Pin::new(&mut this.small);
/// #
/// // see if any of the big futures have finished
/// while let Some(e) = futures::ready!(tokio::league::grant(64, || big.as_mut().poll_next(cx))) {
/// # // do something with e
/// # let _ = e;
/// # }
/// # small.poll(cx)
/// # }
/// # }
/// ```
///
/// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left
/// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`,
/// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its
/// entire budget.
///
/// Note that you cannot _increase_ your budget by calling `grant`. The budget granted to the code
/// inside the buget is the _minimum_ of the _current_ budget and the bound.
///
#[allow(unreachable_pub, dead_code)]
pub fn grant<F, R>(bound: usize, f: F) -> R
where
F: FnOnce() -> R,
{
HITS.with(|hits| {
let budget = hits.get();
// with_bound cannot _increase_ the remaining budget
let bound = std::cmp::min(budget, bound);
// When f() exits, how much should we add to what is left?
let floor = budget.saturating_sub(bound);
// Make sure we restore the remaining budget even on panic
struct RestoreBudget<'a>(&'a Cell<usize>, usize);
impl<'a> Drop for RestoreBudget<'a> {
fn drop(&mut self) {
let left = self.0.get();
self.0.set(self.1 + left);
}
}
// Time to restrict!
hits.set(bound);
let _restore = RestoreBudget(&hits, floor);
f()
})
}

/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
#[allow(unreachable_pub, dead_code)]
pub fn poll_cooperate(cx: &mut Context<'_>) -> Poll<()> {
Expand Down

0 comments on commit 9d10726

Please sign in to comment.