Skip to content

Commit

Permalink
[base-node] Optimise pruned UTXO sync streaming protocol
Browse files Browse the repository at this point in the history
Streams a pruned UTXO set to a client. Each block boundary (given by the
header) is punctuated by a deleted MMR difference bitmap.

Replaces the previous sync_rpc method, which has been removed and so,
this PR is not backward-compatible.
  • Loading branch information
sdbondi committed Apr 19, 2021
1 parent 12b44f5 commit fec72ad
Show file tree
Hide file tree
Showing 17 changed files with 481 additions and 441 deletions.
19 changes: 13 additions & 6 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,26 @@ impl CommandHandler {
let blockchain = self.blockchain_db.clone();
self.executor.spawn(async move {
match blockchain.fetch_blocks(height..=height).await {
Err(err) => {
println!("Failed to retrieve blocks: {}", err);
warn!(target: LOG_TARGET, "{}", err);
return;
},
Ok(mut data) => match (data.pop(), format) {
(Some(block), Format::Text) => println!("{}", block),
(Some(block), Format::Text) => {
let block_data =
try_or_print!(blockchain.fetch_block_accumulated_data(block.hash().clone()).await);

println!("{}", block);
println!("-- Accumulated data --");
println!("{}", block_data);
},
(Some(block), Format::Json) => println!(
"{}",
block.to_json().unwrap_or_else(|_| "Error deserializing block".into())
),
(None, _) => println!("Block not found at height {}", height),
},
Err(err) => {
println!("Failed to retrieve blocks: {}", err);
warn!(target: LOG_TARGET, "{}", err);
return;
},
};
});
}
Expand Down
28 changes: 3 additions & 25 deletions base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,15 @@ message SyncUtxosRequest {
bool include_pruned_utxos = 3;
bool include_deleted_bitmaps = 4;
}

message SyncUtxosResponse {
repeated SyncUtxo utxos = 1;
// present if a utxo in utxos is the last in a block so that the merkle root can be
// checked
repeated bytes deleted_bitmaps = 2;
}

message SyncUtxo {
// The output. optional, if deleted at the time of the requested height,
// will be empty and `hash` and `rangeproof_hash` will be populated instead
tari.types.TransactionOutput output = 1;
// Only present if `output` is empty
bytes hash = 2;
// Only present if `output` is empty
bytes rangeproof_hash = 3;
}

message SyncUtxos2Response {
oneof utxo_or_deleted {
SyncUtxo2 utxo = 1;
Bitmaps deleted_bitmaps = 2;
SyncUtxo utxo = 1;
bytes deleted_diff = 2;
}
uint64 mmr_index = 3;
}

message Bitmaps {
repeated bytes bitmaps = 1;
}

message SyncUtxo2 {
message SyncUtxo {
oneof utxo {
// The unspent transaction output
tari.types.TransactionOutput output = 1;
Expand Down
10 changes: 5 additions & 5 deletions base_layer/core/src/base_node/proto/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ impl From<Block> for proto::BlockBodyResponse {
}
}

impl From<PrunedOutput> for proto::SyncUtxo2 {
impl From<PrunedOutput> for proto::SyncUtxo {
fn from(output: PrunedOutput) -> Self {
match output {
PrunedOutput::Pruned {
output_hash,
range_proof_hash,
} => proto::SyncUtxo2 {
utxo: Some(proto::sync_utxo2::Utxo::PrunedOutput(proto::PrunedOutput {
} => proto::SyncUtxo {
utxo: Some(proto::sync_utxo::Utxo::PrunedOutput(proto::PrunedOutput {
hash: output_hash,
rangeproof_hash: range_proof_hash,
})),
},
PrunedOutput::NotPruned { output } => proto::SyncUtxo2 {
utxo: Some(proto::sync_utxo2::Utxo::Output(output.into())),
PrunedOutput::NotPruned { output } => proto::SyncUtxo {
utxo: Some(proto::sync_utxo::Utxo::Output(output.into())),
},
}
}
Expand Down
18 changes: 9 additions & 9 deletions base_layer/core/src/base_node/proto/wallet_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,27 +228,27 @@ impl TryFrom<proto::TxQueryBatchResponse> for TxQueryBatchResponse {
}
}

impl proto::SyncUtxos2Response {
pub fn into_utxo(self) -> Option<proto::SyncUtxo2> {
use proto::sync_utxos2_response::UtxoOrDeleted::*;
impl proto::SyncUtxosResponse {
pub fn into_utxo(self) -> Option<proto::SyncUtxo> {
use proto::sync_utxos_response::UtxoOrDeleted::*;
match self.utxo_or_deleted? {
Utxo(utxo) => Some(utxo),
DeletedBitmaps(_) => None,
DeletedDiff(_) => None,
}
}

pub fn into_bitmaps(self) -> Option<proto::Bitmaps> {
use proto::sync_utxos2_response::UtxoOrDeleted::*;
pub fn into_bitmap(self) -> Option<Vec<u8>> {
use proto::sync_utxos_response::UtxoOrDeleted::*;
match self.utxo_or_deleted? {
Utxo(_) => None,
DeletedBitmaps(bitmaps) => Some(bitmaps),
DeletedDiff(bitmap) => Some(bitmap),
}
}
}

impl proto::sync_utxo2::Utxo {
impl proto::sync_utxo::Utxo {
pub fn into_transaction_output(self) -> Option<types::TransactionOutput> {
use proto::sync_utxo2::Utxo::*;
use proto::sync_utxo::Utxo::*;
match self {
Output(output) => Some(output),
PrunedOutput(_) => None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
}

/// This function will publish the current StatusInfo to the channel
pub fn publish_event_info(&mut self) {
pub fn publish_event_info(&self) {
let status = StatusInfo {
bootstrapped: self.is_bootstrapped(),
state_info: self.info.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
transactions::transaction::TransactionError,
validation::ValidationError,
};
use std::num::TryFromIntError;
use tari_comms::protocol::rpc::{RpcError, RpcStatus};
use tari_mmr::error::MerkleMountainRangeError;
use thiserror::Error;
Expand All @@ -47,7 +48,7 @@ pub enum HorizonSyncError {
JoinError(#[from] task::JoinError),
#[error("Invalid kernel signature: {0}")]
InvalidKernelSignature(TransactionError),
#[error("MMR did not match for {mmr_tree} at height {at_height}. {expected_hex} did not equal {actual_hex}")]
#[error("MMR did not match for {mmr_tree} at height {at_height}. Expected {actual_hex} to equal {expected_hex}")]
InvalidMmrRoot {
mmr_tree: MmrTree,
at_height: u64,
Expand All @@ -67,3 +68,9 @@ pub enum HorizonSyncError {
#[error("MerkleMountainRangeError: {0}")]
MerkleMountainRangeError(#[from] MerkleMountainRangeError),
}

impl From<TryFromIntError> for HorizonSyncError {
fn from(err: TryFromIntError) -> Self {
HorizonSyncError::ConversionError(err.to_string())
}
}
Loading

0 comments on commit fec72ad

Please sign in to comment.