Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Reduce provisioner work #12718

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ where
PF::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Send + Sync + Clone,
L: sc_consensus::JustificationSyncLink<B>,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
{
let worker = build_aura_worker::<P, _, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
let worker = build_aura_worker::<P, _, _, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
client,
block_import,
proposer_factory,
Expand Down Expand Up @@ -283,7 +283,7 @@ pub struct BuildAuraWorkerParams<C, I, PF, SO, L, BS, N> {
/// Build the aura worker.
///
/// The caller is responsible for running this worker, otherwise it will do nothing.
pub fn build_aura_worker<P, B, C, PF, I, SO, L, BS, Error>(
pub fn build_aura_worker<P, B, C, PF, I, SO, L, BS, Error, CIDP>(
BuildAuraWorkerParams {
client,
block_import,
Expand All @@ -300,6 +300,7 @@ pub fn build_aura_worker<P, B, C, PF, I, SO, L, BS, Error>(
}: BuildAuraWorkerParams<C, I, PF, SO, L, BS, NumberFor<B>>,
) -> impl sc_consensus_slots::SimpleSlotWorker<
B,
CIDP,
Proposer = PF::Proposer,
BlockImport = I,
SyncOracle = SO,
Expand All @@ -321,6 +322,8 @@ where
SO: SyncOracle + Send + Sync + Clone,
L: sc_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
for<'async_trait> CIDP: CreateInherentDataProviders<B, ()> + Send + 'async_trait,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
AuraWorker {
client,
Expand Down Expand Up @@ -356,7 +359,7 @@ struct AuraWorker<C, E, I, P, SO, L, BS, N> {
}

#[async_trait::async_trait]
impl<B, C, E, I, P, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
impl<B, C, E, I, P, Error, SO, L, BS, CIDP> sc_consensus_slots::SimpleSlotWorker<B, CIDP>
for AuraWorker<C, E, I, P, SO, L, BS, NumberFor<B>>
where
B: BlockT,
Expand All @@ -372,6 +375,8 @@ where
L: sc_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
for<'async_trait> CIDP: CreateInherentDataProviders<B, ()> + Send + 'async_trait,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
type BlockImport = I;
type SyncOracle = SO;
Expand Down Expand Up @@ -489,7 +494,16 @@ where
chain_head_slot,
self.client.info().finalized_number,
slot,
self.logging_target(),
<AuraWorker<
C,
E,
I,
P,
SO,
L,
BS,
<<B as BlockT>::Header as sp_api::HeaderT>::Number,
> as sc_consensus_slots::SimpleSlotWorker<B, CIDP>>::logging_target(self),
)
}
}
Expand Down Expand Up @@ -524,7 +538,8 @@ where
&self.block_proposal_slot_portion,
self.max_block_proposal_slot_portion.as_ref(),
sc_consensus_slots::SlotLenienceType::Exponential,
self.logging_target(),
"aura",
//<AuraWorker<C, E, I, P, SO, L, BS, <<B as BlockT>::Header as HeaderT>::Number> as sc_consensus_slots::SimpleSlotWorker<B, CIDP>>::logging_target(self),
)
}
}
Expand Down
11 changes: 8 additions & 3 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
}

#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
impl<B, C, E, I, Error, SO, L, BS, CIDP> sc_consensus_slots::SimpleSlotWorker<B, CIDP>
for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
B: BlockT,
Expand All @@ -728,6 +728,8 @@ where
L: sc_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Sync,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
for<'async_trait> CIDP: CreateInherentDataProviders<B, ()> + Send + 'async_trait,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
type Claim = (PreDigest, AuthorityId);
type SyncOracle = SO;
Expand Down Expand Up @@ -872,7 +874,7 @@ where
chain_head_slot,
self.client.info().finalized_number,
slot,
self.logging_target(),
<BabeSlotWorker<B, C, E, I, SO, L, BS> as sc_consensus_slots::SimpleSlotWorker<B, CIDP>>::logging_target(self),
)
}
}
Expand Down Expand Up @@ -908,7 +910,10 @@ where
&self.block_proposal_slot_portion,
self.max_block_proposal_slot_portion.as_ref(),
sc_consensus_slots::SlotLenienceType::Exponential,
self.logging_target(),
<BabeSlotWorker<B, C, E, I, SO, L, BS> as sc_consensus_slots::SimpleSlotWorker<
B,
CIDP,
>>::logging_target(self),
)
}
}
Expand Down
2 changes: 2 additions & 0 deletions client/consensus/slots/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ log = "0.4.17"
thiserror = "1.0.30"
sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-consensus = { version = "0.10.0-dev", path = "../../../client/consensus/common" }
sp-consensus-babe = { version = "0.10.0-dev", path = "../../../primitives/consensus/babe" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not depend on babe.

sc-telemetry = { version = "4.0.0-dev", path = "../../telemetry" }
sp-arithmetic = { version = "6.0.0", path = "../../../primitives/arithmetic" }
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
Expand All @@ -30,6 +31,7 @@ sp-consensus-slots = { version = "0.10.0-dev", path = "../../../primitives/conse
sp-core = { version = "7.0.0", path = "../../../primitives/core" }
sp-inherents = { version = "4.0.0-dev", path = "../../../primitives/inherents" }
sp-runtime = { version = "7.0.0", path = "../../../primitives/runtime" }
sp-timestamp = { version = "4.0.0-dev", path = "../../../primitives/timestamp" }
sp-state-machine = { version = "0.13.0", path = "../../../primitives/state-machine" }

[dev-dependencies]
Expand Down
81 changes: 68 additions & 13 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ use sc_consensus::{BlockImport, JustificationSyncLink};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN};
use sp_arithmetic::traits::BaseArithmetic;
use sp_consensus::{Proposal, Proposer, SelectChain, SyncOracle};
use sp_consensus_babe::inherents::InherentType;
use sp_consensus_slots::{Slot, SlotDuration};
use sp_inherents::CreateInherentDataProviders;
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT};
use std::{fmt::Debug, ops::Deref, time::Duration};
use std::{
fmt::Debug,
ops::Deref,
time::{Duration, Instant},
};

/// The changes that need to applied to the storage to create the state for a block.
///
Expand All @@ -64,19 +69,33 @@ pub struct SlotResult<Block: BlockT, Proof> {
/// The implementation should not make any assumptions of the slot being bound to the time or
/// similar. The only valid assumption is that the slot number is always increasing.
#[async_trait::async_trait]
pub trait SlotWorker<B: BlockT, Proof> {
pub trait SlotWorker<B, CIDP, Proof>
where
B: BlockT,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
/// Called when a new slot is triggered.
///
/// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in
/// the slot. Otherwise `None` is returned.
async fn on_slot(&mut self, slot_info: SlotInfo<B>) -> Option<SlotResult<B, Proof>>;
async fn on_slot(
&mut self,
slot_info: SlotInfo<B>,
create_inherent_data_providers: &CIDP,
) -> Option<SlotResult<B, Proof>>;
}

/// A skeleton implementation for `SlotWorker` which tries to claim a slot at
/// its beginning and tries to produce a block if successfully claimed, timing
/// out if block production takes too long.
#[async_trait::async_trait]
pub trait SimpleSlotWorker<B: BlockT> {
pub trait SimpleSlotWorker<B, CIDP>
where
B: BlockT,
for<'async_trait> CIDP: CreateInherentDataProviders<B, ()> + Send + 'async_trait,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
/// A handle to a `BlockImport`.
type BlockImport: BlockImport<B, Transaction = <Self::Proposer as Proposer<B>>::Transaction>
+ Send
Expand Down Expand Up @@ -184,6 +203,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
proposer: Self::Proposer,
claim: &Self::Claim,
slot_info: SlotInfo<B>,
inherent_data: sp_inherents::InherentData,
proposing_remaining: Delay,
) -> Option<
Proposal<
Expand All @@ -203,7 +223,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
// the result to be returned.
let proposing = proposer
.propose(
slot_info.inherent_data,
inherent_data,
sp_runtime::generic::Digest { logs },
proposing_remaining_duration.mul_f32(0.98),
None,
Expand Down Expand Up @@ -246,6 +266,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
async fn on_slot(
&mut self,
slot_info: SlotInfo<B>,
create_inherent_data_providers: &CIDP,
) -> Option<SlotResult<B, <Self::Proposer as Proposer<B>>::Proof>>
where
Self: Sync,
Expand Down Expand Up @@ -310,6 +331,10 @@ pub trait SimpleSlotWorker<B: BlockT> {

let claim = self.claim_slot(&slot_info.chain_head, slot, &aux_data).await?;

let (inherent_data, slot) =
Self::extract_inherent_data_and_slot(&slot_info, create_inherent_data_providers)
.await?;

if self.should_backoff(slot, &slot_info.chain_head) {
return None
}
Expand All @@ -335,7 +360,9 @@ pub trait SimpleSlotWorker<B: BlockT> {
},
};

let proposal = self.propose(proposer, &claim, slot_info, proposing_remaining).await?;
let proposal = self
.propose(proposer, &claim, slot_info, inherent_data, proposing_remaining)
.await?;

let (block, storage_proof) = (proposal.block, proposal.proof);
let (header, body) = block.deconstruct();
Expand Down Expand Up @@ -406,6 +433,28 @@ pub trait SimpleSlotWorker<B: BlockT> {

Some(SlotResult { block: B::new(header, body), storage_proof })
}

/// Creates inherent data and returns it and its slot.
async fn extract_inherent_data_and_slot(
slot_info: &SlotInfo<B>,
create_inherent_data_providers: &CIDP,
) -> Option<(InherentData, InherentType)> {
let inherent_data_providers = create_inherent_data_providers
.create_inherent_data_providers(slot_info.chain_head.hash(), ())
.await
.ok()?;

if Instant::now() > slot_info.ends_at {
log::warn!(
target: "slots",
"Creating inherent data providers took more time than we had left for the slot.",
);
}

let inherent_data = inherent_data_providers.create_inherent_data().ok()?;

Some((inherent_data, inherent_data_providers.slot()))
}
}

/// A type that implements [`SlotWorker`] for a type that implements [`SimpleSlotWorker`].
Expand All @@ -416,14 +465,20 @@ pub trait SimpleSlotWorker<B: BlockT> {
pub struct SimpleSlotWorkerToSlotWorker<T>(pub T);

#[async_trait::async_trait]
impl<T: SimpleSlotWorker<B> + Send + Sync, B: BlockT>
SlotWorker<B, <T::Proposer as Proposer<B>>::Proof> for SimpleSlotWorkerToSlotWorker<T>
impl<T, B, CIDP> SlotWorker<B, CIDP, <T::Proposer as Proposer<B>>::Proof>
for SimpleSlotWorkerToSlotWorker<T>
where
T: SimpleSlotWorker<B, CIDP> + Send + Sync,
B: BlockT,
for<'async_trait> CIDP: CreateInherentDataProviders<B, ()> + Send + 'async_trait,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
async fn on_slot(
&mut self,
slot_info: SlotInfo<B>,
create_inherent_data_providers: &CIDP,
) -> Option<SlotResult<B, <T::Proposer as Proposer<B>>::Proof>> {
self.0.on_slot(slot_info).await
self.0.on_slot(slot_info, create_inherent_data_providers).await
}
}

Expand Down Expand Up @@ -472,12 +527,12 @@ pub async fn start_slot_worker<B, C, W, SO, CIDP, Proof>(
) where
B: BlockT,
C: SelectChain<B>,
W: SlotWorker<B, Proof>,
W: SlotWorker<B, CIDP, Proof>,
SO: SyncOracle + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
let mut slots = Slots::new(slot_duration.as_duration(), create_inherent_data_providers, client);
let mut slots = Slots::new(slot_duration.as_duration(), client);

loop {
let slot_info = match slots.next_slot().await {
Expand All @@ -493,7 +548,7 @@ pub async fn start_slot_worker<B, C, W, SO, CIDP, Proof>(
continue
}

let _ = worker.on_slot(slot_info).await;
let _ = worker.on_slot(slot_info, &create_inherent_data_providers).await;
}
}

Expand Down
Loading