Add a GNU make jobserver implementation to Cargo
This commit adds a GNU make jobserver implementation to Cargo, both as a client
of existing jobservers and as a creator of new jobservers. The jobserver is
actually just an IPC semaphore, which manifests itself as a pipe with N bytes
of tokens on Unix and a literal IPC semaphore on Windows. The rough protocol is
that when you want to run a job you acquire the semaphore (read a byte on Unix
or wait on the semaphore on Windows) and then release it when you're done.
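
As a concrete illustration, here's a minimal sketch of that acquire/release
protocol using the `jobserver` crate; the token count of 4 is an arbitrary
example rather than Cargo's default:

```rust
use jobserver::Client;

fn main() -> std::io::Result<()> {
    // Create a jobserver with 4 tokens: a pipe holding 4 bytes on
    // Unix, a semaphore with a count of 4 on Windows.
    let client = Client::new(4)?;

    // Acquiring blocks the calling thread until a token is available.
    let token = client.acquire()?;
    println!("got a token, running a job...");

    // Dropping the token releases it back to the jobserver.
    drop(token);
    Ok(())
}
```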

All the hairy details of the jobserver implementation are housed in the
`jobserver` crate on crates.io rather than in Cargo itself. This should
hopefully make it much easier for the compiler to eventually share a jobserver
implementation as well.

The main tricky bit here is that on both Unix and Windows acquiring a jobserver
token will block the calling thread. When we want to spawn a new job we need to
wait either for a running job to exit or for a new token to be acquired. To
handle this the current implementation spawns a helper thread that does the
blocking and sends a message back to Cargo when it receives a token. Shutting
down this thread gracefully is a little tricky as well, but more details can be
found in the `jobserver` crate.
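
A minimal sketch of that helper-thread pattern, using the `jobserver` crate's
`into_helper_thread` API (the channel plumbing here is illustrative; Cargo's
real event loop lives in `job_queue.rs` below):

```rust
use std::sync::mpsc::channel;
use jobserver::Client;

fn main() -> std::io::Result<()> {
    let client = Client::new(2)?;
    let (tx, rx) = channel();

    // The helper thread blocks in `acquire` and forwards each token
    // it receives over the channel, so the main thread never blocks
    // on the jobserver directly.
    let helper = client.into_helper_thread(move |token| {
        let _ = tx.send(token);
    })?;

    // Ask for one token; the helper delivers it asynchronously.
    helper.request_token();
    let token = rx.recv().unwrap()?;

    // ... run a job, then drop the token once it finishes ...
    drop(token);
    Ok(())
}
```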

Unfortunately crates are unlikely to see an immediate benefit from this once
it's implemented. Most build scripts invoke `make` with a manual `-jN`
argument, and this overrides the jobserver in the environment, creating a new
jobserver in the sub-make. If the `-jN` argument is removed, however, then
`make` will share Cargo's jobserver and properly limit parallelism, as in the
sketch below.
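
For example, a hypothetical build script that wants to share Cargo's jobserver
would invoke `make` without any `-j` flag, letting the sub-make pick the
jobserver up from its inherited environment:

```rust
// build.rs (hypothetical example)
use std::process::Command;

fn main() {
    // Deliberately no `-jN` argument: passing one would make the
    // sub-make ignore the inherited jobserver and create its own.
    let status = Command::new("make")
        .status()
        .expect("failed to run make");
    assert!(status.success());
}
```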

Closes rust-lang#1744
alexcrichton committed Jun 2, 2017
1 parent 03c0a41 commit cbf25a9
Showing 14 changed files with 376 additions and 49 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

(Diff for the generated `Cargo.lock` file is not rendered.)

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -29,6 +29,7 @@ fs2 = "0.4"
git2 = "0.6"
git2-curl = "0.7"
glob = "0.2"
jobserver = "0.1.2"
libc = "0.2"
libgit2-sys = "0.6"
log = "0.3"
@@ -37,8 +38,8 @@ rustc-serialize = "0.3"
semver = { version = "0.7.0", features = ["serde"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_ignored = "0.0.3"
serde_json = "1.0"
shell-escape = "0.1"
tar = { version = "0.4", default-features = false }
tempdir = "0.3"
1 change: 1 addition & 0 deletions src/cargo/lib.rs
@@ -16,6 +16,7 @@ extern crate flate2;
extern crate fs2;
extern crate git2;
extern crate glob;
extern crate jobserver;
extern crate libc;
extern crate libgit2_sys;
extern crate num_cpus;
1 change: 1 addition & 0 deletions src/cargo/ops/cargo_clean.rs
@@ -37,6 +37,7 @@ pub fn clean(ws: &Workspace, opts: &CleanOptions) -> CargoResult<()> {
host_triple: host_triple,
requested_target: opts.target.map(|s| s.to_owned()),
release: opts.release,
jobs: 1,
..BuildConfig::default()
},
profiles)?;
5 changes: 5 additions & 0 deletions src/cargo/ops/cargo_compile.rs
@@ -639,6 +639,11 @@ fn scrape_build_config(config: &Config,
jobs: Option<u32>,
target: Option<String>)
-> CargoResult<ops::BuildConfig> {
if jobs.is_some() && config.jobserver_from_env().is_some() {
config.shell().warn("a `-j` argument was passed to Cargo but Cargo is \
also configured with an external jobserver in \
its environment, ignoring the `-j` parameter")?;
}
let cfg_jobs = match config.get_i64("build.jobs")? {
Some(v) => {
if v.val <= 0 {
19 changes: 19 additions & 0 deletions src/cargo/ops/cargo_rustc/context.rs
@@ -8,6 +8,8 @@ use std::path::{Path, PathBuf};
use std::str::{self, FromStr};
use std::sync::Arc;

use jobserver::Client;

use core::{Package, PackageId, PackageSet, Resolve, Target, Profile};
use core::{TargetKind, Profiles, Dependency, Workspace};
use core::dependency::Kind as DepKind;
@@ -43,6 +45,7 @@ pub struct Context<'a, 'cfg: 'a> {
pub build_scripts: HashMap<Unit<'a>, Arc<BuildScripts>>,
pub links: Links<'a>,
pub used_in_plugin: HashSet<Unit<'a>>,
pub jobserver: Client,

host: Layout,
target: Option<Layout>,
@@ -94,6 +97,21 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
config.rustc()?.verbose_version.contains("-dev");
let incremental_enabled = incremental_enabled && is_nightly;

// Load up the jobserver that we'll use to manage our parallelism. This
// is the same as the GNU make implementation of a jobserver, and
// intentionally so! It's hoped that we can interact with GNU make and
// all share the same jobserver.
//
// Note that if we don't have a jobserver in our environment then we
// create our own, and we create it with `n-1` tokens because one token
// is ourself, a running process.
let jobserver = match config.jobserver_from_env() {
Some(c) => c.clone(),
None => Client::new(build_config.jobs as usize - 1).chain_err(|| {
"failed to create jobserver"
})?,
};

Ok(Context {
ws: ws,
host: host_layout,
@@ -114,6 +132,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
links: Links::new(),
used_in_plugin: HashSet::new(),
incremental_enabled: incremental_enabled,
jobserver: jobserver,
})
}

3 changes: 2 additions & 1 deletion src/cargo/ops/cargo_rustc/custom_build.rs
@@ -115,7 +115,8 @@ fn build_work<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>)
.env("PROFILE", if cx.build_config.release { "release" } else { "debug" })
.env("HOST", cx.host_triple())
.env("RUSTC", &cx.config.rustc()?.path)
.env("RUSTDOC", &*cx.config.rustdoc()?);
.env("RUSTDOC", &*cx.config.rustdoc()?)
.inherit_jobserver(&cx.jobserver);

if let Some(links) = unit.pkg.manifest().links() {
cmd.env("CARGO_MANIFEST_LINKS", links);
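The `.inherit_jobserver(&cx.jobserver)` call above is what makes Cargo's
jobserver reachable from build scripts. As a rough sketch of the same idea done
directly with the `jobserver` crate (the `spawn_make` helper is illustrative,
not Cargo's API):

```rust
use std::process::{Child, Command};
use jobserver::Client;

// Illustrative helper: hand an existing jobserver down to a child
// process so that, for example, a sub-make can share it.
fn spawn_make(client: &Client) -> std::io::Result<Child> {
    let mut cmd = Command::new("make");
    // Keep the jobserver handles available to the child: the pipe
    // file descriptors on Unix, the named semaphore on Windows.
    client.configure(&mut cmd);
    cmd.spawn()
}
```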
132 changes: 92 additions & 40 deletions src/cargo/ops/cargo_rustc/job_queue.rs
@@ -1,15 +1,17 @@
use std::collections::HashSet;
use std::collections::hash_map::HashMap;
use std::fmt;
use std::io::Write;
use std::io::{self, Write};
use std::mem;
use std::sync::mpsc::{channel, Sender, Receiver};

use crossbeam::{self, Scope};
use jobserver::{Acquired, HelperThread};
use term::color::YELLOW;

use core::{PackageId, Target, Profile};
use util::{Config, DependencyQueue, Fresh, Dirty, Freshness};
use util::{CargoResult, ProcessBuilder, profile, internal};
use util::{CargoResult, ProcessBuilder, profile, internal, CargoResultExt};
use {handle_error};

use super::{Context, Kind, Unit};
@@ -21,10 +23,9 @@ use super::job::Job;
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
pub struct JobQueue<'a> {
jobs: usize,
queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
tx: Sender<(Key<'a>, Message)>,
rx: Receiver<(Key<'a>, Message)>,
tx: Sender<Message<'a>>,
rx: Receiver<Message<'a>>,
active: usize,
pending: HashMap<Key<'a>, PendingBuild>,
compiled: HashSet<&'a PackageId>,
@@ -51,36 +52,35 @@ struct Key<'a> {
}

pub struct JobState<'a> {
tx: Sender<(Key<'a>, Message)>,
key: Key<'a>,
tx: Sender<Message<'a>>,
}

enum Message {
enum Message<'a> {
Run(String),
Stdout(String),
Stderr(String),
Finish(CargoResult<()>),
Token(io::Result<Acquired>),
Finish(Key<'a>, CargoResult<()>),
}

impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
let _ = self.tx.send((self.key, Message::Run(cmd.to_string())));
let _ = self.tx.send(Message::Run(cmd.to_string()));
}

pub fn stdout(&self, out: &str) {
let _ = self.tx.send((self.key, Message::Stdout(out.to_string())));
let _ = self.tx.send(Message::Stdout(out.to_string()));
}

pub fn stderr(&self, err: &str) {
let _ = self.tx.send((self.key, Message::Stderr(err.to_string())));
let _ = self.tx.send(Message::Stderr(err.to_string()));
}
}

impl<'a> JobQueue<'a> {
pub fn new<'cfg>(cx: &Context<'a, 'cfg>) -> JobQueue<'a> {
let (tx, rx) = channel();
JobQueue {
jobs: cx.jobs() as usize,
queue: DependencyQueue::new(),
tx: tx,
rx: rx,
@@ -113,56 +113,100 @@ impl<'a> JobQueue<'a> {
pub fn execute(&mut self, cx: &mut Context) -> CargoResult<()> {
let _p = profile::start("executing the job graph");

// We need to give a handle to the send half of our message queue to the
// jobserver helper thread. Unfortunately though we need the handle to be
// `'static` as that's typically what's required when spawning a
// thread!
//
// To work around this we transmute the `Sender` to a static lifetime.
// We're only sending "longer living" messages and we should also
// destroy all references to the channel before this function exits as
// the destructor for the `helper` object will ensure the associated
// thread is no longer running.
//
// As a result, this `transmute` to a longer lifetime should be safe in
// practice.
let tx = self.tx.clone();
let tx = unsafe {
mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx)
};
let helper = cx.jobserver.clone().into_helper_thread(move |token| {
drop(tx.send(Message::Token(token)));
}).chain_err(|| {
"failed to create helper thread for jobserver management"
})?;

crossbeam::scope(|scope| {
self.drain_the_queue(cx, scope)
self.drain_the_queue(cx, scope, &helper)
})
}

fn drain_the_queue(&mut self, cx: &mut Context, scope: &Scope<'a>)
fn drain_the_queue(&mut self,
cx: &mut Context,
scope: &Scope<'a>,
jobserver_helper: &HelperThread)
-> CargoResult<()> {
use std::time::Instant;

let mut tokens = Vec::new();
let mut queue = Vec::new();
trace!("queue: {:#?}", self.queue);

// Iteratively execute the entire dependency graph. Each turn of the
// loop starts out by scheduling as much work as possible (up to the
// maximum number of parallel jobs). A local queue is maintained
// separately from the main dependency queue as one dequeue may actually
// dequeue quite a bit of work (e.g. 10 binaries in one project).
// maximum number of parallel jobs we have tokens for). A local queue
// is maintained separately from the main dependency queue as one
// dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
// in one project).
//
// After a job has finished we update our internal state if it was
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return.
let mut error = None;
let start_time = Instant::now();
loop {
while error.is_none() && self.active < self.jobs {
if !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
self.run(key, fresh, job, cx.config, scope)?;
} else if let Some((fresh, key, jobs)) = self.queue.dequeue() {
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
f.combine(fresh)
});
self.pending.insert(key, PendingBuild {
amt: jobs.len(),
fresh: total_fresh,
});
queue.extend(jobs.into_iter().map(|(job, f)| {
(key, job, f.combine(fresh))
}));
} else {
break
// Dequeue as much work as we can, learning about everything
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((fresh, key, jobs)) = self.queue.dequeue() {
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
f.combine(fresh)
});
self.pending.insert(key, PendingBuild {
amt: jobs.len(),
fresh: total_fresh,
});
for (job, f) in jobs {
queue.push((key, job, f.combine(fresh)));
if self.active + queue.len() > 1 {
jobserver_helper.request_token();
}
}
}

// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
while error.is_none() && self.active < tokens.len() + 1 && !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
self.run(key, fresh, job, cx.config, scope)?;
}

// If after all that we're not actually running anything then we're
// done!
if self.active == 0 {
break
}

let (key, msg) = self.rx.recv().unwrap();
// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
tokens.truncate(self.active - 1);

match msg {
match self.rx.recv().unwrap() {
Message::Run(cmd) => {
cx.config.shell().verbose(|c| c.status("Running", &cmd))?;
}
@@ -176,9 +220,13 @@ impl<'a> JobQueue<'a> {
writeln!(cx.config.shell().err(), "{}", err)?;
}
}
Message::Finish(result) => {
Message::Finish(key, result) => {
info!("end: {:?}", key);
self.active -= 1;
if self.active > 0 {
assert!(tokens.len() > 0);
drop(tokens.pop());
}
match result {
Ok(()) => self.finish(key, cx)?,
Err(e) => {
Expand All @@ -198,6 +246,11 @@ impl<'a> JobQueue<'a> {
}
}
}
Message::Token(acquired_token) => {
tokens.push(acquired_token.chain_err(|| {
"failed to acquire jobserver token"
})?);
}
}
}

@@ -244,9 +297,8 @@ impl<'a> JobQueue<'a> {
scope.spawn(move || {
let res = job.run(fresh, &JobState {
tx: my_tx.clone(),
key: key,
});
my_tx.send((key, Message::Finish(res))).unwrap();
my_tx.send(Message::Finish(key, res)).unwrap();
});

// Print out some nice progress information
(Diffs for the remaining changed files are not rendered.)