Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Replace async-std with tokio in PVF subsystem #6419

Merged
merged 23 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ec204fa
Replace async-std with tokio in PVF subsystem
mrcnski Dec 11, 2022
3abc804
Rework workers to use `select!` instead of a mutex
mrcnski Dec 13, 2022
5fcc67d
Remove unnecessary `fuse`
mrcnski Dec 13, 2022
1574561
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 13, 2022
451fae0
Add explanation for `expect()`
mrcnski Dec 13, 2022
fc4c28b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
1dde78b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
da31a48
Address some review comments
mrcnski Dec 18, 2022
35a0c79
Merge remote-tracking branch 'origin/m-cat/replace-async-std-pvf' into m-cat/replace-async-std-pvf
mrcnski Dec 18, 2022
e1c2cf3
Shutdown tokio runtime
mrcnski Dec 18, 2022
077a123
Run cargo fmt
mrcnski Dec 19, 2022
e0d4b9e
Add a small note about retries
mrcnski Dec 19, 2022
2353747
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 20, 2022
28d4062
Fix up merge
mrcnski Dec 20, 2022
3964aca
Rework `cpu_time_monitor_loop` to return when other thread finishes
mrcnski Dec 20, 2022
7057518
Add error string to PrepareError::IoErr variant
mrcnski Dec 20, 2022
e6ba098
Log when artifacts fail to prepare
mrcnski Dec 20, 2022
e094f80
Fix `cpu_time_monitor_loop`; fix test
mrcnski Dec 20, 2022
c09377a
Fix text
mrcnski Dec 20, 2022
05d1865
Fix a couple of potential minor data races.
mrcnski Dec 22, 2022
b0c2434
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 5, 2023
5cc477c
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 9, 2023
0f4ac06
Update Cargo.lock
mrcnski Jan 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 1 addition & 78 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,8 @@ trait ValidationBackend {
self.validate_candidate(pvf.clone(), timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient.
// assumption that the conditions that caused this error may have been transient. Note that
// this error is only a result of execution itself and not of preparation.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result
{
Expand Down Expand Up @@ -666,12 +667,12 @@ impl ValidationBackend for ValidationHost {

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(_) = self.precheck_pvf(pvf, tx).await {
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr)
return Err(PrepareError::IoErr(err))
}

let precheck_result = rx.await.or(Err(PrepareError::IoErr))?;
let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;

precheck_result
}
Expand Down
2 changes: 1 addition & 1 deletion node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() {
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);

inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
}
3 changes: 1 addition & 2 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs"

[dependencies]
always-assert = "0.1"
async-std = { version = "1.11.0", features = ["attributes"] }
async-process = "1.3.0"
assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
Expand All @@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
tokio = { version = "1.22.0", features = ["fs", "process"] }
rayon = "1.5.1"

parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
Expand Down
19 changes: 8 additions & 11 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use crate::{error::PrepareError, host::PrepareResultSender};
use always_assert::always;
use async_std::path::{Path, PathBuf};
use polkadot_parachain::primitives::ValidationCodeHash;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -136,8 +136,8 @@ impl Artifacts {
pub async fn new(cache_path: &Path) -> Self {
// First delete the entire cache, then make sure that the cache path directory and all its
// parents are created. Nodes are long-running so the cache should repopulate shortly.
let _ = async_std::fs::remove_dir_all(cache_path).await;
let _ = async_std::fs::create_dir_all(cache_path).await;
let _ = tokio::fs::remove_dir_all(cache_path).await;
let _ = tokio::fs::create_dir_all(cache_path).await;

Self { artifacts: HashMap::new() }
}
Expand Down Expand Up @@ -214,9 +214,8 @@ impl Artifacts {
#[cfg(test)]
mod tests {
use super::{ArtifactId, Artifacts};
use async_std::path::Path;
use sp_core::H256;
use std::str::FromStr;
use std::{path::Path, str::FromStr};

#[test]
fn from_file_name() {
Expand Down Expand Up @@ -252,11 +251,9 @@ mod tests {
);
}

#[test]
fn artifacts_removes_cache_on_startup() {
let fake_cache_path = async_std::task::block_on(async move {
crate::worker_common::tmpfile("test-cache").await.unwrap()
});
#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
Expand All @@ -271,7 +268,7 @@ mod tests {
// this should remove it and re-create.

let p = &fake_cache_path;
async_std::task::block_on(async { Artifacts::new(p).await });
Artifacts::new(p).await;

assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0);

Expand Down
6 changes: 3 additions & 3 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub enum PrepareError {
TimedOut,
/// An IO error occurred while receiving the result from the worker process. This state is reported by the
/// validation host (not by the worker).
IoErr,
IoErr(String),
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
/// validation host (not by the worker).
CreateTmpFileErr(String),
Expand All @@ -54,7 +54,7 @@ impl PrepareError {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) => true,
TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
}
}
}
Expand All @@ -67,7 +67,7 @@ impl fmt::Display for PrepareError {
Preparation(err) => write!(f, "preparation: {}", err),
Panic(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"),
IoErr => write!(f, "prepare: io error while receiving response"),
IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
}
Expand Down
3 changes: 1 addition & 2 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ use crate::{
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use async_std::path::PathBuf;
use futures::{
channel::mpsc,
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, time::Duration};
use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};

slotmap::new_key_type! { struct Worker; }

Expand Down
Loading