Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task/future: support spawning locally #24

Merged
merged 7 commits into from
Jan 2, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 78 additions & 37 deletions src/task/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ use std::{fmt, mem};
/// details.
const DEFAULT_REPOLL_LIMIT: usize = 5;

struct TaskExtras {
extras: Extras,
remote: Option<Remote<TaskCell>>,
}

/// A [`Future`] task.
pub struct Task {
status: AtomicU8,
extras: UnsafeCell<TaskExtras>,
future: UnsafeCell<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
remote: Remote<TaskCell>,
extras: UnsafeCell<Extras>,
}

/// A [`Future`] task cell.
Expand Down Expand Up @@ -54,23 +58,21 @@ const COMPLETED: u8 = 4;

impl TaskCell {
/// Creates a [`Future`] task cell that is ready to be polled.
pub fn new<F: Future<Output = ()> + Send + 'static>(
future: F,
remote: Remote<TaskCell>,
extras: Extras,
) -> Self {
pub fn new<F: Future<Output = ()> + Send + 'static>(future: F, extras: Extras) -> Self {
TaskCell(Arc::new(Task {
status: AtomicU8::new(NOTIFIED),
future: UnsafeCell::new(Box::pin(future)),
remote,
extras: UnsafeCell::new(extras),
extras: UnsafeCell::new(TaskExtras {
extras,
remote: None,
}),
}))
}
}

impl crate::queue::TaskCell for TaskCell {
fn mut_extras(&mut self) -> &mut Extras {
unsafe { &mut *self.0.extras.get() }
unsafe { &mut (*self.0.extras.get()).extras }
}
}

Expand Down Expand Up @@ -107,7 +109,7 @@ unsafe fn wake_impl(task_cell: &TaskCell) {
.compare_exchange_weak(IDLE, NOTIFIED, SeqCst, SeqCst)
{
Ok(_) => {
task.remote.spawn(clone_task(&**task));
wake_task(task, false);
break;
}
Err(cur) => status = cur,
Expand Down Expand Up @@ -147,10 +149,57 @@ unsafe fn task_cell(task: *const Task) -> TaskCell {
#[inline]
unsafe fn clone_task(task: *const Task) -> TaskCell {
let task_cell = task_cell(task);
let extras = { &mut *task_cell.0.extras.get() };
// `remote` is none only when it has been constructed but never been polled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the first poll, remote also seems None.

if extras.remote.is_none() {
// So `clone_task` has to be called from `poll`, `LOCAL` can't be NULL.
LOCAL.with(|l| {
extras.remote = Some((&*l.get()).remote());
})
}
mem::forget(task_cell.0.clone());
task_cell
}

thread_local! {
/// Local queue reference that is set before polling and unset after polled.
static LOCAL: Cell<*mut Local<TaskCell>> = Cell::new(std::ptr::null_mut());
}

unsafe fn wake_task(task: &Arc<Task>, reschedule: bool) {
LOCAL.with(|ptr| {
if ptr.get().is_null() {
// It's out of polling process, has to be spawn to global queue.
(*task.extras.get())
.remote
.as_ref()
.unwrap()
.spawn(clone_task(&**task));
} else if reschedule {
// It's requested explicitly to schedule to global queue.
(*ptr.get()).spawn_remote(clone_task(&**task));
} else {
// Otherwise spawns to local queue for best locality.
(*ptr.get()).spawn(clone_task(&**task));
}
})
}

struct Scope<'a>(&'a mut Local<TaskCell>);

impl<'a> Scope<'a> {
fn new(l: &'a mut Local<TaskCell>) -> Scope<'a> {
LOCAL.with(|c| c.set(l));
Scope(l)
}
}

impl<'a> Drop for Scope<'a> {
fn drop(&mut self) {
LOCAL.with(|c| c.set(std::ptr::null_mut()));
}
}

/// [`Future`] task runner.
#[derive(Clone)]
pub struct Runner {
Expand Down Expand Up @@ -182,7 +231,8 @@ thread_local! {
impl crate::pool::Runner for Runner {
type TaskCell = TaskCell;

fn handle(&mut self, _local: &mut Local<TaskCell>, task_cell: TaskCell) -> bool {
fn handle(&mut self, local: &mut Local<TaskCell>, task_cell: TaskCell) -> bool {
let _scope = Scope::new(local);
let task = task_cell.0;
unsafe {
let waker = ManuallyDrop::new(waker(&*task));
Expand All @@ -197,10 +247,9 @@ impl crate::pool::Runner for Runner {
match task.status.compare_exchange(POLLING, IDLE, SeqCst, SeqCst) {
Ok(_) => return false,
Err(NOTIFIED) => {
if repoll_times >= self.repoll_limit
|| NEED_RESCHEDULE.with(|r| r.replace(false))
{
task.remote.spawn(clone_task(&*task));
let need_reschedule = NEED_RESCHEDULE.with(|r| r.replace(false));
if repoll_times >= self.repoll_limit || need_reschedule {
wake_task(&task, need_reschedule);
return false;
} else {
repoll_times += 1;
Expand All @@ -213,7 +262,7 @@ impl crate::pool::Runner for Runner {
}
}

/// Gives up a timeslice to the task scheduler.
/// Gives up a time slice to the task scheduler.
///
/// It is only guaranteed to work in yatp.
pub async fn reschedule() {
Expand Down Expand Up @@ -321,11 +370,9 @@ mod tests {
WakeLater::new(waker_tx.clone()).await;
res_tx.send(2).unwrap();
};
local.remote.spawn(TaskCell::new(
fut,
local.remote.clone(),
Extras::single_level(),
));
local
.remote
.spawn(TaskCell::new(fut, Extras::single_level()));

local.handle_once();
assert_eq!(res_rx.recv().unwrap(), 1);
Expand Down Expand Up @@ -386,11 +433,9 @@ mod tests {
PendingOnce::new().await;
res_tx.send(2).unwrap();
};
local.remote.spawn(TaskCell::new(
fut,
local.remote.clone(),
Extras::single_level(),
));
local
.remote
.spawn(TaskCell::new(fut, Extras::single_level()));

local.handle_once();
assert_eq!(res_rx.recv().unwrap(), 1);
Expand All @@ -411,11 +456,9 @@ mod tests {
PendingOnce::new().await;
res_tx.send(4).unwrap();
};
local.remote.spawn(TaskCell::new(
fut,
local.remote.clone(),
Extras::single_level(),
));
local
.remote
.spawn(TaskCell::new(fut, Extras::single_level()));

local.handle_once();
assert_eq!(res_rx.recv().unwrap(), 1);
Expand All @@ -439,11 +482,9 @@ mod tests {
PendingOnce::new().await;
res_tx.send(3).unwrap();
};
local.remote.spawn(TaskCell::new(
fut,
local.remote.clone(),
Extras::single_level(),
));
local
.remote
.spawn(TaskCell::new(fut, Extras::single_level()));

local.handle_once();
assert_eq!(res_rx.recv().unwrap(), 1);
Expand Down