Merge pull request #106 from faern/backport-libstd-style
Backport libstd style
Amanieu committed Dec 8, 2018
2 parents 604d343 + 01e0f8d commit 8bd0e79
Showing 17 changed files with 263 additions and 226 deletions.
225 changes: 111 additions & 114 deletions core/src/parking_lot.rs
@@ -136,8 +136,7 @@ struct ThreadData {
parked_with_timeout: Cell<bool>,

// Extra data for deadlock detection
// TODO: once supported in stable replace with #[cfg...] & remove dummy struct/impl
#[allow(dead_code)]
#[cfg(feature = "deadlock_detection")]
deadlock_data: deadlock::DeadlockData,
}
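
The old code kept a dummy DeadlockData type (its removal appears further down in this diff) so the field could exist unconditionally; with #[cfg] now stable on struct fields and struct-literal fields, the field and its initializer (next hunk) are simply compiled out when the feature is off. A minimal sketch of that pattern, using hypothetical names (Stats, tracing) rather than this crate's types:

struct Stats {
    hits: u64,
    // Exists only when the `tracing` feature is enabled; no dummy type is
    // needed for the disabled case.
    #[cfg(feature = "tracing")]
    trace_buf: Vec<String>,
}

impl Stats {
    fn new() -> Self {
        Stats {
            hits: 0,
            #[cfg(feature = "tracing")]
            trace_buf: Vec::new(),
        }
    }
}

fn main() {
    let stats = Stats::new();
    println!("hits = {}", stats.hits);
}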

@@ -157,13 +156,17 @@ impl ThreadData {
unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
park_token: Cell::new(DEFAULT_PARK_TOKEN),
parked_with_timeout: Cell::new(false),
#[cfg(feature = "deadlock_detection")]
deadlock_data: deadlock::DeadlockData::new(),
}
}
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
// Invokes the given closure with a reference to the current thread `ThreadData`.
fn with_thread_data<F, T>(f: F) -> T
where
F: FnOnce(&ThreadData) -> T,
{
// Try to read from thread-local storage, but return None if the TLS has
// already been destroyed.
#[cfg(has_localkey_try_with)]
@@ -177,14 +180,19 @@ unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {

// Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
// to construct. Try to use a thread-local version if possible.
let mut thread_data_ptr = ptr::null();
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
if let Some(tls) = try_get_tls(&THREAD_DATA) {
return &*tls;
if let Some(tls_thread_data) = try_get_tls(&THREAD_DATA) {
thread_data_ptr = tls_thread_data;
}

// Otherwise just create a ThreadData on the stack
*local = Some(ThreadData::new());
local.as_ref().unwrap()
let mut thread_data_storage = None;
if thread_data_ptr.is_null() {
thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
}

f(unsafe { &*thread_data_ptr })
}
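
The accessor now takes a closure instead of returning a reference whose backing storage the caller had to supply, which keeps the stack fallback alive for exactly as long as the borrow is used. A simplified, self-contained sketch of the same shape, with a toy ThreadData holding a single counter (the real type and the cfg-gated try_get_tls helper are more involved):

use std::cell::Cell;
use std::ptr;

struct ThreadData {
    counter: Cell<u32>,
}

impl ThreadData {
    fn new() -> Self {
        ThreadData { counter: Cell::new(0) }
    }
}

// The stack fallback lives inside this function, so the reference handed to
// `f` is always valid for the duration of the call.
fn with_thread_data<F, T>(f: F) -> T
where
    F: FnOnce(&ThreadData) -> T,
{
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());

    // Prefer the thread-local copy; `try_with` returns Err once TLS has been
    // torn down (e.g. in a destructor running during thread exit).
    let mut thread_data_ptr: *const ThreadData = ptr::null();
    if let Ok(tls) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
        thread_data_ptr = tls;
    }

    // Otherwise fall back to a ThreadData created on the stack, mirroring the
    // crate's own pointer-based approach above.
    let mut thread_data_storage = None;
    if thread_data_ptr.is_null() {
        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
    }

    // The pointer targets either the live TLS value or the local Option.
    f(unsafe { &*thread_data_ptr })
}

fn main() {
    let value = with_thread_data(|td| {
        td.counter.set(td.counter.get() + 1);
        td.counter.get()
    });
    assert_eq!(value, 1);
}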

impl Drop for ThreadData {
@@ -579,106 +587,105 @@ unsafe fn park_internal(
timeout: Option<Instant>,
) -> ParkResult {
// Grab our thread data, this also ensures that the hash table exists
let mut thread_data = None;
let thread_data = get_thread_data(&mut thread_data);
with_thread_data(|thread_data| {
// Lock the bucket for the given key
let bucket = lock_bucket(key);

// Lock the bucket for the given key
let bucket = lock_bucket(key);
// If the validation function fails, just return
if !validate() {
bucket.mutex.unlock();
return ParkResult::Invalid;
}

// If the validation function fails, just return
if !validate() {
// Append our thread data to the queue and unlock the bucket
thread_data.parked_with_timeout.set(timeout.is_some());
thread_data.next_in_queue.set(ptr::null());
thread_data.key.store(key, Ordering::Relaxed);
thread_data.park_token.set(park_token);
thread_data.parker.prepare_park();
if !bucket.queue_head.get().is_null() {
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
} else {
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
bucket.mutex.unlock();
return ParkResult::Invalid;
}

// Append our thread data to the queue and unlock the bucket
thread_data.parked_with_timeout.set(timeout.is_some());
thread_data.next_in_queue.set(ptr::null());
thread_data.key.store(key, Ordering::Relaxed);
thread_data.park_token.set(park_token);
thread_data.parker.prepare_park();
if !bucket.queue_head.get().is_null() {
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
} else {
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
bucket.mutex.unlock();
// Invoke the pre-sleep callback
before_sleep();

// Park our thread and determine whether we were woken up by an unpark or by
// our timeout. Note that this isn't precise: we can still be unparked since
// we are still in the queue.
let unparked = match timeout {
Some(timeout) => thread_data.parker.park_until(timeout),
None => {
thread_data.parker.park();
// call deadlock detection on_unpark hook
deadlock::on_unpark(thread_data);
true
}
};

// Invoke the pre-sleep callback
before_sleep();

// Park our thread and determine whether we were woken up by an unpark or by
// our timeout. Note that this isn't precise: we can still be unparked since
// we are still in the queue.
let unparked = match timeout {
Some(timeout) => thread_data.parker.park_until(timeout),
None => {
thread_data.parker.park();
// call deadlock detection on_unpark hook
deadlock::on_unpark(thread_data);
true
// If we were unparked, return now
if unparked {
return ParkResult::Unparked(thread_data.unpark_token.get());
}
};

// If we were unparked, return now
if unparked {
return ParkResult::Unparked(thread_data.unpark_token.get());
}

// Lock our bucket again. Note that the hashtable may have been rehashed in
// the meantime. Our key may also have changed if we were requeued.
let (key, bucket) = lock_bucket_checked(&thread_data.key);
// Lock our bucket again. Note that the hashtable may have been rehashed in
// the meantime. Our key may also have changed if we were requeued.
let (key, bucket) = lock_bucket_checked(&thread_data.key);

// Now we need to check again if we were unparked or timed out. Unlike the
// last check this is precise because we hold the bucket lock.
if !thread_data.parker.timed_out() {
bucket.mutex.unlock();
return ParkResult::Unparked(thread_data.unpark_token.get());
}
// Now we need to check again if we were unparked or timed out. Unlike the
// last check this is precise because we hold the bucket lock.
if !thread_data.parker.timed_out() {
bucket.mutex.unlock();
return ParkResult::Unparked(thread_data.unpark_token.get());
}

// We timed out, so we now need to remove our thread from the queue
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if current == thread_data {
let next = (*current).next_in_queue.get();
link.set(next);
let mut was_last_thread = true;
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
// Scan the rest of the queue to see if there are any other
// entries with the given key.
let mut scan = next;
while !scan.is_null() {
if (*scan).key.load(Ordering::Relaxed) == key {
was_last_thread = false;
break;
// We timed out, so we now need to remove our thread from the queue
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if current == thread_data {
let next = (*current).next_in_queue.get();
link.set(next);
let mut was_last_thread = true;
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
// Scan the rest of the queue to see if there are any other
// entries with the given key.
let mut scan = next;
while !scan.is_null() {
if (*scan).key.load(Ordering::Relaxed) == key {
was_last_thread = false;
break;
}
scan = (*scan).next_in_queue.get();
}
scan = (*scan).next_in_queue.get();
}
}

// Callback to indicate that we timed out, and whether we were the
// last thread on the queue.
timed_out(key, was_last_thread);
break;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
// Callback to indicate that we timed out, and whether we were the
// last thread on the queue.
timed_out(key, was_last_thread);
break;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
}

// There should be no way for our thread to have been removed from the queue
// if we timed out.
debug_assert!(!current.is_null());
// There should be no way for our thread to have been removed from the queue
// if we timed out.
debug_assert!(!current.is_null());

// Unlock the bucket, we are done
bucket.mutex.unlock();
ParkResult::TimedOut
// Unlock the bucket, we are done
bucket.mutex.unlock();
ParkResult::TimedOut
})
}
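
Note that every return above (ParkResult::Invalid, ParkResult::Unparked(..), ParkResult::TimedOut) now exits the closure passed to with_thread_data, and park_internal returns whatever value the closure produced. A tiny standalone illustration of that control flow, with made-up names:

// A stand-in for with_thread_data: runs the closure with some scoped value.
fn with_scope<F, T>(f: F) -> T
where
    F: FnOnce(&str) -> T,
{
    f("scoped value")
}

fn lookup(fail: bool) -> Result<usize, &'static str> {
    with_scope(|s| {
        if fail {
            // Exits only the closure; `lookup` still returns this value
            // because the closure's result is `lookup`'s final expression.
            return Err("validation failed");
        }
        Ok(s.len())
    })
}

fn main() {
    assert_eq!(lookup(false), Ok(12));
    assert_eq!(lookup(true), Err("validation failed"));
}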

/// Unparks one thread from the queue associated with the given key.
Expand Down Expand Up @@ -1102,16 +1109,6 @@ pub mod deadlock {
#[cfg(feature = "deadlock_detection")]
pub(super) use super::deadlock_impl::DeadlockData;

#[cfg(not(feature = "deadlock_detection"))]
pub(super) struct DeadlockData {}

#[cfg(not(feature = "deadlock_detection"))]
impl DeadlockData {
pub(super) fn new() -> Self {
DeadlockData {}
}
}

/// Acquire a resource identified by key in the deadlock detector
/// Noop if deadlock_detection feature isn't enabled.
/// Note: Call after the resource is acquired
Expand Down Expand Up @@ -1149,7 +1146,7 @@ pub mod deadlock {

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
use backtrace::Backtrace;
use petgraph;
use petgraph::graphmap::DiGraphMap;
@@ -1222,19 +1219,19 @@ mod deadlock_impl {
}

pub unsafe fn acquire_resource(key: usize) {
let mut thread_data = None;
let thread_data = get_thread_data(&mut thread_data);
(*thread_data.deadlock_data.resources.get()).push(key);
with_thread_data(|thread_data| {
(*thread_data.deadlock_data.resources.get()).push(key);
});
}

pub unsafe fn release_resource(key: usize) {
let mut thread_data = None;
let thread_data = get_thread_data(&mut thread_data);
let resources = &mut (*thread_data.deadlock_data.resources.get());
match resources.iter().rposition(|x| *x == key) {
Some(p) => resources.swap_remove(p),
None => panic!("key {} not found in thread resources", key),
};
with_thread_data(|thread_data| {
let resources = &mut (*thread_data.deadlock_data.resources.get());
match resources.iter().rposition(|x| *x == key) {
Some(p) => resources.swap_remove(p),
None => panic!("key {} not found in thread resources", key),
};
});
}
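
release_resource searches from the back with rposition, so it removes the most recently recorded instance of key, and swap_remove deletes it in O(1) by swapping in the last element (the order of the remaining entries is not preserved). A tiny illustration with plain integers:

fn main() {
    let mut resources = vec![1, 2, 1, 3];
    let key = 1;

    // Find the last occurrence of `key` and remove it in O(1) by moving the
    // final element into its slot.
    match resources.iter().rposition(|x| *x == key) {
        Some(p) => {
            resources.swap_remove(p);
        }
        None => panic!("key {} not found in resources", key),
    }

    assert_eq!(resources, vec![1, 2, 3]);
}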

pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
61 changes: 5 additions & 56 deletions core/src/spinwait.rs
@@ -5,53 +5,8 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

#[cfg(unix)]
use libc;
use std::sync::atomic::spin_loop_hint;
#[cfg(not(any(windows, unix)))]
use std::thread;
#[cfg(windows)]
use winapi;

// Yields the rest of the current timeslice to the OS
#[cfg(windows)]
#[inline]
fn thread_yield() {
// Note that this is manually defined here rather than using the definition
// through `winapi`. The `winapi` definition comes from the `synchapi`
// header which enables the "synchronization.lib" library. It turns out,
// however that `Sleep` comes from `kernel32.dll` so this activation isn't
// necessary.
//
// This was originally identified in rust-lang/rust where on MinGW the
// libsynchronization.a library pulls in a dependency on a newer DLL not
// present in older versions of Windows. (see rust-lang/rust#49438)
//
// This is a bit of a hack for now and ideally we'd fix MinGW's own import
// libraries, but that'll probably take a lot longer than patching this here
// and avoiding the `synchapi` feature entirely.
extern "system" {
fn Sleep(a: winapi::shared::minwindef::DWORD);
}
unsafe {
// We don't use SwitchToThread here because it doesn't consider all
// threads in the system and the thread we are waiting for may not get
// selected.
Sleep(0);
}
}
#[cfg(unix)]
#[inline]
fn thread_yield() {
unsafe {
libc::sched_yield();
}
}
#[cfg(not(any(windows, unix)))]
#[inline]
fn thread_yield() {
thread::yield_now();
}
use thread_parker;

// Wastes some CPU time for the given number of iterations,
// using a hint to indicate to the CPU that we are spinning.
@@ -63,15 +18,16 @@ fn cpu_relax(iterations: u32) {
}

/// A counter used to perform exponential backoff in spin loops.
#[derive(Default)]
pub struct SpinWait {
counter: u32,
}

impl SpinWait {
/// Creates a new `SpinWait`.
#[inline]
pub fn new() -> SpinWait {
SpinWait { counter: 0 }
pub fn new() -> Self {
Self::default()
}

/// Resets a `SpinWait` to its initial state.
@@ -97,7 +53,7 @@ impl SpinWait {
if self.counter <= 3 {
cpu_relax(1 << self.counter);
} else {
thread_yield();
thread_parker::thread_yield();
}
true
}
@@ -116,10 +72,3 @@ impl SpinWait {
cpu_relax(1 << self.counter);
}
}

impl Default for SpinWait {
#[inline]
fn default() -> SpinWait {
SpinWait::new()
}
}
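
The manual Default impl is replaced by #[derive(Default)] (a u32 counter defaults to 0, so behaviour is unchanged), and the per-platform thread_yield helpers move to thread_parker. A toy sketch of the exponential-backoff idea behind SpinWait, using only the standard library; the give-up threshold of 10 here is illustrative, while the <= 3 spin threshold matches the code above:

use std::sync::atomic::spin_loop_hint; // std::hint::spin_loop() in newer Rust
use std::thread;

// Toy spin-wait: busy-spin with a CPU hint for the first few rounds, then
// yield the timeslice to the OS, and eventually tell the caller to stop
// spinning and block instead.
#[derive(Default)]
struct Backoff {
    counter: u32,
}

impl Backoff {
    fn spin(&mut self) -> bool {
        if self.counter >= 10 {
            return false; // caller should fall back to blocking (parking)
        }
        self.counter += 1;
        if self.counter <= 3 {
            // Waste roughly 2^counter iterations, hinting to the CPU that
            // this is a spin loop.
            for _ in 0..(1u32 << self.counter) {
                spin_loop_hint();
            }
        } else {
            thread::yield_now();
        }
        true
    }
}

fn main() {
    let mut backoff = Backoff::default();
    let mut rounds = 0;
    while backoff.spin() {
        rounds += 1;
    }
    println!("spun for {} rounds before giving up", rounds);
}
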
(Diffs for the remaining 15 changed files are not shown here.)
