Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

refactor out validation hosts to pool struct #972

Merged
merged 5 commits into from
Apr 4, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions network/src/protocol/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ fn consensus_instances_cleaned_up() {
signing_context,
AvailabilityStore::new_in_memory(service.clone()),
None,
None,
));

pool.spawner().spawn_local(worker_task).unwrap();
Expand Down Expand Up @@ -329,6 +330,7 @@ fn collation_is_received_with_dropped_router() {
signing_context,
AvailabilityStore::new_in_memory(service.clone()),
None,
None,
));

pool.spawner().spawn_local(worker_task).unwrap();
Expand Down Expand Up @@ -550,6 +552,7 @@ fn fetches_pov_block_from_gossip() {
signing_context,
AvailabilityStore::new_in_memory(service.clone()),
None,
None,
));

let spawner = pool.spawner();
Expand Down
37 changes: 26 additions & 11 deletions parachain/src/wasm_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_core::traits::CallInWasm;
use sp_wasm_interface::HostFunctions as _;

#[cfg(not(target_os = "unknown"))]
pub use validation_host::{run_worker, EXECUTION_TIMEOUT_SEC};
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC};

mod validation_host;

Expand All @@ -48,16 +48,31 @@ impl ParachainExt {
}
}

/// A stub validation-pool defined when compiling for WASM.
#[cfg(target_os = "unknown")]
#[derive(Clone)]
pub struct ValidationPool {
_inner: (), // private field means not publicly-instantiable
}

#[cfg(target_os = "unknown")]
impl ValidationPool {
/// Create a new `ValidationPool`.
pub fn new() -> Self {
ValidationPool { _inner: () }
}
}

/// WASM code execution mode.
///
/// > Note: When compiling for WASM, the `Remote` variants are not available.
pub enum ExecutionMode {
pub enum ExecutionMode<'a> {
/// Execute in-process. The execution can not be interrupted or aborted.
Local,
/// Remote execution in a spawned process.
Remote,
Remote(&'a ValidationPool),
/// Remote execution in a spawned test runner.
RemoteTest,
RemoteTest(&'a ValidationPool),
}

/// Error type for the wasm executor
Expand Down Expand Up @@ -115,27 +130,27 @@ pub fn validate_candidate<E: Externalities + 'static>(
validation_code: &[u8],
params: ValidationParams,
ext: E,
options: ExecutionMode,
options: ExecutionMode<'_>,
) -> Result<ValidationResult, Error> {
match options {
ExecutionMode::Local => {
validate_candidate_internal(validation_code, &params.encode(), ext)
},
#[cfg(not(target_os = "unknown"))]
ExecutionMode::Remote => {
validation_host::validate_candidate(validation_code, params, ext, false)
ExecutionMode::Remote(pool) => {
pool.validate_candidate(validation_code, params, ext, false)
},
#[cfg(not(target_os = "unknown"))]
ExecutionMode::RemoteTest => {
validation_host::validate_candidate(validation_code, params, ext, true)
ExecutionMode::RemoteTest(pool) => {
pool.validate_candidate(validation_code, params, ext, true)
},
#[cfg(target_os = "unknown")]
ExecutionMode::Remote =>
ExecutionMode::Remote(pool) =>
Err(Error::System(Box::<dyn std::error::Error + Send + Sync>::from(
"Remote validator not available".to_string()
) as Box<_>)),
#[cfg(target_os = "unknown")]
ExecutionMode::RemoteTest =>
ExecutionMode::RemoteTest(pool) =>
Err(Error::System(Box::<dyn std::error::Error + Send + Sync>::from(
"Remote validator not available".to_string()
) as Box<_>)),
Expand Down
61 changes: 39 additions & 22 deletions parachain/src/wasm_executor/validation_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];

const NUM_HOSTS: usize = 8;

/// Execution timeout in seconds;
#[cfg(debug_assertions)]
pub const EXECUTION_TIMEOUT_SEC: u64 = 30;
Expand Down Expand Up @@ -70,7 +68,45 @@ enum Event {
}

lazy_static::lazy_static! {
static ref HOSTS: [Mutex<ValidationHost>; NUM_HOSTS] = Default::default();
static ref HOSTS: [Mutex<ValidationHost>; DEFAULT_NUM_HOSTS] = Default::default();
rphmeier marked this conversation as resolved.
Show resolved Hide resolved
}

/// A pool of hosts.
#[derive(Clone)]
pub struct ValidationPool {
hosts: Arc<Vec<Mutex<ValidationHost>>>,
}

const DEFAULT_NUM_HOSTS: usize = 8;

impl ValidationPool {
/// Creates a validation pool with the default configuration.
pub fn new() -> ValidationPool {
ValidationPool {
hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
}
}

/// Validate a candidate under the given validation code using the next
/// free validation host.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate<E: Externalities>(
&self,
validation_code: &[u8],
params: ValidationParams,
externalities: E,
test_mode: bool,
) -> Result<ValidationResult, Error> {
for host in self.hosts.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, externalities, test_mode);
}
}

// all workers are busy, just wait for the first one
self.hosts[0].lock().validate_candidate(validation_code, params, externalities, test_mode)
}
}

/// Validation worker process entry point. Runs a loop waiting for candidates to validate
Expand Down Expand Up @@ -184,25 +220,6 @@ struct ValidationHost {
id: u32,
}

/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate<E: Externalities>(
validation_code: &[u8],
params: ValidationParams,
externalities: E,
test_mode: bool,
) -> Result<ValidationResult, Error> {
for host in HOSTS.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, externalities, test_mode);
}
}

// all workers are busy, just wait for the first one
HOSTS[0].lock().validate_candidate(validation_code, params, externalities, test_mode)
}

impl Drop for ValidationHost {
fn drop(&mut self) {
if let Some(ref mut worker) = &mut self.worker {
Expand Down
11 changes: 8 additions & 3 deletions parachain/tests/adder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ pub fn execute_good_on_parent() {
add: 512,
};

let pool = parachain::wasm_executor::ValidationPool::new();

let ret = parachain::wasm_executor::validate_candidate(
TEST_CODE,
ValidationParams {
parent_head: parent_head.encode(),
block_data: block_data.encode(),
},
DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
).unwrap();

let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap();
Expand All @@ -92,6 +94,7 @@ fn execute_good_chain_on_parent() {
let mut number = 0;
let mut parent_hash = [0; 32];
let mut last_state = 0;
let pool = parachain::wasm_executor::ValidationPool::new();

for add in 0..10 {
let parent_head = HeadData {
Expand All @@ -112,7 +115,7 @@ fn execute_good_chain_on_parent() {
block_data: block_data.encode(),
},
DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
).unwrap();

let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap();
Expand All @@ -129,6 +132,8 @@ fn execute_good_chain_on_parent() {

#[test]
fn execute_bad_on_parent() {
let pool = parachain::wasm_executor::ValidationPool::new();

let parent_head = HeadData {
number: 0,
parent_hash: [0; 32],
Expand All @@ -147,6 +152,6 @@ fn execute_bad_on_parent() {
block_data: block_data.encode(),
},
DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
).unwrap_err();
}
12 changes: 9 additions & 3 deletions parachain/tests/wasm_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ const INFINITE_LOOP_CODE: &[u8] = halt::WASM_BINARY;

#[test]
fn terminates_on_timeout() {
let pool = parachain::wasm_executor::ValidationPool::new();

let result = parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE,
ValidationParams {
parent_head: Default::default(),
block_data: Vec::new(),
},
DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
);
match result {
Err(parachain::wasm_executor::Error::Timeout) => {},
Expand All @@ -45,7 +47,11 @@ fn terminates_on_timeout() {

#[test]
fn parallel_execution() {
let pool = parachain::wasm_executor::ValidationPool::new();

let start = std::time::Instant::now();

let pool2 = pool.clone();
let thread = std::thread::spawn(move ||
parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE,
Expand All @@ -54,7 +60,7 @@ fn parallel_execution() {
block_data: Vec::new(),
},
DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2),
).ok());
let _ = parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE,
Expand All @@ -63,7 +69,7 @@ fn parallel_execution() {
block_data: Vec::new(),
},
DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
);
thread.join().unwrap();
// total time should be < 2 x EXECUTION_TIMEOUT_SEC
Expand Down
2 changes: 2 additions & 0 deletions validation/src/collation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub trait Collators: Clone {

/// A future which resolves when a collation is available.
pub async fn collation_fetch<C: Collators, P>(
validation_pool: Option<crate::pipeline::ValidationPool>,
parachain: ParaId,
relay_parent: Hash,
collators: C,
Expand All @@ -76,6 +77,7 @@ pub async fn collation_fetch<C: Collators, P>(
let collation = collators.collate(parachain, relay_parent).await?;
let Collation { info, pov } = collation;
let res = crate::pipeline::full_output_validation_with_api(
validation_pool.as_ref(),
&*client,
&info,
&pov,
Expand Down
11 changes: 10 additions & 1 deletion validation/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use sp_api::ProvideRuntimeApi;
use parking_lot::Mutex;
use crate::Error;

pub use parachain::wasm_executor::ValidationPool;

/// Does basic checks of a collation. Provide the encoded PoV-block.
pub fn basic_checks(
collation: &CollationInfo,
Expand Down Expand Up @@ -227,6 +229,7 @@ impl<'a> ValidatedCandidate<'a> {

/// Does full checks of a collation, with provided PoV-block and contextual data.
pub fn validate<'a>(
validation_pool: Option<&'_ ValidationPool>,
collation: &'a CollationInfo,
pov_block: &'a PoVBlock,
local_validation: &'a LocalValidationData,
Expand All @@ -251,12 +254,16 @@ pub fn validate<'a>(
per_byte: 0,
};

let execution_mode = validation_pool
.map(ExecutionMode::Remote)
.unwrap_or(ExecutionMode::Local);

let ext = Externalities::new(local_validation.balance, fee_schedule);
match wasm_executor::validate_candidate(
&validation_code,
params,
ext.clone(),
ExecutionMode::Remote,
execution_mode,
) {
Ok(result) => {
if result.head_data == collation.head_data.0 {
Expand Down Expand Up @@ -306,6 +313,7 @@ where

/// Does full-pipeline validation of a collation with provided contextual parameters.
pub fn full_output_validation_with_api<P>(
validation_pool: Option<&ValidationPool>,
api: &P,
collation: &CollationInfo,
pov_block: &PoVBlock,
Expand All @@ -330,6 +338,7 @@ pub fn full_output_validation_with_api<P>(
&encoded_pov,
)
.and_then(|()| validate(
validation_pool,
&collation,
&pov_block,
&local_validation,
Expand Down
Loading