Skip to content

Commit

Permalink
auto merge of #12235 : huonw/rust/raii-lock, r=alexcrichton
Browse files Browse the repository at this point in the history
- adds a `LockGuard` type returned by `.lock` and `.trylock` that unlocks the mutex in the destructor
- renames `mutex::Mutex` to `StaticNativeMutex` 
- adds a `NativeMutex` type with a destructor
- removes `LittleLock`
- adds `#[must_use]` to `sync::mutex::Guard` to remind people to use it
  • Loading branch information
bors committed Feb 15, 2014
2 parents 6b025c8 + 4668cdf commit d98668a
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 223 deletions.
32 changes: 15 additions & 17 deletions src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
use std::rt::task::BlockedTask;
use std::rt::task::Task;
use std::sync::deque;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::NativeMutex;
use std::unstable::raw;

use TaskState;
Expand Down Expand Up @@ -669,8 +669,7 @@ impl Scheduler {
// is acquired here. This is the resumption points and the "bounce"
// that it is referring to.
unsafe {
current_task.nasty_deschedule_lock.lock();
current_task.nasty_deschedule_lock.unlock();
let _guard = current_task.nasty_deschedule_lock.lock();
}
return current_task;
}
Expand Down Expand Up @@ -765,10 +764,11 @@ impl Scheduler {
// to it, but we're guaranteed that the task won't exit until we've
// unlocked the lock so there's no worry of this memory going away.
let cur = self.change_task_context(cur, next, |sched, mut task| {
let lock: *mut Mutex = &mut task.nasty_deschedule_lock;
unsafe { (*lock).lock() }
f(sched, BlockedTask::block(task.swap()));
unsafe { (*lock).unlock() }
let lock: *mut NativeMutex = &mut task.nasty_deschedule_lock;
unsafe {
let _guard = (*lock).lock();
f(sched, BlockedTask::block(task.swap()));
}
});
cur.put();
}
Expand Down Expand Up @@ -1453,8 +1453,8 @@ mod test {

#[test]
fn test_spawn_sched_blocking() {
use std::unstable::mutex::{Mutex, MUTEX_INIT};
static mut LOCK: Mutex = MUTEX_INIT;
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

// Testing that a task in one scheduler can block in foreign code
// without affecting other schedulers
Expand All @@ -1466,12 +1466,11 @@ mod test {
let mut handle = pool.spawn_sched();
handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
unsafe {
LOCK.lock();
let mut guard = LOCK.lock();

start_ch.send(());
LOCK.wait(); // block the scheduler thread
LOCK.signal(); // let them know we have the lock
LOCK.unlock();
guard.wait(); // block the scheduler thread
guard.signal(); // let them know we have the lock
}

fin_ch.send(());
Expand Down Expand Up @@ -1503,10 +1502,9 @@ mod test {
child_ch.send(20);
pingpong(&parent_po, &child_ch);
unsafe {
LOCK.lock();
LOCK.signal(); // wakeup waiting scheduler
LOCK.wait(); // wait for them to grab the lock
LOCK.unlock();
let mut guard = LOCK.lock();
guard.signal(); // wakeup waiting scheduler
guard.wait(); // wait for them to grab the lock
}
})));
drop(handle);
Expand Down
10 changes: 5 additions & 5 deletions src/libgreen/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::rt::local::Local;
use std::rt::rtio;
use std::rt::task::{Task, BlockedTask};
use std::task::TaskOpts;
use std::unstable::sync::LittleLock;
use std::unstable::mutex::NativeMutex;

struct SimpleTask {
lock: LittleLock,
lock: NativeMutex,
awoken: bool,
}

Expand Down Expand Up @@ -59,9 +59,9 @@ impl Runtime for SimpleTask {
to_wake.put_runtime(self as ~Runtime);
unsafe {
cast::forget(to_wake);
let _l = (*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = true;
(*me).lock.signal();
guard.signal();
}
}

Expand All @@ -83,7 +83,7 @@ impl Runtime for SimpleTask {
pub fn task() -> ~Task {
let mut task = ~Task::new();
task.put_runtime(~SimpleTask {
lock: LittleLock::new(),
lock: unsafe {NativeMutex::new()},
awoken: false,
} as ~Runtime);
return task;
Expand Down
17 changes: 5 additions & 12 deletions src/libgreen/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::rt::local::Local;
use std::rt::rtio;
use std::rt::task::{Task, BlockedTask, SendMessage};
use std::task::TaskOpts;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::NativeMutex;
use std::unstable::raw;

use context::Context;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct GreenTask {
pool_id: uint,

// See the comments in the scheduler about why this is necessary
nasty_deschedule_lock: Mutex,
nasty_deschedule_lock: NativeMutex,
}

pub enum TaskType {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl GreenTask {
task_type: task_type,
sched: None,
handle: None,
nasty_deschedule_lock: unsafe { Mutex::new() },
nasty_deschedule_lock: unsafe { NativeMutex::new() },
task: Some(~Task::new()),
}
}
Expand Down Expand Up @@ -322,11 +322,10 @@ impl GreenTask {
// uncontended except for when the task is rescheduled).
fn reawaken_remotely(mut ~self) {
unsafe {
let mtx = &mut self.nasty_deschedule_lock as *mut Mutex;
let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
let handle = self.handle.get_mut_ref() as *mut SchedHandle;
(*mtx).lock();
let _guard = (*mtx).lock();
(*handle).send(RunOnce(self));
(*mtx).unlock();
}
}
}
Expand Down Expand Up @@ -479,12 +478,6 @@ impl Runtime for GreenTask {
fn wrap(~self) -> ~Any { self as ~Any }
}

impl Drop for GreenTask {
fn drop(&mut self) {
unsafe { self.nasty_deschedule_lock.destroy(); }
}
}

#[cfg(test)]
mod tests {
use std::rt::Runtime;
Expand Down
18 changes: 9 additions & 9 deletions src/libnative/bookkeeping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
//! The green counterpart for this is bookkeeping on sched pools.

use std::sync::atomics;
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};

static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
static mut TASK_LOCK: Mutex = MUTEX_INIT;
static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

pub fn increment() {
let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) };
Expand All @@ -29,9 +29,8 @@ pub fn increment() {
pub fn decrement() {
unsafe {
if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 {
TASK_LOCK.lock();
TASK_LOCK.signal();
TASK_LOCK.unlock();
let mut guard = TASK_LOCK.lock();
guard.signal();
}
}
}
Expand All @@ -40,11 +39,12 @@ pub fn decrement() {
/// the entry points of native programs
pub fn wait_for_other_tasks() {
unsafe {
TASK_LOCK.lock();
while TASK_COUNT.load(atomics::SeqCst) > 0 {
TASK_LOCK.wait();
{
let mut guard = TASK_LOCK.lock();
while TASK_COUNT.load(atomics::SeqCst) > 0 {
guard.wait();
}
}
TASK_LOCK.unlock();
TASK_LOCK.destroy();
}
}
7 changes: 3 additions & 4 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,18 @@ pub fn init() {
}

unsafe {
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut INITIALIZED: bool = false;
static mut LOCK: Mutex = MUTEX_INIT;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

LOCK.lock();
let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: WSADATA = mem::init();
let ret = WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
LOCK.unlock();
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/libnative/io/timer_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::cast;
use std::rt;
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};

use bookkeeping;
use io::timer::{Req, Shutdown};
Expand All @@ -37,11 +37,11 @@ static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;

pub fn boot(helper: fn(imp::signal, Port<Req>)) {
static mut LOCK: Mutex = MUTEX_INIT;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static mut INITIALIZED: bool = false;

unsafe {
LOCK.lock();
let mut _guard = LOCK.lock();
if !INITIALIZED {
let (msgp, msgc) = Chan::new();
// promote this to a shared channel
Expand All @@ -58,7 +58,6 @@ pub fn boot(helper: fn(imp::signal, Port<Req>)) {
rt::at_exit(proc() { shutdown() });
INITIALIZED = true;
}
LOCK.unlock();
}
}

Expand Down
27 changes: 9 additions & 18 deletions src/libnative/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::rt::task::{Task, BlockedTask, SendMessage};
use std::rt::thread::Thread;
use std::rt;
use std::task::TaskOpts;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::NativeMutex;
use std::unstable::stack;

use io;
Expand All @@ -40,7 +40,7 @@ pub fn new(stack_bounds: (uint, uint)) -> ~Task {

fn ops() -> ~Ops {
~Ops {
lock: unsafe { Mutex::new() },
lock: unsafe { NativeMutex::new() },
awoken: false,
io: io::IoFactory::new(),
// these *should* get overwritten
Expand Down Expand Up @@ -109,7 +109,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
// This structure is the glue between channels and the 1:1 scheduling mode. This
// structure is allocated once per task.
struct Ops {
lock: Mutex, // native synchronization
lock: NativeMutex, // native synchronization
awoken: bool, // used to prevent spurious wakeups
io: io::IoFactory, // local I/O factory

Expand Down Expand Up @@ -191,20 +191,19 @@ impl rt::Runtime for Ops {
let task = BlockedTask::block(cur_task);

if times == 1 {
(*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = false;
match f(task) {
Ok(()) => {
while !(*me).awoken {
(*me).lock.wait();
guard.wait();
}
}
Err(task) => { cast::forget(task.wake()); }
}
(*me).lock.unlock();
} else {
let mut iter = task.make_selectable(times);
(*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = false;
let success = iter.all(|task| {
match f(task) {
Expand All @@ -216,9 +215,8 @@ impl rt::Runtime for Ops {
}
});
while success && !(*me).awoken {
(*me).lock.wait();
guard.wait();
}
(*me).lock.unlock();
}
// re-acquire ownership of the task
cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
Expand All @@ -235,10 +233,9 @@ impl rt::Runtime for Ops {
let me = &mut *self as *mut Ops;
to_wake.put_runtime(self as ~rt::Runtime);
cast::forget(to_wake);
(*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = true;
(*me).lock.signal();
(*me).lock.unlock();
guard.signal();
}
}

Expand All @@ -254,12 +251,6 @@ impl rt::Runtime for Ops {
}
}

impl Drop for Ops {
fn drop(&mut self) {
unsafe { self.lock.destroy() }
}
}

#[cfg(test)]
mod tests {
use std::rt::Runtime;
Expand Down
6 changes: 3 additions & 3 deletions src/librustuv/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use std::cast;
use std::libc::{c_void, c_int};
use std::rt::task::BlockedTask;
use std::unstable::sync::LittleLock;
use std::unstable::mutex::NativeMutex;
use std::sync::arc::UnsafeArc;
use mpsc = std::sync::mpsc_queue;

Expand All @@ -39,7 +39,7 @@ enum Message {

struct State {
handle: *uvll::uv_async_t,
lock: LittleLock, // see comments in async_cb for why this is needed
lock: NativeMutex, // see comments in async_cb for why this is needed
queue: mpsc::Queue<Message>,
}

Expand Down Expand Up @@ -112,7 +112,7 @@ impl QueuePool {
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
let state = UnsafeArc::new(State {
handle: handle,
lock: LittleLock::new(),
lock: unsafe {NativeMutex::new()},
queue: mpsc::Queue::new(),
});
let q = ~QueuePool {
Expand Down
Loading

0 comments on commit d98668a

Please sign in to comment.