Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Split the service initialisation up into seperate functions #6332

Merged
merged 3 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ pub fn import_queue<B, I, C, P, S>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
S: sp_core::traits::SpawnBlocking,
S: sp_core::traits::SpawnNamed,
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
initialize_authorities_cache(&*client)?;
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
finality_proof_import: Option<BoxFinalityProofImport<Block>>,
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnBlocking,
spawner: &impl sp_core::traits::SpawnNamed,
registry: Option<&Registry>,
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, Transaction>(
block_import: BoxBlockImport<Block, Transaction>,
spawner: &impl sp_core::traits::SpawnBlocking,
spawner: &impl sp_core::traits::SpawnNamed,
registry: Option<&Registry>,
) -> BasicQueue<Block, Transaction>
where
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
finality_proof_import: Option<BoxFinalityProofImport<B>>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnBlocking,
spawner: &impl sp_core::traits::SpawnNamed,
registry: Option<&Registry>,
) -> Result<
PowImportQueue<B, Transaction>,
Expand Down
1 change: 1 addition & 0 deletions client/informant/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ sp-blockchain = { version = "2.0.0-rc3", path = "../../primitives/blockchain" }
sp-runtime = { version = "2.0.0-rc3", path = "../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc2", path = "../../primitives/utils" }
sp-transaction-pool = { version = "2.0.0-rc2", path = "../../primitives/transaction-pool" }
parking_lot = "0.10.2"
15 changes: 7 additions & 8 deletions client/informant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ use sc_network::{network_state::NetworkState, NetworkStatus};
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::TransactionPool;
use sp_utils::mpsc::TracingUnboundedReceiver;
use sp_utils::{status_sinks, mpsc::tracing_unbounded};
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;

mod display;

Expand Down Expand Up @@ -60,12 +61,7 @@ impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for
/// Builds the informant and returns a `Future` that drives the informant.
pub fn build<B: BlockT, C>(
client: Arc<C>,
network_status_stream_builder: impl FnOnce(
Duration,
) -> TracingUnboundedReceiver<(
NetworkStatus<B>,
NetworkState,
)>,
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
format: OutputFormat,
) -> impl futures::Future<Output = ()>
Expand All @@ -76,7 +72,10 @@ where
let mut display = display::InformantDisplay::new(format.clone());

let client_1 = client.clone();
let display_notifications = network_status_stream_builder(Duration::from_millis(5000))
let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status");
network_status_sinks.lock().push(Duration::from_millis(5000), network_status_sink);

let display_notifications = network_status_stream
.for_each(move |(net_status, _)| {
let info = client_1.usage_info();
if let Some(ref usage) = info.usage {
Expand Down
40 changes: 39 additions & 1 deletion client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ use sp_api::{ApiExt, ProvideRuntimeApi};
use futures::future::Future;
use log::{debug, warn};
use sc_network::NetworkStateInfo;
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext};
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext, traits::SpawnNamed};
use sp_runtime::{generic::BlockId, traits::{self, Header}};
use futures::{prelude::*, future::ready};

mod api;

Expand Down Expand Up @@ -161,6 +162,43 @@ impl<Client, Storage, Block> OffchainWorkers<
}
}

/// Inform the offchain worker about new imported blocks
pub async fn notification_future<Client, Storage, Block, Spawner>(
is_validator: bool,
client: Arc<Client>,
offchain: Arc<OffchainWorkers<Client, Storage, Block>>,
spawner: Spawner,
network_state_info: Arc<dyn NetworkStateInfo + Send + Sync>,
)
where
Block: traits::Block,
Client: ProvideRuntimeApi<Block> + sc_client_api::BlockchainEvents<Block> + Send + Sync + 'static,
Client::Api: OffchainWorkerApi<Block>,
Storage: OffchainStorage + 'static,
Spawner: SpawnNamed
{
client.import_notification_stream().for_each(move |n| {
if n.is_new_best {
spawner.spawn(
"offchain-on-block",
offchain.on_block_imported(
&n.header,
network_state_info.clone(),
is_validator,
).boxed(),
);
} else {
log::debug!(
target: "sc_offchain",
"Skipping offchain workers for non-canon block: {:?}",
n.header,
)
}

ready(())
}).await;
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading