Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Replace async-std with tokio in PVF subsystem (#6419)
Browse files Browse the repository at this point in the history
* Replace async-std with tokio in PVF subsystem

* Rework workers to use `select!` instead of a mutex

The improvement in code readability is more important than the thread overhead.

* Remove unnecessary `fuse`

* Add explanation for `expect()`

* Update node/core/pvf/src/worker_common.rs

Co-authored-by: Bastian Köcher <info@kchr.de>

* Update node/core/pvf/src/worker_common.rs

Co-authored-by: Bastian Köcher <info@kchr.de>

* Address some review comments

* Shutdown tokio runtime

* Run cargo fmt

* Add a small note about retries

* Fix up merge

* Rework `cpu_time_monitor_loop` to return when other thread finishes

* Add error string to PrepareError::IoErr variant

* Log when artifacts fail to prepare

* Fix `cpu_time_monitor_loop`; fix test

* Fix text

* Fix a couple of potential minor data races.

First data race was due to logging in the CPU monitor thread even if the
job (other thread) finished. It can technically finish before or after the log.

Maybe best would be to move this log to the `select!`s, where we are guaranteed
to have chosen the timed-out branch, although there would be a bit of
duplication.

Also, it was possible for this thread to complete before we executed
`finished_tx.send` in the other thread, which would trigger an error as the
receiver has already been dropped. And right now, such a spurious error from
`send` would be returned even if the job otherwise succeeded.

* Update Cargo.lock

Co-authored-by: Bastian Köcher <info@kchr.de>
  • Loading branch information
mrcnski and bkchr committed Jan 10, 2023
1 parent 7b3eb68 commit be487ae
Show file tree
Hide file tree
Showing 18 changed files with 302 additions and 476 deletions.
149 changes: 1 addition & 148 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,8 @@ trait ValidationBackend {
self.validate_candidate(pvf.clone(), timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient.
// assumption that the conditions that caused this error may have been transient. Note that
// this error is only a result of execution itself and not of preparation.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result
{
Expand Down Expand Up @@ -676,12 +677,12 @@ impl ValidationBackend for ValidationHost {

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(_) = self.precheck_pvf(pvf, tx).await {
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr)
return Err(PrepareError::IoErr(err))
}

let precheck_result = rx.await.or(Err(PrepareError::IoErr))?;
let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;

precheck_result
}
Expand Down
2 changes: 1 addition & 1 deletion node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() {
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);

inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
}
3 changes: 1 addition & 2 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs"

[dependencies]
always-assert = "0.1"
async-std = { version = "1.11.0", features = ["attributes"] }
async-process = "1.3.0"
assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
Expand All @@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
tokio = { version = "1.22.0", features = ["fs", "process"] }
rayon = "1.5.1"

parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
Expand Down
19 changes: 8 additions & 11 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use crate::{error::PrepareError, host::PrepareResultSender};
use always_assert::always;
use async_std::path::{Path, PathBuf};
use polkadot_parachain::primitives::ValidationCodeHash;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -136,8 +136,8 @@ impl Artifacts {
pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all its parents are created.
// First delete the entire cache. Nodes are long-running so this should populate shortly.
let _ = async_std::fs::remove_dir_all(cache_path).await;
let _ = async_std::fs::create_dir_all(cache_path).await;
let _ = tokio::fs::remove_dir_all(cache_path).await;
let _ = tokio::fs::create_dir_all(cache_path).await;

Self { artifacts: HashMap::new() }
}
Expand Down Expand Up @@ -214,9 +214,8 @@ impl Artifacts {
#[cfg(test)]
mod tests {
use super::{ArtifactId, Artifacts};
use async_std::path::Path;
use sp_core::H256;
use std::str::FromStr;
use std::{path::Path, str::FromStr};

#[test]
fn from_file_name() {
Expand Down Expand Up @@ -252,11 +251,9 @@ mod tests {
);
}

#[test]
fn artifacts_removes_cache_on_startup() {
let fake_cache_path = async_std::task::block_on(async move {
crate::worker_common::tmpfile("test-cache").await.unwrap()
});
#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
Expand All @@ -271,7 +268,7 @@ mod tests {
// this should remove it and re-create.

let p = &fake_cache_path;
async_std::task::block_on(async { Artifacts::new(p).await });
Artifacts::new(p).await;

assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0);

Expand Down
6 changes: 3 additions & 3 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub enum PrepareError {
TimedOut,
/// An IO error occurred while receiving the result from the worker process. This state is reported by the
/// validation host (not by the worker).
IoErr,
IoErr(String),
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
/// validation host (not by the worker).
CreateTmpFileErr(String),
Expand All @@ -54,7 +54,7 @@ impl PrepareError {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) => true,
TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
}
}
}
Expand All @@ -67,7 +67,7 @@ impl fmt::Display for PrepareError {
Preparation(err) => write!(f, "preparation: {}", err),
Panic(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"),
IoErr => write!(f, "prepare: io error while receiving response"),
IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
}
Expand Down
3 changes: 1 addition & 2 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ use crate::{
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use async_std::path::PathBuf;
use futures::{
channel::mpsc,
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, time::Duration};
use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};

slotmap::new_key_type! { struct Worker; }

Expand Down
Loading

0 comments on commit be487ae

Please sign in to comment.