Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BEEFY: Support compatibility with Warp Sync - Allow Warp Sync for Validators #2689

Merged
merged 12 commits into from
Dec 27, 2023
Merged
19 changes: 2 additions & 17 deletions polkadot/cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use crate::cli::{Cli, Subcommand, NODE_VERSION};
use frame_benchmarking_cli::{BenchmarkCmd, ExtrinsicFactory, SUBSTRATE_REFERENCE_HARDWARE};
use futures::future::TryFutureExt;
use log::{info, warn};
use log::info;
use sc_cli::SubstrateCli;
use service::{
self,
Expand Down Expand Up @@ -196,22 +196,7 @@ where
let chain_spec = &runner.config().chain_spec;

// By default, enable BEEFY on all networks, unless explicitly disabled through CLI.
let mut enable_beefy = !cli.run.no_beefy;
// BEEFY doesn't (yet) support warp sync:
// Until we implement https://github.com/paritytech/substrate/issues/14756
// - disallow warp sync for validators,
// - disable BEEFY when warp sync for non-validators.
if enable_beefy && runner.config().network.sync_mode.is_warp() {
if runner.config().role.is_authority() {
return Err(Error::Other(
"Warp sync not supported for validator nodes running BEEFY.".into(),
))
} else {
// disable BEEFY for non-validator nodes that are warp syncing
warn!("🥩 BEEFY not supported when warp syncing. Disabling BEEFY.");
enable_beefy = false;
}
}
let enable_beefy = !cli.run.no_beefy;

set_default_ss58_version(chain_spec);

Expand Down
13 changes: 13 additions & 0 deletions prdoc/pr_2689.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Parity PR Documentation Schema (prdoc)
# See doc at https://github.com/paritytech/prdoc

title: BEEFY: Support compatibility with Warp Sync - Allow Warp Sync for Validators

doc:
- audience: Node Operator
description: |
BEEFY can now sync itself even when using Warp Sync to sync the node. This removes the limitation of not
being able to run BEEFY when warp syncing. Validators are now again able to warp sync.

crates:
- name: sc-consensus-beefy
3 changes: 2 additions & 1 deletion substrate/client/consensus/beefy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ sp-core = { path = "../../../primitives/core" }
sp-keystore = { path = "../../../primitives/keystore" }
sp-mmr-primitives = { path = "../../../primitives/merkle-mountain-range" }
sp-runtime = { path = "../../../primitives/runtime" }
tokio = "1.22.0"


[dev-dependencies]
serde = "1.0.193"
tempfile = "3.1.0"
tokio = "1.22.0"
sc-block-builder = { path = "../../block-builder" }
sc-network-test = { path = "../../network/test" }
sp-consensus-grandpa = { path = "../../../primitives/consensus/grandpa" }
Expand Down
6 changes: 5 additions & 1 deletion substrate/client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ where
///
/// Only votes for `set_id` and rounds `start <= round <= end` will be accepted.
pub(crate) fn update_filter(&self, filter: GossipFilterCfg<B>) {
debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter);
debug!(
target: LOG_TARGET,
"🥩 New gossip filter: start {:?}, end {:?}, validator set id {:?}",
filter.start, filter.end, filter.validator_set.id()
);
self.gossip_filter.write().update(filter);
}

Expand Down
10 changes: 10 additions & 0 deletions substrate/client/consensus/beefy/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ where
// Run inner block import.
let inner_import_result = self.inner.import_block(block).await?;

match self.backend.state_at(hash) {
acatangiu marked this conversation as resolved.
Show resolved Hide resolved
Ok(_) => {},
Err(_) => {
// The block is imported as part of some chain sync.
// The voter doesn't need to process it now.
// It will be detected and processed as part of the voter state init.
return Ok(inner_import_result);
},
}

match (beefy_encoded, &inner_import_result) {
(Some(encoded), ImportResult::Imported(_)) => {
match self.decode_and_verify(&encoded, number, hash) {
Expand Down
123 changes: 89 additions & 34 deletions substrate/client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
worker::PersistedState,
};
use futures::{stream::Fuse, StreamExt};
use log::{debug, error, info};
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
Expand All @@ -56,6 +56,7 @@ use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
sync::Arc,
time::Duration,
};

mod aux_schema;
Expand All @@ -78,6 +79,8 @@ mod tests;

const LOG_TARGET: &str = "beefy";

const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60);

/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
Expand Down Expand Up @@ -292,21 +295,29 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
// select recoverable errors.
loop {
// Wait for BEEFY pallet to be active before starting voter.
let persisted_state = match wait_for_runtime_pallet(
let (beefy_genesis, best_grandpa) = match wait_for_runtime_pallet(
&*runtime,
&mut beefy_comms.gossip_engine,
&mut finality_notifications,
)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
{
Ok(res) => res,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
},
};

let persisted_state = match load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
.await
{
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
Expand Down Expand Up @@ -357,7 +368,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
}
}

fn load_or_init_voter_state<B, BE, R>(
async fn load_or_init_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
Expand All @@ -371,28 +382,70 @@ where
R::Api: BeefyApi<B, AuthorityId>,
{
// Initialize voter state from AUX DB if compatible.
crate::aux_schema::load_persistent(backend)?
if let Some(mut state) = crate::aux_schema::load_persistent(backend)?
// Verify state pallet genesis matches runtime.
.filter(|state| state.pallet_genesis() == beefy_genesis)
.and_then(|mut state| {
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa.clone());
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
Some(Ok(state))
})
// No valid voter-state persisted, re-initialize from pallet genesis.
.unwrap_or_else(|| {
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta)
})
{
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa.clone());
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);

// Make sure that all the headers that we need have been synced.
let mut header = best_grandpa.clone();
while *header.number() > state.best_beefy() {
header =
wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
}
return Ok(state);
}

// No valid voter-state persisted, re-initialize from pallet genesis.
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta).await
}

/// Waits until the parent header of `current` is available and returns it.
///
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
/// The rest of the headers (gap sync) are lazily downloaded later. But the BEEFY voter also needs
/// the headers in range `[beefy_genesis..=best_grandpa]` to be available. This helper method
/// enables us to wait until these headers have been synced.
async fn wait_for_parent_header<B, BC>(
blockchain: &BC,
current: <B as Block>::Header,
delay: Duration,
) -> ClientResult<<B as Block>::Header>
where
B: Block,
BC: BlockchainBackend<B>,
{
if *current.number() == Zero::zero() {
let msg = format!("header {} is Genesis, there is no parent for it", current.hash());
warn!(target: LOG_TARGET, "{}", msg);
return Err(ClientError::UnknownBlock(msg))
}
loop {
match blockchain.header(*current.parent_hash())? {
Some(parent) => return Ok(parent),
None => {
info!(
target: LOG_TARGET,
"🥩 Parent of header number {} not found. \
BEEFY gadget waiting for header sync to finish ...",
current.number()
);
tokio::time::sleep(delay).await;
},
}
}
}

// If no persisted state present, walk back the chain from first GRANDPA notification to either:
// - latest BEEFY finalized block, or if none found on the way,
// - BEEFY pallet genesis;
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
fn initialize_voter_state<B, BE, R>(
async fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
Expand All @@ -405,6 +458,8 @@ where
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
let blockchain = backend.blockchain();

let beefy_genesis = runtime
.runtime_api()
.beefy_genesis(best_grandpa.hash())
Expand All @@ -414,7 +469,6 @@ where
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
let blockchain = backend.blockchain();
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
Expand All @@ -432,7 +486,7 @@ where
let best_beefy = *header.number();
// If no session boundaries detected so far, just initialize new rounds here.
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, backend, &header)?;
let active_set = expect_validator_set(runtime, backend, &header).await?;
let mut rounds = Rounds::new(best_beefy, active_set);
// Mark the round as already finalized.
rounds.conclude(best_beefy);
Expand All @@ -451,7 +505,7 @@ where

if *header.number() == beefy_genesis {
// We've reached BEEFY genesis, initialize voter here.
let genesis_set = expect_validator_set(runtime, backend, &header)?;
let genesis_set = expect_validator_set(runtime, backend, &header).await?;
info!(
target: LOG_TARGET,
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Expand Down Expand Up @@ -481,7 +535,7 @@ where
}

// Move up the chain.
header = blockchain.expect_header(*header.parent_hash())?;
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
};

aux_schema::write_current_version(backend)?;
Expand Down Expand Up @@ -532,7 +586,7 @@ where
Err(ClientError::Backend(err_msg))
}

fn expect_validator_set<B, BE, R>(
async fn expect_validator_set<B, BE, R>(
acatangiu marked this conversation as resolved.
Show resolved Hide resolved
runtime: &R,
backend: &BE,
at_header: &B::Header,
Expand All @@ -550,15 +604,16 @@ where
debug!(target: LOG_TARGET, "🥩 Trying to find validator set active at header: {:?}", at_header);
let mut header = at_header.clone();
loop {
debug!(target: LOG_TARGET, "🥩 Looking for auth set change at block number: {:?}", *header.number());
if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) {
return Ok(active)
} else {
debug!(target: LOG_TARGET, "🥩 Looking for auth set change at block number: {:?}", *header.number());
match worker::find_authorities_change::<B>(&header) {
Some(active) => return Ok(active),
// Move up the chain. Ultimately we'll get it from chain genesis state, or error out
// here.
None => header = blockchain.expect_header(*header.parent_hash())?,
// there.
None =>
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?,
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions substrate/client/consensus/beefy/src/round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::LOG_TARGET;

use codec::{Decode, Encode};
use log::debug;
use log::{debug, info};
use sp_consensus_beefy::{
ecdsa_crypto::{AuthorityId, Signature},
Commitment, EquivocationProof, SignedCommitment, ValidatorSet, ValidatorSetId, VoteMessage,
Expand Down Expand Up @@ -194,7 +194,11 @@ where
self.previous_votes.retain(|&(_, number), _| number > round_num);
self.mandatory_done = self.mandatory_done || round_num == self.session_start;
self.best_done = self.best_done.max(Some(round_num));
debug!(target: LOG_TARGET, "🥩 Concluded round #{}", round_num);
if round_num == self.session_start {
info!(target: LOG_TARGET, "🥩 Concluded mandatory round #{}", round_num);
} else {
debug!(target: LOG_TARGET, "🥩 Concluded optional round #{}", round_num);
}
}
}

Expand Down
Loading
Loading