Millau -> Rialto headers relay (paritytech#398)
* working-millau-to-rialto-relay

* fix Millau state root

* properly fix synced+incomplete headers for sub2sub

* removed wrong TODO

* fmt + clippy

* Update relays/headers-relay/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* moved SubstrateTransactionMaker definition

* removed M from phantomdata

* removed prune_synced_children

* methods names as consts

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
svyatonik and HCastano committed Oct 14, 2020
1 parent b16e9f2 commit 0cd3add
Showing 13 changed files with 435 additions and 69 deletions.
1 change: 1 addition & 0 deletions bin/rialto/runtime/src/lib.rs
@@ -62,6 +62,7 @@ pub use frame_support::{
pub use pallet_balances::Call as BalancesCall;
pub use pallet_bridge_currency_exchange::Call as BridgeCurrencyExchangeCall;
pub use pallet_bridge_eth_poa::Call as BridgeEthPoACall;
pub use pallet_substrate_bridge::Call as BridgeMillauCall;
pub use pallet_timestamp::Call as TimestampCall;

#[cfg(any(feature = "std", test))]
2 changes: 1 addition & 1 deletion bin/rialto/runtime/src/millau.rs
@@ -36,7 +36,7 @@ pub fn initial_header() -> Header {
Header {
parent_hash: Default::default(),
number: Default::default(),
state_root: hex!("bb65e8ba99408ebfefea9d28f74403d41da6858fa075c51fcc71dc383455c530").into(),
state_root: hex!("e901070e3bb061a6ae9ea8e4ba5417bf4c4642f9e75af9d372861c170ba7a9a3").into(),
extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(),
digest: Default::default(),
}
7 changes: 7 additions & 0 deletions primitives/millau/src/lib.rs
@@ -49,6 +49,13 @@ impl Chain for Millau {
type Header = Header;
}

/// Name of the `MillauHeaderApi::best_block` runtime method.
pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block";
/// Name of the `MillauHeaderApi::is_known_block` runtime method.
pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block";
/// Name of the `MillauHeaderApi::incomplete_headers` runtime method.
pub const INCOMPLETE_MILLAU_HEADERS_METHOD: &str = "MillauHeaderApi_incomplete_headers";

sp_api::decl_runtime_apis! {
/// API for querying information about Millau headers from the Bridge Pallet instance.
///
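These constants pin the exact runtime-API method names that the relay invokes on the target node. A rough illustration of how such a constant might be dispatched, assuming a hypothetical `StateCall` trait and mock client (neither is part of this repository):

// Hypothetical sketch: dispatching a runtime API call by its method-name constant.
// `StateCall` and `MockClient` are illustrative assumptions, not the relay's real API.
const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block";

/// Anything that can execute a named runtime API method over RPC.
trait StateCall {
    /// Calls `method` with SCALE-encoded `params`, returning the SCALE-encoded result.
    fn state_call(&self, method: &str, params: Vec<u8>) -> Result<Vec<u8>, String>;
}

/// Asks the target node for the best Millau block known to its bridge pallet.
fn best_millau_block<C: StateCall>(client: &C) -> Result<Vec<u8>, String> {
    // `best_block` takes no arguments, so the parameter blob is empty.
    client.state_call(BEST_MILLAU_BLOCK_METHOD, Vec::new())
}

struct MockClient;

impl StateCall for MockClient {
    fn state_call(&self, method: &str, _params: Vec<u8>) -> Result<Vec<u8>, String> {
        // A real client would send an RPC request; the mock just echoes the method name.
        Ok(method.as_bytes().to_vec())
    }
}

fn main() {
    let raw = best_millau_block(&MockClient).unwrap();
    assert_eq!(raw, b"MillauHeaderApi_best_block".to_vec());
}

Keeping the strings as shared constants avoids typo-prone duplication between the runtime declaration and the relay's call sites.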
197 changes: 183 additions & 14 deletions relays/headers-relay/src/headers.rs
@@ -32,6 +32,8 @@ use std::{

type HeadersQueue<P> =
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, QueuedHeader<P>>>;
type SyncedChildren<P> =
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, HashSet<HeaderIdOf<P>>>>;
type KnownHeaders<P> =
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, HeaderStatus>>;

@@ -63,6 +65,9 @@ pub struct QueuedHeaders<P: HeadersSyncPipeline> {
/// Headers that we believe are currently submitted to the target node by our
/// not-yet-mined transactions.
submitted: HeadersQueue<P>,
/// Children of synced headers. We need this to support the case when a header is synced, but some
/// of its ancestors are incomplete.
synced_children: SyncedChildren<P>,
/// Pointers to all headers that we have ever seen and believe we may still touch in the future.
known_headers: KnownHeaders<P>,
/// Headers that are waiting for completion data from source node. Mapped (and auto-sorted
@@ -96,6 +101,7 @@ impl<P: HeadersSyncPipeline> Default for QueuedHeaders<P> {
ready: HeadersQueue::new(),
incomplete: HeadersQueue::new(),
submitted: HeadersQueue::new(),
synced_children: SyncedChildren::<P>::new(),
known_headers: KnownHeaders::<P>::new(),
incomplete_headers: LinkedHashMap::new(),
completion_data: LinkedHashMap::new(),
@@ -419,13 +425,17 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
self.header_synced(&new_incomplete_header);
}

move_header_descendants::<P>(
&mut [&mut self.ready, &mut self.submitted],
&mut self.incomplete,
&mut self.known_headers,
HeaderStatus::Incomplete,
&new_incomplete_header,
);
let move_origins = select_synced_children::<P>(&self.synced_children, &new_incomplete_header);
let move_origins = move_origins.into_iter().chain(std::iter::once(new_incomplete_header));
for move_origin in move_origins {
move_header_descendants::<P>(
&mut [&mut self.ready, &mut self.submitted],
&mut self.incomplete,
&mut self.known_headers,
HeaderStatus::Incomplete,
&move_origin,
);
}

if make_header_incomplete {
log::debug!(
@@ -460,13 +470,20 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
.cloned()
.collect::<Vec<_>>();
for just_completed_header in just_completed_headers {
move_header_descendants::<P>(
&mut [&mut self.incomplete],
&mut self.ready,
&mut self.known_headers,
HeaderStatus::Ready,
&just_completed_header,
);
// sub2eth rejects H if H.Parent is incomplete
// sub2sub allows 'syncing' headers like that
// => check whether the just-completed header has any already-synced children
let move_origins = select_synced_children::<P>(&self.synced_children, &just_completed_header);
let move_origins = move_origins.into_iter().chain(std::iter::once(just_completed_header));
for move_origin in move_origins {
move_header_descendants::<P>(
&mut [&mut self.incomplete],
&mut self.ready,
&mut self.known_headers,
HeaderStatus::Ready,
&move_origin,
);
}

log::debug!(
target: "bridge",
@@ -514,6 +531,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
prune_queue(&mut self.ready, prune_border);
prune_queue(&mut self.submitted, prune_border);
prune_queue(&mut self.incomplete, prune_border);
self.synced_children = self.synced_children.split_off(&prune_border);
prune_known_headers::<P>(&mut self.known_headers, prune_border);
self.prune_border = prune_border;
}
@@ -527,6 +545,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
self.ready.clear();
self.incomplete.clear();
self.submitted.clear();
self.synced_children.clear();
self.known_headers.clear();
self.best_synced_number = Zero::zero();
self.prune_border = Zero::zero();
@@ -568,6 +587,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
// queues
let mut current = *id;
let mut id_processed = false;
let mut previous_current = None;
loop {
let header = match self.status(&current) {
HeaderStatus::Unknown => break,
@@ -582,8 +602,42 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
}
.expect("header has a given status; given queue has the header; qed");

// remember ids of all the children of the current header
let synced_children_entry = self
.synced_children
.entry(current.0)
.or_default()
.entry(current.1)
.or_default();
let all_queues = [
&self.maybe_orphan,
&self.orphan,
&self.maybe_extra,
&self.extra,
&self.ready,
&self.incomplete,
&self.submitted,
];
for queue in &all_queues {
let children_from_queue = queue
.get(&(current.0 + One::one()))
.map(|potential_children| {
potential_children
.values()
.filter(|potential_child| potential_child.header().parent_id() == current)
.map(|child| child.id())
.collect::<Vec<_>>()
})
.unwrap_or_default();
synced_children_entry.extend(children_from_queue);
}
if let Some(previous_current) = previous_current {
synced_children_entry.insert(previous_current);
}

set_header_status::<P>(&mut self.known_headers, &current, HeaderStatus::Synced);

previous_current = Some(current);
current = header.parent_id();
id_processed = true;
}
@@ -706,6 +760,35 @@ fn move_header_descendants<P: HeadersSyncPipeline>(
}
}

/// Recursively selects all synced children of the given header.
fn select_synced_children<P: HeadersSyncPipeline>(
synced_children: &SyncedChildren<P>,
id: &HeaderIdOf<P>,
) -> Vec<HeaderIdOf<P>> {
let mut result = Vec::new();
let mut current_parents = HashSet::new();
current_parents.insert(*id);

while !current_parents.is_empty() {
let mut next_parents = HashSet::new();
for current_parent in &current_parents {
let current_parent_synced_children = synced_children
.get(&current_parent.0)
.and_then(|by_number_entry| by_number_entry.get(&current_parent.1));
if let Some(current_parent_synced_children) = current_parent_synced_children {
for current_parent_synced_child in current_parent_synced_children {
result.push(*current_parent_synced_child);
next_parents.insert(*current_parent_synced_child);
}
}
}

let _ = std::mem::replace(&mut current_parents, next_parents);
}

result
}

/// Returns the oldest header from the queue.
fn oldest_header<P: HeadersSyncPipeline>(queue: &HeadersQueue<P>) -> Option<&QueuedHeader<P>> {
queue.values().flat_map(|h| h.values()).next()
@@ -1050,6 +1133,37 @@ pub(crate) mod tests {
.known_headers
.values()
.all(|s| s.values().all(|s| *s == HeaderStatus::Synced)));

// children of synced headers are stored
assert_eq!(
vec![id(97)],
queue.synced_children[&96][&hash(96)]
.iter()
.cloned()
.collect::<Vec<_>>()
);
assert_eq!(
vec![id(98)],
queue.synced_children[&97][&hash(97)]
.iter()
.cloned()
.collect::<Vec<_>>()
);
assert_eq!(
vec![id(99)],
queue.synced_children[&98][&hash(98)]
.iter()
.cloned()
.collect::<Vec<_>>()
);
assert_eq!(
vec![id(100)],
queue.synced_children[&99][&hash(99)]
.iter()
.cloned()
.collect::<Vec<_>>()
);
assert_eq!(0, queue.synced_children[&100][&hash(100)].len());
}

#[test]
@@ -1463,6 +1577,16 @@ pub(crate) mod tests {
.or_default()
.insert(hash(100), HeaderStatus::Ready);
queue.ready.entry(100).or_default().insert(hash(100), header(100));
queue
.synced_children
.entry(100)
.or_default()
.insert(hash(100), vec![id(101)].into_iter().collect());
queue
.synced_children
.entry(102)
.or_default()
.insert(hash(102), vec![id(102)].into_iter().collect());

queue.prune(102);

assert_eq!(queue.orphan.len(), 1);
assert_eq!(queue.maybe_orphan.len(), 1);
assert_eq!(queue.incomplete.len(), 1);
assert_eq!(queue.synced_children.len(), 1);
assert_eq!(queue.known_headers.len(), 4);

queue.prune(110);
assert_eq!(queue.orphan.len(), 0);
assert_eq!(queue.maybe_orphan.len(), 0);
assert_eq!(queue.incomplete.len(), 0);
assert_eq!(queue.synced_children.len(), 0);
assert_eq!(queue.known_headers.len(), 0);

queue.header_response(header(109).header().clone());
@@ -1537,4 +1663,47 @@
assert_eq!(queue.status(&id(103)), HeaderStatus::Incomplete);
assert_eq!(queue.status(&id(104)), HeaderStatus::Incomplete);
}

#[test]
fn incomplete_headers_response_moves_synced_headers() {
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();

// we have submitted two headers - 100 and 101. 102 is ready
queue.submitted.entry(100).or_default().insert(hash(100), header(100));
queue.submitted.entry(101).or_default().insert(hash(101), header(101));
queue.ready.entry(102).or_default().insert(hash(102), header(102));
queue
.known_headers
.entry(100)
.or_default()
.insert(hash(100), HeaderStatus::Submitted);
queue
.known_headers
.entry(101)
.or_default()
.insert(hash(101), HeaderStatus::Submitted);
queue
.known_headers
.entry(102)
.or_default()
.insert(hash(102), HeaderStatus::Ready);

// both headers are accepted
queue.target_best_header_response(&id(101));

// but header 100 is incomplete
queue.incomplete_headers_response(vec![id(100)].into_iter().collect());
assert_eq!(queue.status(&id(100)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(101)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete);
assert!(queue.incomplete_headers.contains_key(&id(100)));
assert!(queue.incomplete[&102].contains_key(&hash(102)));

// when header 100 is completed, 101 is synced and 102 is ready
queue.incomplete_headers_response(HashSet::new());
assert_eq!(queue.status(&id(100)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(101)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(102)), HeaderStatus::Ready);
assert!(queue.ready[&102].contains_key(&hash(102)));
}
}
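The `select_synced_children` helper introduced above performs a breadth-first walk over the `synced_children` map, gathering every already-synced descendant of a header. A standalone sketch of the same traversal, with simplified stand-in types instead of the pipeline's generic `HeaderIdOf<P>`:

// Minimal sketch of the breadth-first expansion: starting from one header id, the
// current set of parents is repeatedly replaced by their recorded synced children
// until no further children are found. Types here are simplified stand-ins.
use std::collections::{BTreeMap, HashMap, HashSet};

type Number = u64;
type Hash = u32;
type HeaderId = (Number, Hash);
type SyncedChildren = BTreeMap<Number, HashMap<Hash, HashSet<HeaderId>>>;

fn select_synced_children(synced_children: &SyncedChildren, id: HeaderId) -> Vec<HeaderId> {
    let mut result = Vec::new();
    let mut current_parents: HashSet<HeaderId> = std::iter::once(id).collect();

    while !current_parents.is_empty() {
        let mut next_parents = HashSet::new();
        for parent in &current_parents {
            // Look up children recorded for this (number, hash) pair, if any.
            let children = synced_children
                .get(&parent.0)
                .and_then(|by_hash| by_hash.get(&parent.1));
            if let Some(children) = children {
                for child in children {
                    result.push(*child);
                    next_parents.insert(*child);
                }
            }
        }
        current_parents = next_parents;
    }

    result
}

fn main() {
    // Header 101 is a recorded synced child of 100, and 102 of 101.
    let mut map = SyncedChildren::new();
    map.entry(100).or_default().entry(0xA).or_default().insert((101, 0xB));
    map.entry(101).or_default().entry(0xB).or_default().insert((102, 0xC));
    assert_eq!(select_synced_children(&map, (100, 0xA)), vec![(101, 0xB), (102, 0xC)]);
}

Collecting these descendants is what lets `incomplete_headers_response` move whole synced subtrees back to Incomplete (and later to Ready) rather than only the direct descendants of the reported header.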
2 changes: 1 addition & 1 deletion relays/headers-relay/src/sync_loop.rs
@@ -277,7 +277,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
},
&mut target_go_offline_future,
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
|| format!("Error retrieving best known {} header from {} node", P::SOURCE_NAME, P::TARGET_NAME),
).is_ok();
},
incomplete_headers_ids = target_incomplete_headers_future => {
2 changes: 1 addition & 1 deletion relays/headers-relay/src/sync_types.rs
@@ -43,7 +43,7 @@ pub enum HeaderStatus {
}

/// Headers synchronization pipeline.
- pub trait HeadersSyncPipeline: Clone + Copy + Send + Sync {
+ pub trait HeadersSyncPipeline: Clone + Send + Sync {
/// Name of the headers source.
const SOURCE_NAME: &'static str;
/// Name of the headers target.
8 changes: 7 additions & 1 deletion relays/millau-client/src/lib.rs
@@ -60,12 +60,18 @@ impl From<millau_runtime::Header> for SyncHeader {
}
}

impl From<SyncHeader> for millau_runtime::Header {
fn from(header: SyncHeader) -> Self {
header.0
}
}

impl SourceHeader<millau_runtime::Hash, millau_runtime::BlockNumber> for SyncHeader {
fn id(&self) -> HeaderId {
relay_utils::HeaderId(*self.number(), self.hash())
}

fn parent_id(&self) -> HeaderId {
- relay_utils::HeaderId(*self.number(), *self.parent_hash())
+ relay_utils::HeaderId(*self.number() - 1, *self.parent_hash())
}
}
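The one-line fix above matters because the sync queues key parents by block number: the parent of a header at height N must be reported at height N - 1. A tiny sketch of the corrected mapping, with simplified stand-in types (the real code uses `millau_runtime::Header` and `relay_utils::HeaderId`):

// Sketch of the corrected parent_id mapping; types are illustrative stand-ins.
#[derive(Debug, PartialEq)]
struct HeaderId(u64, u32);

struct SyncHeader {
    number: u64,
    parent_hash: u32,
}

impl SyncHeader {
    fn parent_id(&self) -> HeaderId {
        // Before the fix this used `self.number`, pointing one level too high.
        HeaderId(self.number - 1, self.parent_hash)
    }
}

fn main() {
    let header = SyncHeader { number: 100, parent_hash: 0xAB };
    assert_eq!(header.parent_id(), HeaderId(99, 0xAB));
}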
2 changes: 2 additions & 0 deletions relays/rialto-client/src/lib.rs
@@ -25,6 +25,8 @@ use sp_runtime::{
traits::{Header as HeaderT, IdentifyAccount},
};

pub use rialto_runtime::BridgeMillauCall;

/// Rialto header id.
pub type HeaderId = relay_utils::HeaderId<rialto_runtime::Hash, rialto_runtime::BlockNumber>;

3 changes: 3 additions & 0 deletions relays/substrate-client/src/client.rs
@@ -33,6 +33,9 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;

/// Substrate client type.
///
/// Cloning `Client` is a cheap operation.
#[derive(Clone)]
pub struct Client<C: Chain> {
/// Substrate RPC client.
client: RpcClient,
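The new `Clone` derive is documented as cheap. A sketch of the usual pattern behind such a guarantee, assuming the client only holds a shared handle to the underlying RPC connection; the `RpcConnection` stand-in below is illustrative, not the crate's real `RpcClient`:

// Illustration: cloning a struct that only holds an `Arc` duplicates a pointer,
// not the connection itself, so the clone is cheap by construction.
use std::sync::Arc;

struct RpcConnection {
    endpoint: String,
}

#[derive(Clone)]
struct Client {
    // Cloning an `Arc` only bumps a reference count.
    connection: Arc<RpcConnection>,
}

fn main() {
    let client = Client {
        connection: Arc::new(RpcConnection { endpoint: "ws://127.0.0.1:9944".into() }),
    };
    let cloned = client.clone();
    // Both clones refer to the same underlying connection.
    assert!(Arc::ptr_eq(&client.connection, &cloned.connection));
    println!("both clients share {}", cloned.connection.endpoint);
}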
