diff --git a/bin/rialto/runtime/src/lib.rs b/bin/rialto/runtime/src/lib.rs index 77a23cbf8f4a..cdb09c6cb009 100644 --- a/bin/rialto/runtime/src/lib.rs +++ b/bin/rialto/runtime/src/lib.rs @@ -62,6 +62,7 @@ pub use frame_support::{ pub use pallet_balances::Call as BalancesCall; pub use pallet_bridge_currency_exchange::Call as BridgeCurrencyExchangeCall; pub use pallet_bridge_eth_poa::Call as BridgeEthPoACall; +pub use pallet_substrate_bridge::Call as BridgeMillauCall; pub use pallet_timestamp::Call as TimestampCall; #[cfg(any(feature = "std", test))] diff --git a/bin/rialto/runtime/src/millau.rs b/bin/rialto/runtime/src/millau.rs index 47568f41742a..57ca750e2406 100644 --- a/bin/rialto/runtime/src/millau.rs +++ b/bin/rialto/runtime/src/millau.rs @@ -36,7 +36,7 @@ pub fn initial_header() -> Header { Header { parent_hash: Default::default(), number: Default::default(), - state_root: hex!("bb65e8ba99408ebfefea9d28f74403d41da6858fa075c51fcc71dc383455c530").into(), + state_root: hex!("e901070e3bb061a6ae9ea8e4ba5417bf4c4642f9e75af9d372861c170ba7a9a3").into(), extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(), digest: Default::default(), } diff --git a/primitives/millau/src/lib.rs b/primitives/millau/src/lib.rs index 99bb0207f50d..844e4667c603 100644 --- a/primitives/millau/src/lib.rs +++ b/primitives/millau/src/lib.rs @@ -49,6 +49,13 @@ impl Chain for Millau { type Header = Header; } +/// Name of the `MillauHeaderApi::best_block` runtime method. +pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block"; +/// Name of the `MillauHeaderApi::is_known_block` runtime method. +pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block"; +/// Name of the `MillauHeaderApi::incomplete_headers` runtime method. +pub const INCOMPLETE_MILLAU_HEADERS_METHOD: &str = "MillauHeaderApi_incomplete_headers"; + sp_api::decl_runtime_apis! { /// API for querying information about Millau headers from the Bridge Pallet instance. /// diff --git a/relays/headers-relay/src/headers.rs b/relays/headers-relay/src/headers.rs index 5800d370fba0..b8c51eec1b49 100644 --- a/relays/headers-relay/src/headers.rs +++ b/relays/headers-relay/src/headers.rs @@ -32,6 +32,8 @@ use std::{ type HeadersQueue

= BTreeMap<

::Number, HashMap<

::Hash, QueuedHeader

>>; +type SyncedChildren

= + BTreeMap<

::Number, HashMap<

::Hash, HashSet>>>; type KnownHeaders

= BTreeMap<

::Number, HashMap<

::Hash, HeaderStatus>>; @@ -63,6 +65,9 @@ pub struct QueuedHeaders { /// Headers that are (we believe) currently submitted to target node by our, /// not-yet mined transactions. submitted: HeadersQueue

, + /// Synced headers childrens. We need it to support case when header is synced, but some of + /// its parents are incomplete. + synced_children: SyncedChildren

, /// Pointers to all headers that we ever seen and we believe we can touch in the future. known_headers: KnownHeaders

, /// Headers that are waiting for completion data from source node. Mapped (and auto-sorted @@ -96,6 +101,7 @@ impl Default for QueuedHeaders

{ ready: HeadersQueue::new(), incomplete: HeadersQueue::new(), submitted: HeadersQueue::new(), + synced_children: SyncedChildren::

::new(), known_headers: KnownHeaders::

::new(), incomplete_headers: LinkedHashMap::new(), completion_data: LinkedHashMap::new(), @@ -419,13 +425,17 @@ impl QueuedHeaders

{ self.header_synced(&new_incomplete_header); } - move_header_descendants::

( - &mut [&mut self.ready, &mut self.submitted], - &mut self.incomplete, - &mut self.known_headers, - HeaderStatus::Incomplete, - &new_incomplete_header, - ); + let move_origins = select_synced_children::

(&self.synced_children, &new_incomplete_header); + let move_origins = move_origins.into_iter().chain(std::iter::once(new_incomplete_header)); + for move_origin in move_origins { + move_header_descendants::

( + &mut [&mut self.ready, &mut self.submitted], + &mut self.incomplete, + &mut self.known_headers, + HeaderStatus::Incomplete, + &move_origin, + ); + } if make_header_incomplete { log::debug!( @@ -460,13 +470,20 @@ impl QueuedHeaders

{ .cloned() .collect::>(); for just_completed_header in just_completed_headers { - move_header_descendants::

( - &mut [&mut self.incomplete], - &mut self.ready, - &mut self.known_headers, - HeaderStatus::Ready, - &just_completed_header, - ); + // sub2eth rejects H if H.Parent is incomplete + // sub2sub allows 'syncing' headers like that + // => let's check if there are some synced children of just completed header + let move_origins = select_synced_children::

(&self.synced_children, &just_completed_header); + let move_origins = move_origins.into_iter().chain(std::iter::once(just_completed_header)); + for move_origin in move_origins { + move_header_descendants::

( + &mut [&mut self.incomplete], + &mut self.ready, + &mut self.known_headers, + HeaderStatus::Ready, + &move_origin, + ); + } log::debug!( target: "bridge", @@ -514,6 +531,7 @@ impl QueuedHeaders

{ prune_queue(&mut self.ready, prune_border); prune_queue(&mut self.submitted, prune_border); prune_queue(&mut self.incomplete, prune_border); + self.synced_children = self.synced_children.split_off(&prune_border); prune_known_headers::

(&mut self.known_headers, prune_border); self.prune_border = prune_border; } @@ -527,6 +545,7 @@ impl QueuedHeaders

{ self.ready.clear(); self.incomplete.clear(); self.submitted.clear(); + self.synced_children.clear(); self.known_headers.clear(); self.best_synced_number = Zero::zero(); self.prune_border = Zero::zero(); @@ -568,6 +587,7 @@ impl QueuedHeaders

{ // queues let mut current = *id; let mut id_processed = false; + let mut previous_current = None; loop { let header = match self.status(¤t) { HeaderStatus::Unknown => break, @@ -582,8 +602,42 @@ impl QueuedHeaders

{ } .expect("header has a given status; given queue has the header; qed"); + // remember ids of all the children of the current header + let synced_children_entry = self + .synced_children + .entry(current.0) + .or_default() + .entry(current.1) + .or_default(); + let all_queues = [ + &self.maybe_orphan, + &self.orphan, + &self.maybe_extra, + &self.extra, + &self.ready, + &self.incomplete, + &self.submitted, + ]; + for queue in &all_queues { + let children_from_queue = queue + .get(&(current.0 + One::one())) + .map(|potential_children| { + potential_children + .values() + .filter(|potential_child| potential_child.header().parent_id() == current) + .map(|child| child.id()) + .collect::>() + }) + .unwrap_or_default(); + synced_children_entry.extend(children_from_queue); + } + if let Some(previous_current) = previous_current { + synced_children_entry.insert(previous_current); + } + set_header_status::

(&mut self.known_headers, ¤t, HeaderStatus::Synced); + previous_current = Some(current); current = header.parent_id(); id_processed = true; } @@ -706,6 +760,35 @@ fn move_header_descendants( } } +/// Selects (recursive) all synced children of given header. +fn select_synced_children( + synced_children: &SyncedChildren

, + id: &HeaderIdOf

, +) -> Vec> { + let mut result = Vec::new(); + let mut current_parents = HashSet::new(); + current_parents.insert(*id); + + while !current_parents.is_empty() { + let mut next_parents = HashSet::new(); + for current_parent in ¤t_parents { + let current_parent_synced_children = synced_children + .get(¤t_parent.0) + .and_then(|by_number_entry| by_number_entry.get(¤t_parent.1)); + if let Some(current_parent_synced_children) = current_parent_synced_children { + for current_parent_synced_child in current_parent_synced_children { + result.push(*current_parent_synced_child); + next_parents.insert(*current_parent_synced_child); + } + } + } + + let _ = std::mem::replace(&mut current_parents, next_parents); + } + + result +} + /// Return oldest header from the queue. fn oldest_header(queue: &HeadersQueue

) -> Option<&QueuedHeader

> { queue.values().flat_map(|h| h.values()).next() @@ -1050,6 +1133,37 @@ pub(crate) mod tests { .known_headers .values() .all(|s| s.values().all(|s| *s == HeaderStatus::Synced))); + + // children of synced headers are stored + assert_eq!( + vec![id(97)], + queue.synced_children[&96][&hash(96)] + .iter() + .cloned() + .collect::>() + ); + assert_eq!( + vec![id(98)], + queue.synced_children[&97][&hash(97)] + .iter() + .cloned() + .collect::>() + ); + assert_eq!( + vec![id(99)], + queue.synced_children[&98][&hash(98)] + .iter() + .cloned() + .collect::>() + ); + assert_eq!( + vec![id(100)], + queue.synced_children[&99][&hash(99)] + .iter() + .cloned() + .collect::>() + ); + assert_eq!(0, queue.synced_children[&100][&hash(100)].len()); } #[test] @@ -1463,6 +1577,16 @@ pub(crate) mod tests { .or_default() .insert(hash(100), HeaderStatus::Ready); queue.ready.entry(100).or_default().insert(hash(100), header(100)); + queue + .synced_children + .entry(100) + .or_default() + .insert(hash(100), vec![id(101)].into_iter().collect()); + queue + .synced_children + .entry(102) + .or_default() + .insert(hash(102), vec![id(102)].into_iter().collect()); queue.prune(102); @@ -1472,6 +1596,7 @@ pub(crate) mod tests { assert_eq!(queue.orphan.len(), 1); assert_eq!(queue.maybe_orphan.len(), 1); assert_eq!(queue.incomplete.len(), 1); + assert_eq!(queue.synced_children.len(), 1); assert_eq!(queue.known_headers.len(), 4); queue.prune(110); @@ -1482,6 +1607,7 @@ pub(crate) mod tests { assert_eq!(queue.orphan.len(), 0); assert_eq!(queue.maybe_orphan.len(), 0); assert_eq!(queue.incomplete.len(), 0); + assert_eq!(queue.synced_children.len(), 0); assert_eq!(queue.known_headers.len(), 0); queue.header_response(header(109).header().clone()); @@ -1537,4 +1663,47 @@ pub(crate) mod tests { assert_eq!(queue.status(&id(103)), HeaderStatus::Incomplete); assert_eq!(queue.status(&id(104)), HeaderStatus::Incomplete); } + + #[test] + fn incomplete_headers_response_moves_synced_headers() { + let mut queue = QueuedHeaders::::default(); + + // we have submitted two headers - 100 and 101. 102 is ready + queue.submitted.entry(100).or_default().insert(hash(100), header(100)); + queue.submitted.entry(101).or_default().insert(hash(101), header(101)); + queue.ready.entry(102).or_default().insert(hash(102), header(102)); + queue + .known_headers + .entry(100) + .or_default() + .insert(hash(100), HeaderStatus::Submitted); + queue + .known_headers + .entry(101) + .or_default() + .insert(hash(101), HeaderStatus::Submitted); + queue + .known_headers + .entry(102) + .or_default() + .insert(hash(102), HeaderStatus::Ready); + + // both headers are accepted + queue.target_best_header_response(&id(101)); + + // but header 100 is incomplete + queue.incomplete_headers_response(vec![id(100)].into_iter().collect()); + assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(101)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete); + assert!(queue.incomplete_headers.contains_key(&id(100))); + assert!(queue.incomplete[&102].contains_key(&hash(102))); + + // when header 100 is completed, 101 is synced and 102 is ready + queue.incomplete_headers_response(HashSet::new()); + assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(101)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(102)), HeaderStatus::Ready); + assert!(queue.ready[&102].contains_key(&hash(102))); + } } diff --git a/relays/headers-relay/src/sync_loop.rs b/relays/headers-relay/src/sync_loop.rs index 14eb7e2a4140..c53a1ab0f082 100644 --- a/relays/headers-relay/src/sync_loop.rs +++ b/relays/headers-relay/src/sync_loop.rs @@ -277,7 +277,7 @@ pub fn run>( }, &mut target_go_offline_future, |delay| async_std::task::sleep(delay), - || format!("Error retrieving best known header from {} node", P::TARGET_NAME), + || format!("Error retrieving best known {} header from {} node", P::SOURCE_NAME, P::TARGET_NAME), ).is_ok(); }, incomplete_headers_ids = target_incomplete_headers_future => { diff --git a/relays/headers-relay/src/sync_types.rs b/relays/headers-relay/src/sync_types.rs index 0dcb712c9180..a910ce581c7e 100644 --- a/relays/headers-relay/src/sync_types.rs +++ b/relays/headers-relay/src/sync_types.rs @@ -43,7 +43,7 @@ pub enum HeaderStatus { } /// Headers synchronization pipeline. -pub trait HeadersSyncPipeline: Clone + Copy + Send + Sync { +pub trait HeadersSyncPipeline: Clone + Send + Sync { /// Name of the headers source. const SOURCE_NAME: &'static str; /// Name of the headers target. diff --git a/relays/millau-client/src/lib.rs b/relays/millau-client/src/lib.rs index 291954cbea9e..c1fadba024da 100644 --- a/relays/millau-client/src/lib.rs +++ b/relays/millau-client/src/lib.rs @@ -60,12 +60,18 @@ impl From for SyncHeader { } } +impl From for millau_runtime::Header { + fn from(header: SyncHeader) -> Self { + header.0 + } +} + impl SourceHeader for SyncHeader { fn id(&self) -> HeaderId { relay_utils::HeaderId(*self.number(), self.hash()) } fn parent_id(&self) -> HeaderId { - relay_utils::HeaderId(*self.number(), *self.parent_hash()) + relay_utils::HeaderId(*self.number() - 1, *self.parent_hash()) } } diff --git a/relays/rialto-client/src/lib.rs b/relays/rialto-client/src/lib.rs index b8601c3ccd0f..95382cf591d9 100644 --- a/relays/rialto-client/src/lib.rs +++ b/relays/rialto-client/src/lib.rs @@ -25,6 +25,8 @@ use sp_runtime::{ traits::{Header as HeaderT, IdentifyAccount}, }; +pub use rialto_runtime::BridgeMillauCall; + /// Rialto header id. pub type HeaderId = relay_utils::HeaderId; diff --git a/relays/substrate-client/src/client.rs b/relays/substrate-client/src/client.rs index ad6dad1a2a47..896b01477590 100644 --- a/relays/substrate-client/src/client.rs +++ b/relays/substrate-client/src/client.rs @@ -33,6 +33,9 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; pub type OpaqueGrandpaAuthoritiesSet = Vec; /// Substrate client type. +/// +/// Cloning Client is a cheap operation. +#[derive(Clone)] pub struct Client { /// Substrate RPC client. client: RpcClient, diff --git a/relays/substrate/Cargo.toml b/relays/substrate/Cargo.toml index c10923dff9ff..2e61699b7d93 100644 --- a/relays/substrate/Cargo.toml +++ b/relays/substrate/Cargo.toml @@ -16,6 +16,7 @@ structopt = "0.3" # Bridge dependencies +bp-millau = { path = "../../primitives/millau" } bp-rialto = { path = "../../primitives/rialto" } headers-relay = { path = "../headers-relay" } messages-relay = { path = "../messages-relay" } @@ -24,6 +25,7 @@ relay-rialto-client = { path = "../rialto-client" } relay-substrate-client = { path = "../substrate-client" } relay-utils = { path = "../utils" } -# Substrate dependencies +# Substrate Dependencies +sp-core = "2.0" sp-runtime = "2.0" diff --git a/relays/substrate/src/headers_target.rs b/relays/substrate/src/headers_target.rs new file mode 100644 index 000000000000..92bc017a72eb --- /dev/null +++ b/relays/substrate/src/headers_target.rs @@ -0,0 +1,168 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Substrate client as Substrate headers target. The chain we connect to should have +//! runtime that implements `HeaderApi` to allow bridging with +//! chain. + +use async_trait::async_trait; +use codec::{Decode, Encode}; +use futures::TryFutureExt; +use headers_relay::{ + sync_loop::TargetClient, + sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders}, +}; +use relay_substrate_client::{Chain, Client, Error as SubstrateError}; +use relay_utils::HeaderId; +use sp_core::Bytes; +use sp_runtime::{DeserializeOwned, Justification}; +use std::collections::HashSet; + +/// Headers sync pipeline for Substrate <-> Substrate relays. +#[async_trait] +pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline { + /// Name of the `best_block` runtime method. + const BEST_BLOCK_METHOD: &'static str; + /// Name of the `is_known_block` runtime method. + const IS_KNOWN_BLOCK_METHOD: &'static str; + /// Name of the `incomplete_headers` runtime method. + const INCOMPLETE_HEADERS_METHOD: &'static str; + + /// Signed transaction type. + type SignedTransaction: Send + Sync + Encode; + + /// Make submit header transaction. + async fn make_submit_header_transaction( + &self, + header: QueuedHeader, + ) -> Result; + + /// Make completion transaction for the header. + async fn make_complete_header_transaction( + &self, + id: HeaderIdOf, + completion: Justification, + ) -> Result; +} + +/// Substrate client as Substrate headers target. +pub struct SubstrateHeadersTarget { + client: Client, + pipeline: P, +} + +impl SubstrateHeadersTarget { + /// Create new Substrate headers target. + pub fn new(client: Client, pipeline: P) -> Self { + SubstrateHeadersTarget { client, pipeline } + } +} + +#[async_trait] +impl TargetClient

for SubstrateHeadersTarget +where + C: Chain, + C::Header: DeserializeOwned, + C::Index: DeserializeOwned, + P::Number: Decode, + P::Hash: Decode + Encode, + P: SubstrateHeadersSyncPipeline, +{ + type Error = SubstrateError; + + async fn best_header_id(&self) -> Result, Self::Error> { + let call = P::BEST_BLOCK_METHOD.into(); + let data = Bytes(Vec::new()); + + let encoded_response = self.client.state_call(call, data, None).await?; + let decoded_response: (P::Number, P::Hash) = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + + let best_header_id = HeaderId(decoded_response.0, decoded_response.1); + Ok(best_header_id) + } + + async fn is_known_header(&self, id: HeaderIdOf

) -> Result<(HeaderIdOf

, bool), Self::Error> { + let call = P::IS_KNOWN_BLOCK_METHOD.into(); + let data = Bytes(id.1.encode()); + + let encoded_response = self.client.state_call(call, data, None).await?; + let is_known_block: bool = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + + Ok((id, is_known_block)) + } + + async fn submit_headers(&self, mut headers: Vec>) -> SubmittedHeaders, Self::Error> { + debug_assert_eq!( + headers.len(), + 1, + "Substrate pallet only supports single header / transaction" + ); + + let header = headers.remove(0); + let id = header.id(); + let submit_transaction_result = self + .pipeline + .make_submit_header_transaction(header) + .and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode()))) + .await; + + match submit_transaction_result { + Ok(_) => SubmittedHeaders { + submitted: vec![id], + incomplete: Vec::new(), + rejected: Vec::new(), + fatal_error: None, + }, + Err(error) => SubmittedHeaders { + submitted: Vec::new(), + incomplete: Vec::new(), + rejected: vec![id], + fatal_error: Some(error), + }, + } + } + + async fn incomplete_headers_ids(&self) -> Result>, Self::Error> { + let call = P::INCOMPLETE_HEADERS_METHOD.into(); + let data = Bytes(Vec::new()); + + let encoded_response = self.client.state_call(call, data, None).await?; + let decoded_response: Vec<(P::Number, P::Hash)> = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + + let incomplete_headers = decoded_response + .into_iter() + .map(|(number, hash)| HeaderId(number, hash)) + .collect(); + Ok(incomplete_headers) + } + + async fn complete_header( + &self, + id: HeaderIdOf

, + completion: Justification, + ) -> Result, Self::Error> { + let tx = self.pipeline.make_complete_header_transaction(id, completion).await?; + self.client.submit_extrinsic(Bytes(tx.encode())).await?; + Ok(id) + } + + async fn requires_extra(&self, header: QueuedHeader

) -> Result<(HeaderIdOf

, bool), Self::Error> { + Ok((header.id(), false)) + } +} diff --git a/relays/substrate/src/main.rs b/relays/substrate/src/main.rs index 743827ab8d87..ad77a13eea9b 100644 --- a/relays/substrate/src/main.rs +++ b/relays/substrate/src/main.rs @@ -28,6 +28,7 @@ pub type MillauClient = relay_substrate_client::Client; mod cli; +mod headers_target; mod millau_headers_to_rialto; fn main() { diff --git a/relays/substrate/src/millau_headers_to_rialto.rs b/relays/substrate/src/millau_headers_to_rialto.rs index f7064c51bf11..67ac6d6b94e7 100644 --- a/relays/substrate/src/millau_headers_to_rialto.rs +++ b/relays/substrate/src/millau_headers_to_rialto.rs @@ -16,24 +16,33 @@ //! Millau-to-Rialto headers sync entrypoint. -use crate::{MillauClient, RialtoClient}; +use crate::{ + headers_target::{SubstrateHeadersSyncPipeline, SubstrateHeadersTarget}, + MillauClient, RialtoClient, +}; use async_trait::async_trait; +use bp_millau::{BEST_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD, IS_KNOWN_MILLAU_BLOCK_METHOD}; use codec::Encode; use headers_relay::{ sync::{HeadersSyncParams, TargetTransactionMode}, - sync_loop::TargetClient, - sync_types::{HeadersSyncPipeline, QueuedHeader, SubmittedHeaders}, + sync_types::{HeadersSyncPipeline, QueuedHeader}, }; use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SyncHeader as MillauSyncHeader}; -use relay_rialto_client::SigningParams as RialtoSigningParams; -use relay_substrate_client::{headers_source::HeadersSource, BlockNumberOf, Error as SubstrateError, HashOf}; +use relay_rialto_client::{BridgeMillauCall, Rialto, SigningParams as RialtoSigningParams}; +use relay_substrate_client::{ + headers_source::HeadersSource, BlockNumberOf, Error as SubstrateError, HashOf, TransactionSignScheme, +}; +use sp_core::Pair; use sp_runtime::Justification; -use std::{collections::HashSet, time::Duration}; +use std::time::Duration; /// Millau-to-Rialto headers pipeline. -#[derive(Debug, Clone, Copy)] -struct MillauHeadersToRialto; +#[derive(Debug, Clone)] +struct MillauHeadersToRialto { + client: RialtoClient, + sign: RialtoSigningParams, +} impl HeadersSyncPipeline for MillauHeadersToRialto { const SOURCE_NAME: &'static str = "Millau"; @@ -50,51 +59,46 @@ impl HeadersSyncPipeline for MillauHeadersToRialto { } } -/// Millau header in-the-queue. -type QueuedMillauHeader = QueuedHeader; - -/// Millau node as headers source. -type MillauSourceClient = HeadersSource; - -/// Rialto node as headers target. -struct RialtoTargetClient { - _client: RialtoClient, - _sign: RialtoSigningParams, -} - #[async_trait] -impl TargetClient for RialtoTargetClient { - type Error = SubstrateError; +impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto { + const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCK_METHOD; + const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD; + const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD; - async fn best_header_id(&self) -> Result { - unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209") - } + type SignedTransaction = ::SignedTransaction; - async fn is_known_header(&self, _id: MillauHeaderId) -> Result<(MillauHeaderId, bool), Self::Error> { - unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209") + async fn make_submit_header_transaction( + &self, + header: QueuedMillauHeader, + ) -> Result { + let account_id = self.sign.signer.public().as_array_ref().clone().into(); + let nonce = self.client.next_account_index(account_id).await?; + let call = BridgeMillauCall::import_signed_header(header.header().clone().into()).into(); + let transaction = Rialto::sign_transaction(&self.client, &self.sign.signer, nonce, call); + Ok(transaction) } - async fn submit_headers(&self, _headers: Vec) -> SubmittedHeaders { - unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209") + async fn make_complete_header_transaction( + &self, + id: MillauHeaderId, + completion: Justification, + ) -> Result { + let account_id = self.sign.signer.public().as_array_ref().clone().into(); + let nonce = self.client.next_account_index(account_id).await?; + let call = BridgeMillauCall::finalize_header(id.1, completion).into(); + let transaction = Rialto::sign_transaction(&self.client, &self.sign.signer, nonce, call); + Ok(transaction) } +} - async fn incomplete_headers_ids(&self) -> Result, Self::Error> { - unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209") - } +/// Millau header in-the-queue. +type QueuedMillauHeader = QueuedHeader; - #[allow(clippy::unit_arg)] - async fn complete_header( - &self, - _id: MillauHeaderId, - _completion: Justification, - ) -> Result { - unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209") - } +/// Millau node as headers source. +type MillauSourceClient = HeadersSource; - async fn requires_extra(&self, _header: QueuedMillauHeader) -> Result<(MillauHeaderId, bool), Self::Error> { - unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209") - } -} +/// Rialto node as headers target. +type RialtoTargetClient = SubstrateHeadersTarget; /// Run Millau-to-Rialto headers sync. pub fn run( @@ -107,8 +111,8 @@ pub fn run( let rialto_tick = Duration::from_secs(5); let sync_params = HeadersSyncParams { max_future_headers_to_download: 32, - max_headers_in_submitted_status: 1024, - max_headers_in_single_submit: 8, + max_headers_in_submitted_status: 8, + max_headers_in_single_submit: 1, max_headers_size_in_single_submit: 1024 * 1024, prune_depth: 256, target_tx_mode: TargetTransactionMode::Signed, @@ -117,10 +121,13 @@ pub fn run( headers_relay::sync_loop::run( MillauSourceClient::new(millau_client), millau_tick, - RialtoTargetClient { - _client: rialto_client, - _sign: rialto_sign, - }, + RialtoTargetClient::new( + rialto_client.clone(), + MillauHeadersToRialto { + client: rialto_client, + sign: rialto_sign, + }, + ), rialto_tick, sync_params, metrics_params,