Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pov-recovery unit tests and support for elastic scaling #4733

Merged
merged 8 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions cumulus/client/consensus/common/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ impl RelayChainInterface for Relaychain {
unimplemented!("Not needed for test")
}

async fn candidates_pending_availability(
&self,
_: PHash,
_: ParaId,
) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
unimplemented!("Not needed for test")
}

async fn session_index_for_child(&self, _: PHash) -> RelayChainResult<SessionIndex> {
Ok(0)
}
Expand Down
27 changes: 16 additions & 11 deletions cumulus/client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,18 @@ where
Ok(para_head)
}

/// Get the backed block hash of the given parachain in the relay chain.
async fn backed_block_hash(
/// Get the backed block hashes of the given parachain in the relay chain.
async fn backed_block_hashes(
relay_chain_interface: &RCInterface,
hash: PHash,
para_id: ParaId,
) -> Result<Option<PHash>, BoxedError> {
let candidate_receipt = relay_chain_interface
.candidate_pending_availability(hash, para_id)
) -> Result<impl Iterator<Item = PHash>, BoxedError> {
let candidate_receipts = relay_chain_interface
.candidates_pending_availability(hash, para_id)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;

Ok(candidate_receipt.map(|cr| cr.descriptor.para_head))
Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head))
}

/// Handle a block announcement with empty data (no statement) attached to it.
Expand All @@ -298,15 +298,20 @@ where
let best_head =
Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
let known_best_number = best_head.number();
let backed_block = || async {
Self::backed_block_hash(&relay_chain_interface, relay_chain_best_hash, para_id).await
};

if best_head == header {
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);

Ok(Validation::Success { is_new_best: true })
} else if Some(HeadData(header.encode()).hash()) == backed_block().await? {
return Ok(Validation::Success { is_new_best: true })
}

let mut backed_blocks =
Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
.await?;

let head_hash = HeadData(header.encode()).hash();

if backed_blocks.any(|block_hash| block_hash == head_hash) {
tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);

Ok(Validation::Success { is_new_best: true })
Expand Down
14 changes: 11 additions & 3 deletions cumulus/client/network/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,16 @@ impl RelayChainInterface for DummyRelayChainInterface {
_: PHash,
_: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
unimplemented!("Not needed for test")
}

async fn candidates_pending_availability(
&self,
_: PHash,
_: ParaId,
) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
if self.data.lock().has_pending_availability {
Ok(Some(CommittedCandidateReceipt {
Ok(vec![CommittedCandidateReceipt {
descriptor: CandidateDescriptor {
para_head: polkadot_parachain_primitives::primitives::HeadData(
default_header().encode(),
Expand All @@ -155,9 +163,9 @@ impl RelayChainInterface for DummyRelayChainInterface {
processed_downward_messages: 0,
hrmp_watermark: 0,
},
}))
}])
} else {
Ok(None)
Ok(vec![])
}
}

Expand Down
5 changes: 5 additions & 0 deletions cumulus/client/pov-recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ async-trait = "0.1.79"
[dev-dependencies]
tokio = { version = "1.32.0", features = ["macros"] }
portpicker = "0.1.1"
sp-blockchain = { path = "../../../substrate/primitives/blockchain" }
cumulus-test-client = { path = "../../test/client" }
sc-utils = { path = "../../../substrate/client/utils" }
sp-tracing = { path = "../../../substrate/primitives/tracing" }
assert_matches = "1.5"

# Cumulus
cumulus-test-service = { path = "../../test/service" }
Expand Down
9 changes: 2 additions & 7 deletions cumulus/client/pov-recovery/src/active_candidate_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;

use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};

use std::{collections::HashSet, pin::Pin, sync::Arc};
use std::{pin::Pin, sync::Arc};

use crate::RecoveryHandle;

Expand All @@ -32,14 +32,12 @@ pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
/// The recoveries that are currently being executed.
recoveries:
FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
/// The block hashes of the candidates currently being recovered.
candidates: HashSet<Block::Hash>,
recovery_handle: Box<dyn RecoveryHandle>,
}

impl<Block: BlockT> ActiveCandidateRecovery<Block> {
pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle }
Self { recoveries: Default::default(), recovery_handle }
}

/// Recover the given `candidate`.
Expand All @@ -63,8 +61,6 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
)
.await;

self.candidates.insert(block_hash);

self.recoveries.push(
async move {
match rx.await {
Expand Down Expand Up @@ -97,7 +93,6 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
loop {
if let Some(res) = self.recoveries.next().await {
self.candidates.remove(&res.0);
return res
} else {
futures::pending!()
Expand Down
21 changes: 13 additions & 8 deletions cumulus/client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ use std::{
time::Duration,
};

#[cfg(test)]
mod tests;

mod active_candidate_recovery;
use active_candidate_recovery::ActiveCandidateRecovery;

Expand Down Expand Up @@ -544,7 +547,7 @@ where
)
.await
{
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
Expand All @@ -554,9 +557,11 @@ where
futures::pin_mut!(pending_candidates);
loop {
select! {
pending_candidate = pending_candidates.next() => {
if let Some((receipt, session_index)) = pending_candidate {
self.handle_pending_candidate(receipt, session_index);
next_pending_candidates = pending_candidates.next() => {
if let Some((candidates, session_index)) = next_pending_candidates {
for candidate in candidates {
self.handle_pending_candidate(candidate, session_index);
}
} else {
tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
return;
Expand Down Expand Up @@ -615,7 +620,7 @@ async fn pending_candidates(
relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
) -> RelayChainResult<impl Stream<Item = (Vec<CommittedCandidateReceipt>, SessionIndex)>> {
let import_notification_stream = relay_chain_client.import_notification_stream().await?;

let filtered_stream = import_notification_stream.filter_map(move |n| {
Expand All @@ -633,7 +638,7 @@ async fn pending_candidates(
}

let pending_availability_result = client_for_closure
.candidate_pending_availability(hash, para_id)
.candidates_pending_availability(hash, para_id)
.await
.map_err(|e| {
tracing::error!(
Expand All @@ -651,8 +656,8 @@ async fn pending_candidates(
)
});

if let Ok(Some(candidate)) = pending_availability_result {
session_index_result.map(|session_index| (candidate, session_index)).ok()
if let Ok(candidates) = pending_availability_result {
session_index_result.map(|session_index| (candidates, session_index)).ok()
} else {
None
}
Expand Down
Loading
Loading