Skip to content

Commit

Permalink
Reduce provisioner work (paritytech#12749)
Browse files Browse the repository at this point in the history
* Move create_inherent_data call to use side

* Make provide_inherent_data async

* Fix tests

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Log errors

* Fix test

* Fix test

* fix

* Deduplicate test code

* fix

* flag

* Update client/consensus/slots/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* Revert "Deduplicate test code"

This reverts commit ba46adb.

* Fix test

* remove commented out code

* minor to start CI run

* start CI

* Update client/consensus/slots/src/lib.rs

Co-authored-by: Marcin S. <marcin@bytedude.com>

* Apply PR suggestions

* Apply PR suggestions

* Update client/consensus/slots/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* minor

* kickoff CI

* PR suggestions

* Compute remaining duration instead of using slot_info.duration

* Don't rely on sub implementation for Instant

* Apply PR suggestions

* Use saturating_duration_since

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Marcin S. <marcin@bytedude.com>
Co-authored-by: parity-processbot <>
  • Loading branch information
3 people authored and ltfschoen committed Feb 22, 2023
1 parent 51b31e9 commit c2669a0
Show file tree
Hide file tree
Showing 21 changed files with 97 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/node-template/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ name = "node-template"

[dependencies]
clap = { version = "4.0.9", features = ["derive"] }
futures = { version = "0.3.21", features = ["thread-pool"]}

sc-cli = { version = "0.10.0-dev", path = "../../../client/cli" }
sp-core = { version = "7.0.0", path = "../../../primitives/core" }
Expand Down
3 changes: 1 addition & 2 deletions bin/node-template/node/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ pub fn inherent_benchmark_data() -> Result<InherentData> {
let d = Duration::from_millis(0);
let timestamp = sp_timestamp::InherentDataProvider::new(d.into());

timestamp
.provide_inherent_data(&mut inherent_data)
futures::executor::block_on(timestamp.provide_inherent_data(&mut inherent_data))
.map_err(|e| format!("creating inherent data: {:?}", e))?;
Ok(inherent_data)
}
4 changes: 3 additions & 1 deletion bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ impl core::Benchmark for ConstructionBenchmark {
)
.expect("Proposer initialization failed");

let inherent_data = futures::executor::block_on(timestamp_provider.create_inherent_data())
.expect("Create inherent data failed");
let _block = futures::executor::block_on(proposer.propose(
timestamp_provider.create_inherent_data().expect("Create inherent data failed"),
inherent_data,
Default::default(),
std::time::Duration::from_secs(20),
None,
Expand Down
3 changes: 1 addition & 2 deletions bin/node/cli/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ pub fn inherent_benchmark_data() -> Result<InherentData> {
let d = Duration::from_millis(0);
let timestamp = sp_timestamp::InherentDataProvider::new(d.into());

timestamp
.provide_inherent_data(&mut inherent_data)
futures::executor::block_on(timestamp.provide_inherent_data(&mut inherent_data))
.map_err(|e| format!("creating inherent data: {:?}", e))?;
Ok(inherent_data)
}
16 changes: 9 additions & 7 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,14 +692,16 @@ mod tests {
slot += 1;
};

let inherent_data = (
sp_timestamp::InherentDataProvider::new(
std::time::Duration::from_millis(SLOT_DURATION * slot).into(),
),
sp_consensus_babe::inherents::InherentDataProvider::new(slot.into()),
let inherent_data = futures::executor::block_on(
(
sp_timestamp::InherentDataProvider::new(
std::time::Duration::from_millis(SLOT_DURATION * slot).into(),
),
sp_consensus_babe::inherents::InherentDataProvider::new(slot.into()),
)
.create_inherent_data(),
)
.create_inherent_data()
.expect("Creates inherent data");
.expect("Creates inherent data");

digest.push(<DigestItem as CompatibleDigestItem>::babe_pre_digest(babe_pre_digest));

Expand Down
1 change: 1 addition & 0 deletions client/consensus/aura/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ where

let mut inherent_data = create_inherent_data_providers
.create_inherent_data()
.await
.map_err(Error::<B>::Inherent)?;

let slot_now = create_inherent_data_providers.slot();
Expand Down
4 changes: 2 additions & 2 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ where
PF::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Send + Sync + Clone,
L: sc_consensus::JustificationSyncLink<B>,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
Expand Down Expand Up @@ -960,7 +960,7 @@ mod tests {
let res = executor::block_on(worker.on_slot(SlotInfo {
slot: 0.into(),
ends_at: Instant::now() + Duration::from_secs(100),
inherent_data: InherentData::new(),
create_inherent_data: Box::new(()),
duration: Duration::from_millis(1000),
chain_head: head,
block_size_limit: None,
Expand Down
1 change: 1 addition & 0 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,7 @@ where
// timestamp in the inherents actually matches the slot set in the seal.
let mut inherent_data = create_inherent_data_providers
.create_inherent_data()
.await
.map_err(Error::<Block>::CreateInherents)?;
inherent_data.babe_replace_inherent_data(slot);

Expand Down
2 changes: 1 addition & 1 deletion client/consensus/manual-seal/src/consensus/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl SlotTimestampProvider {

#[async_trait::async_trait]
impl InherentDataProvider for SlotTimestampProvider {
fn provide_inherent_data(
async fn provide_inherent_data(
&self,
inherent_data: &mut InherentData,
) -> Result<(), sp_inherents::Error> {
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/manual-seal/src/seal_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP, P>(
.await
.map_err(|e| Error::Other(e))?;

let inherent_data = inherent_data_providers.create_inherent_data()?;
let inherent_data = inherent_data_providers.create_inherent_data().await?;

let proposer = env.init(&parent).map_err(|err| Error::StringError(err.to_string())).await?;
let inherents_len = inherent_data.len();
Expand Down
3 changes: 2 additions & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ where

let inherent_data = inherent_data_providers
.create_inherent_data()
.await
.map_err(|e| Error::CreateInherents(e))?;

let inherent_res = self
Expand Down Expand Up @@ -585,7 +586,7 @@ where
},
};

let inherent_data = match inherent_data_providers.create_inherent_data() {
let inherent_data = match inherent_data_providers.create_inherent_data().await {
Ok(r) => r,
Err(e) => {
warn!(
Expand Down
50 changes: 46 additions & 4 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ use sp_consensus::{Proposal, Proposer, SelectChain, SyncOracle};
use sp_consensus_slots::{Slot, SlotDuration};
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT};
use std::{fmt::Debug, ops::Deref, time::Duration};
use std::{
fmt::Debug,
ops::Deref,
time::{Duration, Instant},
};

/// The changes that need to applied to the storage to create the state for a block.
///
Expand Down Expand Up @@ -195,6 +199,9 @@ pub trait SimpleSlotWorker<B: BlockT> {
let slot = slot_info.slot;
let telemetry = self.telemetry();
let logging_target = self.logging_target();

let inherent_data = Self::create_inherent_data(&slot_info, &logging_target).await?;

let proposing_remaining_duration = self.proposing_remaining_duration(&slot_info);
let logs = self.pre_digest_data(slot, claim);

Expand All @@ -203,7 +210,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
// the result to be returned.
let proposing = proposer
.propose(
slot_info.inherent_data,
inherent_data,
sp_runtime::generic::Digest { logs },
proposing_remaining_duration.mul_f32(0.98),
None,
Expand Down Expand Up @@ -242,6 +249,41 @@ pub trait SimpleSlotWorker<B: BlockT> {
Some(proposal)
}

/// Calls `create_inherent_data` and handles errors.
async fn create_inherent_data(
slot_info: &SlotInfo<B>,
logging_target: &str,
) -> Option<sp_inherents::InherentData> {
let remaining_duration = slot_info.ends_at.saturating_duration_since(Instant::now());
let delay = Delay::new(remaining_duration);
let cid = slot_info.create_inherent_data.create_inherent_data();
let inherent_data = match futures::future::select(delay, cid).await {
Either::Right((Ok(data), _)) => data,
Either::Right((Err(err), _)) => {
warn!(
target: logging_target,
"Unable to create inherent data for block {:?}: {}",
slot_info.chain_head.hash(),
err,
);

return None
},
Either::Left(_) => {
warn!(
target: logging_target,
"Creating inherent data took more time than we had left for slot {} for block {:?}.",
slot_info.slot,
slot_info.chain_head.hash(),
);

return None
},
};

Some(inherent_data)
}

/// Implements [`SlotWorker::on_slot`].
async fn on_slot(
&mut self,
Expand Down Expand Up @@ -474,7 +516,7 @@ pub async fn start_slot_worker<B, C, W, SO, CIDP, Proof>(
C: SelectChain<B>,
W: SlotWorker<B, Proof>,
SO: SyncOracle + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
let mut slots = Slots::new(slot_duration.as_duration(), create_inherent_data_providers, client);
Expand Down Expand Up @@ -786,7 +828,7 @@ mod test {
super::slots::SlotInfo {
slot: slot.into(),
duration: SLOT_DURATION,
inherent_data: Default::default(),
create_inherent_data: Box::new(()),
ends_at: Instant::now() + SLOT_DURATION,
chain_head: Header::new(
1,
Expand Down
31 changes: 13 additions & 18 deletions client/consensus/slots/src/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use super::{InherentDataProviderExt, Slot};
use sp_consensus::{Error, SelectChain};
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

use futures_timer::Delay;
Expand Down Expand Up @@ -52,8 +52,8 @@ pub struct SlotInfo<B: BlockT> {
pub slot: Slot,
/// The instant at which the slot ends.
pub ends_at: Instant,
/// The inherent data.
pub inherent_data: InherentData,
/// The inherent data provider.
pub create_inherent_data: Box<dyn InherentDataProvider>,
/// Slot duration.
pub duration: Duration,
/// The chain header this slot is based on.
Expand All @@ -70,14 +70,14 @@ impl<B: BlockT> SlotInfo<B> {
/// `ends_at` is calculated using `timestamp` and `duration`.
pub fn new(
slot: Slot,
inherent_data: InherentData,
create_inherent_data: Box<dyn InherentDataProvider>,
duration: Duration,
chain_head: B::Header,
block_size_limit: Option<usize>,
) -> Self {
Self {
slot,
inherent_data,
create_inherent_data,
duration,
chain_head,
block_size_limit,
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<Block, SC, IDP> Slots<Block, SC, IDP>
where
Block: BlockT,
SC: SelectChain<Block>,
IDP: CreateInherentDataProviders<Block, ()>,
IDP: CreateInherentDataProviders<Block, ()> + 'static,
IDP::InherentDataProviders: crate::InherentDataProviderExt,
{
/// Returns a future that fires when the next slot starts.
Expand All @@ -142,9 +142,6 @@ where

// reschedule delay for next slot.
self.inner_delay = Some(Delay::new(ends_in));

let ends_at = Instant::now() + ends_in;

let chain_head = match self.select_chain.best_chain().await {
Ok(x) => x,
Err(e) => {
Expand All @@ -164,21 +161,19 @@ where
.create_inherent_data_providers(chain_head.hash(), ())
.await?;

if Instant::now() > ends_at {
log::warn!(
target: "slots",
"Creating inherent data providers took more time than we had left for the slot.",
);
}

let slot = inherent_data_providers.slot();
let inherent_data = inherent_data_providers.create_inherent_data()?;

// never yield the same slot twice.
if slot > self.last_slot {
self.last_slot = slot;

break Ok(SlotInfo::new(slot, inherent_data, self.slot_duration, chain_head, None))
break Ok(SlotInfo::new(
slot,
Box::new(inherent_data_providers),
self.slot_duration,
chain_head,
None,
))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion primitives/authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<H> InherentDataProvider<H> {
#[cfg(feature = "std")]
#[async_trait::async_trait]
impl<H: HeaderT> sp_inherents::InherentDataProvider for InherentDataProvider<H> {
fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
async fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
inherent_data.put_data(INHERENT_IDENTIFIER, &self.uncles)
}

Expand Down
2 changes: 1 addition & 1 deletion primitives/consensus/aura/src/inherents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl sp_std::ops::Deref for InherentDataProvider {
#[cfg(feature = "std")]
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for InherentDataProvider {
fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
async fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
inherent_data.put_data(INHERENT_IDENTIFIER, &self.slot)
}

Expand Down
2 changes: 1 addition & 1 deletion primitives/consensus/babe/src/inherents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl sp_std::ops::Deref for InherentDataProvider {
#[cfg(feature = "std")]
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for InherentDataProvider {
fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
async fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
inherent_data.put_data(INHERENT_IDENTIFIER, &self.slot)
}

Expand Down
10 changes: 5 additions & 5 deletions primitives/inherents/src/client_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ pub trait InherentDataProvider: Send + Sync {
/// Convenience function for creating [`InherentData`].
///
/// Basically maps around [`Self::provide_inherent_data`].
fn create_inherent_data(&self) -> Result<InherentData, Error> {
async fn create_inherent_data(&self) -> Result<InherentData, Error> {
let mut inherent_data = InherentData::new();
self.provide_inherent_data(&mut inherent_data)?;
self.provide_inherent_data(&mut inherent_data).await?;
Ok(inherent_data)
}

/// Provide inherent data that should be included in a block.
///
/// The data should be stored in the given `InherentData` structure.
fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error>;
async fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error>;

/// Convert the given encoded error to a string.
///
Expand All @@ -108,8 +108,8 @@ pub trait InherentDataProvider: Send + Sync {
#[async_trait::async_trait]
impl InherentDataProvider for Tuple {
for_tuples!( where #( Tuple: Send + Sync )* );
fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
for_tuples!( #( Tuple.provide_inherent_data(inherent_data)?; )* );
async fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), Error> {
for_tuples!( #( Tuple.provide_inherent_data(inherent_data).await?; )* );
Ok(())
}

Expand Down
Loading

0 comments on commit c2669a0

Please sign in to comment.