diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index 0ba137ab67a..03839d208e2 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -36,6 +36,11 @@ use crate::runtime::context; #[derive(Debug, Copy, Clone)] pub(crate) struct Budget(Option); +pub(crate) struct BudgetDecrement { + success: bool, + hit_zero: bool, +} + impl Budget { /// Budget assigned to a task on each poll. /// @@ -172,9 +177,17 @@ cfg_coop! { context::budget(|cell| { let mut budget = cell.get(); - if budget.decrement() { + let decrement = budget.decrement(); + + if decrement.success { let restore = RestoreOnPending(Cell::new(cell.get())); cell.set(budget); + + // avoid double counting + if decrement.hit_zero { + inc_budget_forced_yield_count(); + } + Poll::Ready(restore) } else { cx.waker().wake_by_ref(); @@ -183,19 +196,43 @@ cfg_coop! { }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) } + cfg_rt! { + cfg_metrics! { + #[inline(always)] + fn inc_budget_forced_yield_count() { + if let Ok(handle) = context::try_current() { + handle.scheduler_metrics().inc_budget_forced_yield_count(); + } + } + } + + cfg_not_metrics! { + #[inline(always)] + fn inc_budget_forced_yield_count() {} + } + } + + cfg_not_rt! { + #[inline(always)] + fn inc_budget_forced_yield_count() {} + } + impl Budget { /// Decrements the budget. Returns `true` if successful. Decrementing fails /// when there is not enough remaining budget. - fn decrement(&mut self) -> bool { + fn decrement(&mut self) -> BudgetDecrement { if let Some(num) = &mut self.0 { if *num > 0 { *num -= 1; - true + + let hit_zero = *num == 0; + + BudgetDecrement { success: true, hit_zero } } else { - false + BudgetDecrement { success: false, hit_zero: false } } } else { - true + BudgetDecrement { success: true, hit_zero: false } } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index d29cb3d48ff..fe576bb0c6d 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -124,6 +124,21 @@ impl RuntimeMetrics { .load(Relaxed) } + /// Returns the number of times that tasks have been forced to yield back to the scheduler + /// after exhausting their task budgets. + /// + /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + pub fn budget_forced_yield_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .budget_forced_yield_count + .load(Relaxed) + } + /// Returns the total number of times the given worker thread has parked. /// /// The worker park count starts at zero when the runtime is created and diff --git a/tokio/src/runtime/metrics/scheduler.rs b/tokio/src/runtime/metrics/scheduler.rs index d1ba3b64420..d9f8edfaabc 100644 --- a/tokio/src/runtime/metrics/scheduler.rs +++ b/tokio/src/runtime/metrics/scheduler.rs @@ -11,12 +11,14 @@ use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed}; pub(crate) struct SchedulerMetrics { /// Number of tasks that are scheduled from outside the runtime. pub(super) remote_schedule_count: AtomicU64, + pub(super) budget_forced_yield_count: AtomicU64, } impl SchedulerMetrics { pub(crate) fn new() -> SchedulerMetrics { SchedulerMetrics { remote_schedule_count: AtomicU64::new(0), + budget_forced_yield_count: AtomicU64::new(0), } } @@ -24,4 +26,9 @@ impl SchedulerMetrics { pub(crate) fn inc_remote_schedule_count(&self) { self.remote_schedule_count.fetch_add(1, Relaxed); } + + /// Increment the number of tasks forced to yield due to budget exhaustion + pub(crate) fn inc_budget_forced_yield_count(&self) { + self.budget_forced_yield_count.fetch_add(1, Relaxed); + } } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index fdb2fb5f551..d238808c8ed 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,9 +1,13 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))] +use std::future::Future; use std::sync::{Arc, Mutex}; +use std::task::Poll; +use tokio::macros::support::poll_fn; use tokio::runtime::Runtime; +use tokio::task::consume_budget; use tokio::time::{self, Duration}; #[test] @@ -433,6 +437,78 @@ fn worker_local_queue_depth() { }); } +#[test] +fn budget_exhaustion_yield() { + let rt = current_thread(); + let metrics = rt.metrics(); + + assert_eq!(0, metrics.budget_forced_yield_count()); + + let mut did_yield = false; + + // block on a task which consumes budget until it yields + rt.block_on(poll_fn(|cx| loop { + if did_yield { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield = true; + return Poll::Pending; + } + })); + + assert_eq!(1, rt.metrics().budget_forced_yield_count()); +} + +#[test] +fn budget_exhaustion_yield_with_joins() { + let rt = current_thread(); + let metrics = rt.metrics(); + + assert_eq!(0, metrics.budget_forced_yield_count()); + + let mut did_yield_1 = false; + let mut did_yield_2 = false; + + // block on a task which consumes budget until it yields + rt.block_on(async { + tokio::join!( + poll_fn(|cx| loop { + if did_yield_1 { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield_1 = true; + return Poll::Pending; + } + }), + poll_fn(|cx| loop { + if did_yield_2 { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield_2 = true; + return Poll::Pending; + } + }) + ) + }); + + assert_eq!(1, rt.metrics().budget_forced_yield_count()); +} + #[cfg(any(target_os = "linux", target_os = "macos"))] #[test] fn io_driver_fd_count() {