Skip to content

Commit

Permalink
Don't deadlock when failing to acquire a jobserver token
Browse files Browse the repository at this point in the history
  • Loading branch information
bjorn3 committed Apr 9, 2023
1 parent fd4e1d5 commit 9970b04
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
67 changes: 55 additions & 12 deletions src/concurrency_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,18 @@ impl ConcurrencyLimiter {
.clone()
.into_helper_thread(move |token| {
let mut state = state_helper.lock().unwrap();
state.add_new_token(token.unwrap());
available_token_condvar_helper.notify_one();
match token {
Ok(token) => {
state.add_new_token(token);
available_token_condvar_helper.notify_one();
}
Err(err) => {
state.poison(format!("failed to acquire jobserver token: {}", err));
// Notify all threads waiting for a token to give them a chance to
// gracefully exit.
available_token_condvar_helper.notify_all();
}
}
})
.unwrap();
ConcurrencyLimiter {
Expand All @@ -37,16 +47,31 @@ impl ConcurrencyLimiter {
}
}

pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
pub(super) fn acquire(&mut self, handler: &rustc_errors::Handler) -> ConcurrencyLimiterToken {
let mut state = self.state.lock().unwrap();
loop {
state.assert_invariants();

if state.try_start_job() {
return ConcurrencyLimiterToken {
state: self.state.clone(),
available_token_condvar: self.available_token_condvar.clone(),
};
match state.try_start_job() {
Ok(true) => {
return ConcurrencyLimiterToken {
state: self.state.clone(),
available_token_condvar: self.available_token_condvar.clone(),
};
}
Ok(false) => {}
Err(err) => {
// An error happened when acquiring the token. Raise it as fatal error.
// Make sure to drop the mutex guard first to prevent poisoning the mutex.
drop(state);
if let Some(err) = err {
handler.fatal(&err).raise();
} else {
// The error was already emitted, but compilation continued. Raise a silent
// fatal error.
rustc_errors::FatalError.raise();
}
}
}

self.helper_thread.as_mut().unwrap().request_token();
Expand Down Expand Up @@ -100,13 +125,22 @@ mod state {
pending_jobs: usize,
active_jobs: usize,

poisoned: bool,
stored_error: Option<String>,

// None is used to represent the implicit token, Some to represent explicit tokens
tokens: Vec<Option<Acquired>>,
}

impl ConcurrencyLimiterState {
pub(super) fn new(pending_jobs: usize) -> Self {
ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
ConcurrencyLimiterState {
pending_jobs,
active_jobs: 0,
poisoned: false,
stored_error: None,
tokens: vec![None],
}
}

pub(super) fn assert_invariants(&self) {
Expand All @@ -127,14 +161,18 @@ mod state {
self.drop_excess_capacity();
}

pub(super) fn try_start_job(&mut self) -> bool {
pub(super) fn try_start_job(&mut self) -> Result<bool, Option<String>> {
if self.poisoned {
return Err(self.stored_error.take());
}

if self.active_jobs < self.tokens.len() {
// Using existing token
self.job_started();
return true;
return Ok(true);
}

false
Ok(false)
}

pub(super) fn job_started(&mut self) {
Expand All @@ -161,6 +199,11 @@ mod state {
self.assert_invariants();
}

pub(super) fn poison(&mut self, error: String) {
self.poisoned = true;
self.stored_error = Some(error);
}

fn drop_excess_capacity(&mut self) {
self.assert_invariants();

Expand Down
2 changes: 1 addition & 1 deletion src/driver/aot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ pub(crate) fn run_aot(
backend_config.clone(),
global_asm_config.clone(),
cgu.name(),
concurrency_limiter.acquire(),
concurrency_limiter.acquire(tcx.sess.diagnostic()),
),
module_codegen,
Some(rustc_middle::dep_graph::hash_result),
Expand Down

0 comments on commit 9970b04

Please sign in to comment.