Skip to content

Commit

Permalink
rt: fix bug in work-stealing queue (#2387)
Browse files Browse the repository at this point in the history
Fixes a couple bugs in the work-stealing queue introduced as
part of #2315. First, the cursor needs to be able to represent more
values than the size of the buffer. This is to be able to track if
`tail` is ahead of `head` or if they are identical. This bug resulted in
the "overflow" path being taken before the buffer was full.

The second bug can happen when a queue is concurrently being stolen from
and stolen into. In this case, it is possible for buffer slots to be
overwritten before they are released by the stealer. This is less likely
to happen in practice because the first bug prevented the queue from
filling up 100%, but it could still occur. It triggered an assertion in
`steal_into`. This bug slipped through due to a bug in loom not
correctly catching the case. The loom bug is fixed as part of
tokio-rs/loom#119.

Fixes: #2382
  • Loading branch information
carllerche authored Apr 9, 2020
1 parent de8326a commit 58ba45a
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 44 deletions.
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ tempfile = "3.1.0"
# loom is currently not compiling on windows.
# See: https://github.com/Xudong-Huang/generator-rs/issues/19
[target.'cfg(not(windows))'.dev-dependencies]
loom = { version = "0.3.0", features = ["futures", "checkpoint"] }
loom = { version = "0.3.1", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
44 changes: 44 additions & 0 deletions tokio/src/loom/std/atomic_u16.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::Deref;

/// `AtomicU16` providing an additional `unsync_load` function.
pub(crate) struct AtomicU16 {
    // Wrapped in `UnsafeCell` so that `unsync_load` can reach the inner
    // value through a raw pointer and read it non-atomically.
    cell: UnsafeCell<std::sync::atomic::AtomicU16>,
}

// safety: the wrapper only exposes the inner atomic (itself `Send + Sync`)
// plus `unsync_load`, whose safety contract makes the caller responsible
// for avoiding data races.
unsafe impl Send for AtomicU16 {}
unsafe impl Sync for AtomicU16 {}

impl AtomicU16 {
    /// Creates a new `AtomicU16` initialized to `val`.
    pub(crate) fn new(val: u16) -> AtomicU16 {
        AtomicU16 {
            cell: UnsafeCell::new(std::sync::atomic::AtomicU16::new(val)),
        }
    }

    /// Performs an unsynchronized load.
    ///
    /// # Safety
    ///
    /// All mutations must have happened before the unsynchronized load.
    /// Additionally, there must be no concurrent mutations.
    pub(crate) unsafe fn unsync_load(&self) -> u16 {
        let inner: &mut std::sync::atomic::AtomicU16 = &mut *self.cell.get();
        *inner.get_mut()
    }
}

impl Deref for AtomicU16 {
    type Target = std::sync::atomic::AtomicU16;

    fn deref(&self) -> &Self::Target {
        // safety: it is always safe to access `&self` fns on the inner value
        // as we never perform unsafe mutations.
        unsafe { &*self.cell.get() }
    }
}

impl fmt::Debug for AtomicU16 {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the inner atomic's `Debug` (prints the current value).
        (**self).fmt(fmt)
    }
}
10 changes: 0 additions & 10 deletions tokio/src/loom/std/atomic_u32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ impl AtomicU32 {
let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val));
AtomicU32 { inner }
}

/// Performs an unsynchronized load.
///
/// # Safety
///
/// All mutations must have happened before the unsynchronized load.
/// Additionally, there must be no concurrent mutations.
pub(crate) unsafe fn unsync_load(&self) -> u32 {
*(*self.inner.get()).get_mut()
}
}

impl Deref for AtomicU32 {
Expand Down
10 changes: 0 additions & 10 deletions tokio/src/loom/std/atomic_u8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ impl AtomicU8 {
let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val));
AtomicU8 { inner }
}

/// Performs an unsynchronized load.
///
/// # Safety
///
/// All mutations must have happened before the unsynchronized load.
/// Additionally, there must be no concurrent mutations.
pub(crate) unsafe fn unsync_load(&self) -> u8 {
*(*self.inner.get()).get_mut()
}
}

impl Deref for AtomicU8 {
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]

mod atomic_ptr;
mod atomic_u16;
mod atomic_u32;
mod atomic_u64;
mod atomic_u8;
mod atomic_usize;
Expand Down Expand Up @@ -60,11 +62,12 @@ pub(crate) mod sync {

pub(crate) mod atomic {
pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
pub(crate) use crate::loom::std::atomic_u16::AtomicU16;
pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;

pub(crate) use std::sync::atomic::AtomicU16;
pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
}
}
Expand Down
62 changes: 40 additions & 22 deletions tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Run-queue structures to support a work-stealing scheduler

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize};
use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task;

Expand Down Expand Up @@ -34,17 +34,19 @@ pub(super) struct Inject<T: 'static> {
pub(super) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
/// Contains two `u8` values. The LSB byte is the "real" head of the queue.
/// The `u8` in the MSB is set by a stealer in process of stealing values.
/// It represents the first value being stolen in the batch.
/// Contains two `u16` values. The least-significant `u16` is the "real"
/// head of the queue. The `u16` in the most-significant half is set by a
/// stealer in the process of stealing values. It represents the first
/// value being stolen in the batch. `u16` is used in order to distinguish
/// between `head == tail` and `head == tail - capacity`.
///
/// When both `u8` values are the same, there is no active stealer.
/// When both `u16` values are the same, there is no active stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
head: AtomicU16,
head: AtomicU32,

/// Only updated by producer thread but read by many threads.
tail: AtomicU8,
tail: AtomicU16,

/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
Expand Down Expand Up @@ -86,8 +88,8 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
}

let inner = Arc::new(Inner {
head: AtomicU16::new(0),
tail: AtomicU8::new(0),
head: AtomicU32::new(0),
tail: AtomicU16::new(0),
buffer: buffer.into(),
});

Expand Down Expand Up @@ -115,7 +117,7 @@ impl<T> Local<T> {
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };

if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK {
if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as u16 {
// There is capacity for the task
break tail;
} else if steal != real {
Expand Down Expand Up @@ -165,16 +167,16 @@ impl<T> Local<T> {
fn push_overflow(
&mut self,
task: task::Notified<T>,
head: u8,
tail: u8,
head: u16,
tail: u16,
inject: &Inject<T>,
) -> Result<(), task::Notified<T>> {
const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;

let n = (LOCAL_QUEUE_CAPACITY / 2) as u8;
let n = (LOCAL_QUEUE_CAPACITY / 2) as u16;
assert_eq!(
tail.wrapping_sub(head) as usize,
LOCAL_QUEUE_CAPACITY - 1,
LOCAL_QUEUE_CAPACITY,
"queue is not full; tail = {}; head = {}",
tail,
head
Expand Down Expand Up @@ -261,10 +263,12 @@ impl<T> Local<T> {

let next_real = real.wrapping_add(1);

// Only update `steal` component if it differs from `real`.
// If `steal == real` there are no concurrent stealers. Both `steal`
// and `real` are updated.
let next = if steal == real {
pack(next_real, next_real)
} else {
assert_ne!(steal, next_real);
pack(steal, next_real)
};

Expand Down Expand Up @@ -295,6 +299,17 @@ impl<T> Steal<T> {
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };

// To the caller, `dst` may **look** empty but still have values
// contained in the buffer. If another thread is concurrently stealing
// from `dst` there may not be enough capacity to steal.
let (steal, _) = unpack(dst.inner.head.load(Acquire));

if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as u16 / 2 {
// we *could* try to steal less here, but for simplicity, we're just
// going to abort.
return None;
}

// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
Expand Down Expand Up @@ -327,7 +342,7 @@ impl<T> Steal<T> {

// Steal tasks from `self`, placing them into `dst`. Returns the number of
// tasks that were stolen.
fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u8) -> u8 {
fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u16) -> u16 {
let mut prev_packed = self.0.head.load(Acquire);
let mut next_packed;

Expand All @@ -352,6 +367,7 @@ impl<T> Steal<T> {

// Update the real head index to acquire the tasks.
let steal_to = src_head_real.wrapping_add(n);
assert_ne!(src_head_steal, steal_to);
next_packed = pack(src_head_steal, steal_to);

// Claim all those tasks. This is done by incrementing the "real"
Expand All @@ -368,6 +384,8 @@ impl<T> Steal<T> {
}
};

assert!(n <= LOCAL_QUEUE_CAPACITY as u16 / 2, "actual = {}", n);

let (first, _) = unpack(next_packed);

// Take all the tasks
Expand Down Expand Up @@ -594,16 +612,16 @@ fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {

/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: u16) -> (u8, u8) {
let real = n & u8::max_value() as u16;
let steal = n >> 8;
fn unpack(n: u32) -> (u16, u16) {
let real = n & u16::max_value() as u32;
let steal = n >> 16;

(steal as u8, real as u8)
(steal as u16, real as u16)
}

/// Join the two head values
fn pack(steal: u8, real: u8) -> u16 {
(real as u16) | ((steal as u16) << 8)
fn pack(steal: u16, real: u16) -> u32 {
(real as u32) | ((steal as u32) << 16)
}

#[test]
Expand Down
Loading

0 comments on commit 58ba45a

Please sign in to comment.