Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(engine): migrate to AsyncStateRoot #10927

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/blockchain-tree-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ pub enum InsertBlockErrorKindTwo {
/// Provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
/// Other errors.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl InsertBlockErrorKindTwo {
Expand Down Expand Up @@ -365,6 +368,7 @@ impl InsertBlockErrorKindTwo {
}
}
Self::Provider(err) => Err(InsertBlockFatalError::Provider(err)),
Self::Other(err) => Err(InternalBlockExecutionError::Other(err).into()),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ metrics.workspace = true
reth-metrics = { workspace = true, features = ["common"] }

# misc
rayon.workspace = true
tracing.workspace = true

# optional deps for test-utils
Expand Down
40 changes: 31 additions & 9 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use alloy_eips::BlockNumHash;
use alloy_primitives::{BlockNumber, B256, U256};
use rayon::ThreadPoolBuilder;
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN,
Expand Down Expand Up @@ -42,8 +43,9 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -497,6 +499,8 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes, Spec> {
metrics: EngineApiMetrics,
/// An invalid block hook.
invalid_block_hook: Box<dyn InvalidBlockHook>,
/// Blocking task pool.
blocking_task_pool: BlockingTaskPool,
}

impl<P: Debug, E: Debug, T: EngineTypes + Debug, Spec: Debug> std::fmt::Debug
Expand Down Expand Up @@ -546,6 +550,9 @@ where
config: TreeConfig,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
let blocking_task_pool = BlockingTaskPool::new(
ThreadPoolBuilder::default().build().expect("failed to build blocking task pool"),
);
Self {
provider,
executor_provider,
Expand All @@ -563,6 +570,7 @@ where
metrics: Default::default(),
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
blocking_task_pool,
}
}

Expand Down Expand Up @@ -2190,14 +2198,14 @@ where
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_in_parallel(block.parent_hash, &hashed_state)
.compute_state_root_async(block.parent_hash, &hashed_state)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ProviderError::ConsistentView(error)) => {
debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
Err(AsyncStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Async state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(error.into()),
Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
};
}

Expand Down Expand Up @@ -2260,19 +2268,20 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
}

/// Compute state root for the given hashed post state in parallel.
/// Compute state root for the given hashed post state asynchronously.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation

/// should be used instead.
fn compute_state_root_in_parallel(
fn compute_state_root_async(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = TrieInput::default();

Expand All @@ -2294,7 +2303,20 @@ where
// Extend with block we are validating root for.
input.append_ref(hashed_state);

Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?)
let (tx, rx) = std::sync::mpsc::channel();
let blocking_task_pool = self.blocking_task_pool.clone();

rayon::spawn(move || {
let result: Result<(B256, TrieUpdates), AsyncStateRootError> =
futures::executor::block_on(async move {
AsyncStateRoot::new(consistent_view, blocking_task_pool, input)
.incremental_root_with_updates()
.await
});
let _ = tx.send(result);
});

rx.recv()?
}

/// Handles an error that occurred while inserting a block.
Expand Down
3 changes: 3 additions & 0 deletions crates/trie/parallel/src/async_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ pub enum AsyncStateRootError {
/// The hashed address for which channel was closed.
hashed_address: B256,
},
/// Receive error
#[error(transparent)]
Receive(#[from] std::sync::mpsc::RecvError),
/// Error while calculating storage root.
#[error(transparent)]
StorageRoot(#[from] StorageRootError),
Expand Down
Loading