Add a GNU make jobserver implementation to Cargo #4110

Merged: 1 commit, Jun 2, 2017
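The GNU make jobserver protocol this PR adopts is small: a pool of tokens lives in a pipe shared by every cooperating process (the crate uses a semaphore on Windows), each running process counts as one implicit token, a process acquires an extra token by reading a byte from the pipe, and releases it by writing the byte back. A minimal standalone sketch of that cycle using the `jobserver` crate added as a dependency below; the token count here is arbitrary:

```rust
use jobserver::Client;

fn main() -> std::io::Result<()> {
    // Create a fresh pool with two tokens; together with the implicit token
    // for the current process this allows up to three parallel jobs.
    let client = Client::new(2)?;

    // Reading a token blocks while the pool is empty, which is what bounds
    // total parallelism across every process sharing the pipe.
    let token = client.acquire()?;
    // ... run a compiler, a sub-make, etc. ...

    // Dropping the guard writes the token back, releasing it for someone else.
    drop(token);
    Ok(())
}
```

Because the protocol is exactly GNU make's, a `make -jN` recipe that invokes `cargo build` (marked recursive with a `+` prefix or `$(MAKE)`) and a Cargo build script that invokes `make` can all draw from one shared pool.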
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -29,6 +29,7 @@ fs2 = "0.4"
git2 = "0.6"
git2-curl = "0.7"
glob = "0.2"
jobserver = "0.1.2"
libc = "0.2"
libgit2-sys = "0.6"
log = "0.3"
@@ -37,8 +38,8 @@ rustc-serialize = "0.3"
semver = { version = "0.7.0", features = ["serde"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_ignored = "0.0.3"
serde_json = "1.0"
shell-escape = "0.1"
tar = { version = "0.4", default-features = false }
tempdir = "0.3"
1 change: 1 addition & 0 deletions src/cargo/lib.rs
@@ -16,6 +16,7 @@ extern crate flate2;
extern crate fs2;
extern crate git2;
extern crate glob;
extern crate jobserver;
extern crate libc;
extern crate libgit2_sys;
extern crate num_cpus;
1 change: 1 addition & 0 deletions src/cargo/ops/cargo_clean.rs
@@ -37,6 +37,7 @@ pub fn clean(ws: &Workspace, opts: &CleanOptions) -> CargoResult<()> {
host_triple: host_triple,
requested_target: opts.target.map(|s| s.to_owned()),
release: opts.release,
jobs: 1,
..BuildConfig::default()
},
profiles)?;
5 changes: 5 additions & 0 deletions src/cargo/ops/cargo_compile.rs
@@ -639,6 +639,11 @@ fn scrape_build_config(config: &Config,
jobs: Option<u32>,
target: Option<String>)
-> CargoResult<ops::BuildConfig> {
if jobs.is_some() && config.jobserver_from_env().is_some() {
config.shell().warn("a `-j` argument was passed to Cargo but Cargo is \
also configured with an external jobserver in \
its environment, ignoring the `-j` parameter")?;
}
let cfg_jobs = match config.get_i64("build.jobs")? {
Some(v) => {
if v.val <= 0 {
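The warning above fires when an explicit `-j` is combined with a jobserver already present in the environment. The detection itself, `Config::jobserver_from_env`, is outside this part of the diff; as a rough, hypothetical illustration of what such a check inspects, GNU make advertises its jobserver to children through `MAKEFLAGS` (older makes use `--jobserver-fds=R,W`, make 4.2+ uses `--jobserver-auth=`):

```rust
use std::env;

// Hypothetical helper, not Cargo's actual implementation: report whether an
// external jobserver appears to be advertised in the environment.
fn external_jobserver_present() -> bool {
    env::var("MAKEFLAGS")
        .or_else(|_| env::var("MFLAGS"))
        .map(|flags| {
            flags.contains("--jobserver-fds=") || flags.contains("--jobserver-auth=")
        })
        .unwrap_or(false)
}

fn main() {
    println!("external jobserver detected: {}", external_jobserver_present());
}
```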
19 changes: 19 additions & 0 deletions src/cargo/ops/cargo_rustc/context.rs
@@ -8,6 +8,8 @@ use std::path::{Path, PathBuf};
use std::str::{self, FromStr};
use std::sync::Arc;

use jobserver::Client;

use core::{Package, PackageId, PackageSet, Resolve, Target, Profile};
use core::{TargetKind, Profiles, Dependency, Workspace};
use core::dependency::Kind as DepKind;
@@ -43,6 +45,7 @@ pub struct Context<'a, 'cfg: 'a> {
pub build_scripts: HashMap<Unit<'a>, Arc<BuildScripts>>,
pub links: Links<'a>,
pub used_in_plugin: HashSet<Unit<'a>>,
pub jobserver: Client,

host: Layout,
target: Option<Layout>,
@@ -94,6 +97,21 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
config.rustc()?.verbose_version.contains("-dev");
let incremental_enabled = incremental_enabled && is_nightly;

// Load up the jobserver that we'll use to manage our parallelism. This
// is the same as the GNU make implementation of a jobserver, and
// intentionally so! It's hoped that we can interact with GNU make and
// all share the same jobserver.
//
// Note that if we don't have a jobserver in our environment then we
// create our own, and we create it with `n-1` tokens because one token
// is ourself, a running process.
let jobserver = match config.jobserver_from_env() {
Some(c) => c.clone(),
None => Client::new(build_config.jobs as usize - 1).chain_err(|| {
"failed to create jobserver"
})?,
};

Ok(Context {
ws: ws,
host: host_layout,
@@ -114,6 +132,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
links: Links::new(),
used_in_plugin: HashSet::new(),
incremental_enabled: incremental_enabled,
jobserver: jobserver,
})
}

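A condensed sketch of the decision `Context::new` makes above: reuse a jobserver inherited from the environment (for example from an outer `make -jN`), otherwise create a fresh one with `jobs - 1` tokens, since the current process already counts as one running job. The free function `acquire_jobserver` is illustrative only; in the PR this logic lives inline in `Context::new` behind `Config::jobserver_from_env`:

```rust
use jobserver::Client;

// Illustrative stand-in for the jobserver setup in Context::new.
fn acquire_jobserver(jobs: u32) -> std::io::Result<Client> {
    // `from_env` is marked unsafe by the jobserver crate because it trusts
    // that the file descriptors named in MAKEFLAGS are still valid.
    if let Some(client) = unsafe { Client::from_env() } {
        return Ok(client);
    }
    // No inherited jobserver: make our own, minus the implicit token that the
    // running process itself represents.
    Client::new(jobs.saturating_sub(1) as usize)
}
```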
3 changes: 2 additions & 1 deletion src/cargo/ops/cargo_rustc/custom_build.rs
@@ -115,7 +115,8 @@ fn build_work<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>)
.env("PROFILE", if cx.build_config.release { "release" } else { "debug" })
.env("HOST", cx.host_triple())
.env("RUSTC", &cx.config.rustc()?.path)
.env("RUSTDOC", &*cx.config.rustdoc()?);
.env("RUSTDOC", &*cx.config.rustdoc()?)
.inherit_jobserver(&cx.jobserver);

if let Some(links) = unit.pkg.manifest().links() {
cmd.env("CARGO_MANIFEST_LINKS", links);
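With `inherit_jobserver`, the build-script process joins the same token pool, which matters for crates whose build scripts shell out to `make` or other parallel builds. In today's Cargo the jobserver arguments reach build scripts through the `CARGO_MAKEFLAGS` environment variable (the exact plumbing has evolved since this PR), and GNU make reads such flags from `MAKEFLAGS`, so a build script typically forwards one to the other. A hypothetical `build.rs` sketch:

```rust
// build.rs (hypothetical): drive a sub-make that shares Cargo's jobserver.
use std::env;
use std::process::Command;

fn main() {
    let mut make = Command::new("make");
    // Forward Cargo's jobserver flags so the sub-make draws from the same
    // token pool instead of multiplying the parallelism.
    if let Ok(flags) = env::var("CARGO_MAKEFLAGS") {
        make.env("MAKEFLAGS", flags);
    }
    let status = make.status().expect("failed to spawn make");
    assert!(status.success(), "make failed");
}
```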
132 changes: 92 additions & 40 deletions src/cargo/ops/cargo_rustc/job_queue.rs
@@ -1,15 +1,17 @@
use std::collections::HashSet;
use std::collections::hash_map::HashMap;
use std::fmt;
use std::io::Write;
use std::io::{self, Write};
use std::mem;
use std::sync::mpsc::{channel, Sender, Receiver};

use crossbeam::{self, Scope};
use jobserver::{Acquired, HelperThread};
use term::color::YELLOW;

use core::{PackageId, Target, Profile};
use util::{Config, DependencyQueue, Fresh, Dirty, Freshness};
use util::{CargoResult, ProcessBuilder, profile, internal};
use util::{CargoResult, ProcessBuilder, profile, internal, CargoResultExt};
use {handle_error};

use super::{Context, Kind, Unit};
@@ -21,10 +23,9 @@ use super::job::Job;
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
pub struct JobQueue<'a> {
jobs: usize,
queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
tx: Sender<(Key<'a>, Message)>,
rx: Receiver<(Key<'a>, Message)>,
tx: Sender<Message<'a>>,
rx: Receiver<Message<'a>>,
active: usize,
pending: HashMap<Key<'a>, PendingBuild>,
compiled: HashSet<&'a PackageId>,
@@ -51,36 +52,35 @@ struct Key<'a> {
}

pub struct JobState<'a> {
tx: Sender<(Key<'a>, Message)>,
key: Key<'a>,
tx: Sender<Message<'a>>,
}

enum Message {
enum Message<'a> {
Run(String),
Stdout(String),
Stderr(String),
Finish(CargoResult<()>),
Token(io::Result<Acquired>),
Finish(Key<'a>, CargoResult<()>),
}

impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
let _ = self.tx.send((self.key, Message::Run(cmd.to_string())));
let _ = self.tx.send(Message::Run(cmd.to_string()));
}

pub fn stdout(&self, out: &str) {
let _ = self.tx.send((self.key, Message::Stdout(out.to_string())));
let _ = self.tx.send(Message::Stdout(out.to_string()));
}

pub fn stderr(&self, err: &str) {
let _ = self.tx.send((self.key, Message::Stderr(err.to_string())));
let _ = self.tx.send(Message::Stderr(err.to_string()));
}
}

impl<'a> JobQueue<'a> {
pub fn new<'cfg>(cx: &Context<'a, 'cfg>) -> JobQueue<'a> {
let (tx, rx) = channel();
JobQueue {
jobs: cx.jobs() as usize,
queue: DependencyQueue::new(),
tx: tx,
rx: rx,
@@ -113,56 +113,100 @@ impl<'a> JobQueue<'a> {
pub fn execute(&mut self, cx: &mut Context) -> CargoResult<()> {
let _p = profile::start("executing the job graph");

// We need to give a handle to the send half of our message queue to the
// jobserver helper thread. Unfortunately though we need the handle to be
// `'static` as that's typically what's required when spawning a
// thread!
//
// To work around this we transmute the `Sender` to a static lifetime.
// We're only sending "longer living" messages and we should also
// destroy all references to the channel before this function exits as
// the destructor for the `helper` object will ensure the associated
// thread i sno longer running.
Member:

s/i sno/is no

Member:

Interesting, it looks like this transmute is always safe, because Sender is contravariant.

Member Author:

That's what I originally thought too, but actually I don't think so, unfortunately. I believe Drop for Sender may run destructors for items still in the internal queue (not yet received), which means that if you persist a Sender beyond the lifetime of those items, it could access data outside of its lifetime.

(Not in this case, though; the Sender should always go away with the stack frame.)

//
// As a result, this `transmute` to a longer lifetime should be safe in
// practice.
let tx = self.tx.clone();
let tx = unsafe {
mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx)
};
let helper = cx.jobserver.clone().into_helper_thread(move |token| {
drop(tx.send(Message::Token(token)));
}).chain_err(|| {
"failed to create helper thread for jobserver management"
})?;

crossbeam::scope(|scope| {
self.drain_the_queue(cx, scope)
self.drain_the_queue(cx, scope, &helper)
})
}

fn drain_the_queue(&mut self, cx: &mut Context, scope: &Scope<'a>)
fn drain_the_queue(&mut self,
cx: &mut Context,
scope: &Scope<'a>,
jobserver_helper: &HelperThread)
-> CargoResult<()> {
use std::time::Instant;

let mut tokens = Vec::new();
let mut queue = Vec::new();
trace!("queue: {:#?}", self.queue);

// Iteratively execute the entire dependency graph. Each turn of the
// loop starts out by scheduling as much work as possible (up to the
// maximum number of parallel jobs). A local queue is maintained
// separately from the main dependency queue as one dequeue may actually
// dequeue quite a bit of work (e.g. 10 binaries in one project).
// maximum number of parallel jobs we have tokens for). A local queue
// is maintained separately from the main dependency queue as one
// dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
// in one project).
//
// After a job has finished we update our internal state if it was
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return.
let mut error = None;
let start_time = Instant::now();
loop {
while error.is_none() && self.active < self.jobs {
if !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
self.run(key, fresh, job, cx.config, scope)?;
} else if let Some((fresh, key, jobs)) = self.queue.dequeue() {
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
f.combine(fresh)
});
self.pending.insert(key, PendingBuild {
amt: jobs.len(),
fresh: total_fresh,
});
queue.extend(jobs.into_iter().map(|(job, f)| {
(key, job, f.combine(fresh))
}));
} else {
break
// Dequeue as much work as we can, learning about everything
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((fresh, key, jobs)) = self.queue.dequeue() {
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
f.combine(fresh)
});
self.pending.insert(key, PendingBuild {
amt: jobs.len(),
fresh: total_fresh,
});
for (job, f) in jobs {
queue.push((key, job, f.combine(fresh)));
if self.active + queue.len() > 0 {
jobserver_helper.request_token();
}
}
}

// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
while error.is_none() && self.active < tokens.len() + 1 && !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
self.run(key, fresh, job, cx.config, scope)?;
}

// If after all that we're not actually running anything then we're
// done!
if self.active == 0 {
break
}

let (key, msg) = self.rx.recv().unwrap();
// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
tokens.truncate(self.active - 1);

match msg {
match self.rx.recv().unwrap() {
Message::Run(cmd) => {
cx.config.shell().verbose(|c| c.status("Running", &cmd))?;
}
Expand All @@ -176,9 +220,13 @@ impl<'a> JobQueue<'a> {
writeln!(cx.config.shell().err(), "{}", err)?;
}
}
Message::Finish(result) => {
Message::Finish(key, result) => {
info!("end: {:?}", key);
self.active -= 1;
if self.active > 0 {
assert!(tokens.len() > 0);
drop(tokens.pop());
}
match result {
Ok(()) => self.finish(key, cx)?,
Err(e) => {
@@ -198,6 +246,11 @@ impl<'a> JobQueue<'a> {
}
}
}
Message::Token(acquired_token) => {
tokens.push(acquired_token.chain_err(|| {
"failed to acquire jobserver token"
})?);
}
}
}

@@ -244,9 +297,8 @@ impl<'a> JobQueue<'a> {
scope.spawn(move || {
let res = job.run(fresh, &JobState {
tx: my_tx.clone(),
key: key,
});
my_tx.send((key, Message::Finish(res))).unwrap();
my_tx.send(Message::Finish(key, res)).unwrap();
});

// Print out some nice progress information
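Taken together, the `job_queue.rs` changes implement a request/acquire/release cycle: a helper thread owned by the `jobserver::Client` blocks acquiring tokens and forwards each one over the same channel as the other build messages, `drain_the_queue` requests one token per queued job beyond the implicit first one, and each finished job drops a token back into the pool. A standalone sketch of that pattern with illustrative counts (the real code interleaves this with the dependency queue and error handling):

```rust
use std::sync::mpsc::channel;

use jobserver::{Acquired, Client};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // As if `-j4` were passed: three tokens plus the implicit one for us.
    let client = Client::new(3)?;

    // The helper thread forwards every acquired token to our event channel,
    // playing the role of `Message::Token` in the PR.
    let (tx, rx) = channel();
    let helper = client.into_helper_thread(move |token| {
        let _ = tx.send(token);
    })?;

    // Pretend three jobs were dequeued: each job after the first needs a token.
    let queued_jobs = 3;
    for _ in 0..queued_jobs - 1 {
        helper.request_token();
    }

    // Collect the tokens as they arrive; a real event loop would interleave
    // this with job-finished messages and drop one token per finished job.
    let mut tokens: Vec<Acquired> = Vec::new();
    while tokens.len() < queued_jobs - 1 {
        tokens.push(rx.recv()??);
    }

    // ... spawn up to tokens.len() + 1 jobs here ...

    // Dropping an `Acquired` returns its token to the jobserver; dropping the
    // helper shuts down its thread.
    drop(tokens);
    drop(helper);
    Ok(())
}
```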