From fec72ad38eb6822d13ec46c91796979bbebb41e1 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Thu, 15 Apr 2021 17:26:06 +0400 Subject: [PATCH] [base-node] Optimise pruned UTXO sync streaming protocol Streams a pruned UTXO set to a client. Each block boundary (given by the header) is punctuated by a deleted MMR difference bitmap. Replaces the previous sync_rpc method, which has been removed and so, this PR is not backward-compatible. --- .../tari_base_node/src/command_handler.rs | 19 +- base_layer/core/src/base_node/proto/rpc.proto | 28 +- base_layer/core/src/base_node/proto/rpc.rs | 10 +- .../core/src/base_node/proto/wallet_rpc.rs | 18 +- .../state_machine_service/state_machine.rs | 2 +- .../states/horizon_state_sync/error.rs | 9 +- .../horizon_state_synchronization.rs | 236 +++++++----- base_layer/core/src/base_node/sync/rpc/mod.rs | 9 +- .../core/src/base_node/sync/rpc/service.rs | 342 ++++++++---------- .../src/chain_storage/accumulated_data.rs | 20 +- base_layer/core/src/chain_storage/async_db.rs | 2 +- .../src/chain_storage/blockchain_backend.rs | 2 +- .../src/chain_storage/blockchain_database.rs | 2 +- .../core/src/chain_storage/lmdb_db/lmdb_db.rs | 174 ++++----- .../core/src/test_helpers/blockchain.rs | 2 +- .../wallet/src/tasks/wallet_recovery.rs | 41 +-- comms/src/protocol/rpc/status.rs | 6 +- 17 files changed, 481 insertions(+), 441 deletions(-) diff --git a/applications/tari_base_node/src/command_handler.rs b/applications/tari_base_node/src/command_handler.rs index 6ccdce838d..9dcb4f1924 100644 --- a/applications/tari_base_node/src/command_handler.rs +++ b/applications/tari_base_node/src/command_handler.rs @@ -214,19 +214,26 @@ impl CommandHandler { let blockchain = self.blockchain_db.clone(); self.executor.spawn(async move { match blockchain.fetch_blocks(height..=height).await { - Err(err) => { - println!("Failed to retrieve blocks: {}", err); - warn!(target: LOG_TARGET, "{}", err); - return; - }, Ok(mut data) => match (data.pop(), format) { - (Some(block), Format::Text) => println!("{}", block), + (Some(block), Format::Text) => { + let block_data = + try_or_print!(blockchain.fetch_block_accumulated_data(block.hash().clone()).await); + + println!("{}", block); + println!("-- Accumulated data --"); + println!("{}", block_data); + }, (Some(block), Format::Json) => println!( "{}", block.to_json().unwrap_or_else(|_| "Error deserializing block".into()) ), (None, _) => println!("Block not found at height {}", height), }, + Err(err) => { + println!("Failed to retrieve blocks: {}", err); + warn!(target: LOG_TARGET, "{}", err); + return; + }, }; }); } diff --git a/base_layer/core/src/base_node/proto/rpc.proto b/base_layer/core/src/base_node/proto/rpc.proto index 37bdf14301..66a44247bb 100644 --- a/base_layer/core/src/base_node/proto/rpc.proto +++ b/base_layer/core/src/base_node/proto/rpc.proto @@ -55,37 +55,15 @@ message SyncUtxosRequest { bool include_pruned_utxos = 3; bool include_deleted_bitmaps = 4; } - message SyncUtxosResponse { - repeated SyncUtxo utxos = 1; - // present if a utxo in utxos is the last in a block so that the merkle root can be - // checked - repeated bytes deleted_bitmaps = 2; -} - -message SyncUtxo { - // The output. optional, if deleted at the time of the requested height, - // will be empty and `hash` and `rangeproof_hash` will be populated instead - tari.types.TransactionOutput output = 1; - // Only present if `output` is empty - bytes hash = 2; - // Only present if `output` is empty - bytes rangeproof_hash = 3; -} - -message SyncUtxos2Response { oneof utxo_or_deleted { - SyncUtxo2 utxo = 1; - Bitmaps deleted_bitmaps = 2; + SyncUtxo utxo = 1; + bytes deleted_diff = 2; } uint64 mmr_index = 3; } -message Bitmaps { - repeated bytes bitmaps = 1; -} - -message SyncUtxo2 { +message SyncUtxo { oneof utxo { // The unspent transaction output tari.types.TransactionOutput output = 1; diff --git a/base_layer/core/src/base_node/proto/rpc.rs b/base_layer/core/src/base_node/proto/rpc.rs index 8f12ce8f74..ccd3db5b73 100644 --- a/base_layer/core/src/base_node/proto/rpc.rs +++ b/base_layer/core/src/base_node/proto/rpc.rs @@ -31,20 +31,20 @@ impl From for proto::BlockBodyResponse { } } -impl From for proto::SyncUtxo2 { +impl From for proto::SyncUtxo { fn from(output: PrunedOutput) -> Self { match output { PrunedOutput::Pruned { output_hash, range_proof_hash, - } => proto::SyncUtxo2 { - utxo: Some(proto::sync_utxo2::Utxo::PrunedOutput(proto::PrunedOutput { + } => proto::SyncUtxo { + utxo: Some(proto::sync_utxo::Utxo::PrunedOutput(proto::PrunedOutput { hash: output_hash, rangeproof_hash: range_proof_hash, })), }, - PrunedOutput::NotPruned { output } => proto::SyncUtxo2 { - utxo: Some(proto::sync_utxo2::Utxo::Output(output.into())), + PrunedOutput::NotPruned { output } => proto::SyncUtxo { + utxo: Some(proto::sync_utxo::Utxo::Output(output.into())), }, } } diff --git a/base_layer/core/src/base_node/proto/wallet_rpc.rs b/base_layer/core/src/base_node/proto/wallet_rpc.rs index 17fe5ccad0..64778b0b9a 100644 --- a/base_layer/core/src/base_node/proto/wallet_rpc.rs +++ b/base_layer/core/src/base_node/proto/wallet_rpc.rs @@ -228,27 +228,27 @@ impl TryFrom for TxQueryBatchResponse { } } -impl proto::SyncUtxos2Response { - pub fn into_utxo(self) -> Option { - use proto::sync_utxos2_response::UtxoOrDeleted::*; +impl proto::SyncUtxosResponse { + pub fn into_utxo(self) -> Option { + use proto::sync_utxos_response::UtxoOrDeleted::*; match self.utxo_or_deleted? { Utxo(utxo) => Some(utxo), - DeletedBitmaps(_) => None, + DeletedDiff(_) => None, } } - pub fn into_bitmaps(self) -> Option { - use proto::sync_utxos2_response::UtxoOrDeleted::*; + pub fn into_bitmap(self) -> Option> { + use proto::sync_utxos_response::UtxoOrDeleted::*; match self.utxo_or_deleted? { Utxo(_) => None, - DeletedBitmaps(bitmaps) => Some(bitmaps), + DeletedDiff(bitmap) => Some(bitmap), } } } -impl proto::sync_utxo2::Utxo { +impl proto::sync_utxo::Utxo { pub fn into_transaction_output(self) -> Option { - use proto::sync_utxo2::Utxo::*; + use proto::sync_utxo::Utxo::*; match self { Output(output) => Some(output), PrunedOutput(_) => None, diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index a5513463c4..0254e98d97 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -153,7 +153,7 @@ impl BaseNodeStateMachine { } /// This function will publish the current StatusInfo to the channel - pub fn publish_event_info(&mut self) { + pub fn publish_event_info(&self) { let status = StatusInfo { bootstrapped: self.is_bootstrapped(), state_info: self.info.clone(), diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs index d5714925e2..4669bdce09 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs @@ -26,6 +26,7 @@ use crate::{ transactions::transaction::TransactionError, validation::ValidationError, }; +use std::num::TryFromIntError; use tari_comms::protocol::rpc::{RpcError, RpcStatus}; use tari_mmr::error::MerkleMountainRangeError; use thiserror::Error; @@ -47,7 +48,7 @@ pub enum HorizonSyncError { JoinError(#[from] task::JoinError), #[error("Invalid kernel signature: {0}")] InvalidKernelSignature(TransactionError), - #[error("MMR did not match for {mmr_tree} at height {at_height}. {expected_hex} did not equal {actual_hex}")] + #[error("MMR did not match for {mmr_tree} at height {at_height}. Expected {actual_hex} to equal {expected_hex}")] InvalidMmrRoot { mmr_tree: MmrTree, at_height: u64, @@ -67,3 +68,9 @@ pub enum HorizonSyncError { #[error("MerkleMountainRangeError: {0}")] MerkleMountainRangeError(#[from] MerkleMountainRangeError), } + +impl From for HorizonSyncError { + fn from(err: TryFromIntError) -> Self { + HorizonSyncError::ConversionError(err.to_string()) + } +} diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs index d329bd3404..6a2e7f5a97 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs @@ -31,16 +31,23 @@ use crate::{ }, blocks::BlockHeader, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree, PrunedOutput}, - proto::base_node::{SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse}, + proto::base_node::{ + sync_utxo as proto_sync_utxo, + sync_utxos_response::UtxoOrDeleted, + SyncKernelsRequest, + SyncUtxo, + SyncUtxosRequest, + SyncUtxosResponse, + }, transactions::{ transaction::{TransactionKernel, TransactionOutput}, - types::{HashDigest, HashOutput, RangeProofService}, + types::{HashDigest, RangeProofService}, }, }; use croaring::Bitmap; use futures::StreamExt; use log::*; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; use tari_comms::PeerConnection; use tari_crypto::{ commitment::HomomorphicCommitment, @@ -90,7 +97,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } })?; - match self.begin_sync(&header).await { + let mut client = self.sync_peer.connect_rpc::().await?; + + match self.begin_sync(&mut client, &header).await { Ok(_) => match self.finalize_horizon_sync().await { Ok(_) => Ok(()), Err(err) => { @@ -105,15 +114,25 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } } - async fn begin_sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> { + async fn begin_sync( + &mut self, + client: &mut rpc::BaseNodeSyncRpcClient, + to_header: &BlockHeader, + ) -> Result<(), HorizonSyncError> + { debug!(target: LOG_TARGET, "Synchronizing kernels"); - self.synchronize_kernels(to_header).await?; + self.synchronize_kernels(client, to_header).await?; debug!(target: LOG_TARGET, "Synchronizing outputs"); - self.synchronize_outputs(to_header).await?; + self.synchronize_outputs(client, to_header).await?; Ok(()) } - async fn synchronize_kernels(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> { + async fn synchronize_kernels( + &mut self, + client: &mut rpc::BaseNodeSyncRpcClient, + to_header: &BlockHeader, + ) -> Result<(), HorizonSyncError> + { let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?; let remote_num_kernels = to_header.kernel_mmr_size; @@ -138,12 +157,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { remote_num_kernels - local_num_kernels, ); - self.sync_kernel_nodes(local_num_kernels, remote_num_kernels, to_header.hash()) - .await - } - - async fn sync_kernel_nodes(&mut self, start: u64, end: u64, end_hash: HashOutput) -> Result<(), HorizonSyncError> { - let mut client = self.sync_peer.connect_rpc::().await?; let latency = client.get_last_request_latency().await?; debug!( target: LOG_TARGET, @@ -153,21 +166,25 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { latency.unwrap_or_default().as_millis() ); + let start = local_num_kernels; + let end = remote_num_kernels; + let end_hash = to_header.hash(); + let req = SyncKernelsRequest { start, end_header_hash: end_hash, }; let mut kernel_stream = client.sync_kernels(req).await?; - let mut current_header = self.shared.db.fetch_header_containing_kernel_mmr(start + 1).await?; + let mut current_header = self.db().fetch_header_containing_kernel_mmr(start + 1).await?; debug!( target: LOG_TARGET, - "Found current header in progress for kernels at mmr pos: {} height: {}", + "Found header for kernels at mmr pos: {} height: {}", start, current_header.height() ); let mut kernels = vec![]; - let db = self.shared.db.clone(); + let db = self.db().clone(); let mut txn = db.write_transaction(); let mut mmr_position = start; while let Some(kernel) = kernel_stream.next().await { @@ -181,14 +198,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { if mmr_position == current_header.header.kernel_mmr_size - 1 { debug!( target: LOG_TARGET, - "Checking header {}, added {} kernels", + "Header #{} ({} kernels)", current_header.header.height, kernels.len() ); // Validate root - let block_data = self - .shared - .db + let block_data = db .fetch_block_accumulated_data(current_header.header.prev_hash.clone()) .await?; let kernel_pruned_set = block_data.dissolve().0; @@ -218,7 +233,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { txn.commit().await?; if mmr_position < end - 1 { - current_header = self.shared.db.fetch_chain_header(current_header.height() + 1).await?; + current_header = db.fetch_chain_header(current_header.height() + 1).await?; } } mmr_position += 1; @@ -240,7 +255,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { Ok(()) } - async fn synchronize_outputs(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> { + async fn synchronize_outputs( + &mut self, + client: &mut rpc::BaseNodeSyncRpcClient, + to_header: &BlockHeader, + ) -> Result<(), HorizonSyncError> + { let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?; let remote_num_outputs = to_header.output_mmr_size; @@ -265,12 +285,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { remote_num_outputs - local_num_outputs, ); - self.sync_output_nodes(local_num_outputs, remote_num_outputs, to_header.hash()) - .await - } + let start = local_num_outputs; + let end = remote_num_outputs; + let end_hash = to_header.hash(); - async fn sync_output_nodes(&mut self, start: u64, end: u64, end_hash: HashOutput) -> Result<(), HorizonSyncError> { - let mut client = self.sync_peer.connect_rpc::().await?; let latency = client.get_last_request_latency().await?; debug!( target: LOG_TARGET, @@ -287,39 +305,78 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { }; let mut output_stream = client.sync_utxos(req).await?; - let mut current_header = self.shared.db.fetch_header_containing_utxo_mmr(start + 1).await?; + let mut current_header = self.db().fetch_header_containing_utxo_mmr(start + 1).await?; debug!( target: LOG_TARGET, - "Found current header in progress for utxos at mmr pos: {} height:{}", - start, + "Found header for utxos at mmr pos: {} - {} height: {}", + start + 1, + current_header.header.output_mmr_size, current_header.height() ); + + let db = self.db().clone(); + let mut output_hashes = vec![]; let mut rp_hashes = vec![]; - let db = self.shared.db.clone(); let mut txn = db.write_transaction(); let mut unpruned_outputs = vec![]; let mut mmr_position = start; - let mut height_utxo_counter = 0; - let mut height_txo_counter = 0; + let mut height_utxo_counter = 0u64; + let mut height_txo_counter = 0u64; + + let block_data = db + .fetch_block_accumulated_data(current_header.header.prev_hash.clone()) + .await?; + let (_, output_pruned_set, rp_pruned_set, mut deleted) = block_data.dissolve(); + + let mut output_mmr = MerkleMountainRange::::new(output_pruned_set); + let mut proof_mmr = MerkleMountainRange::::new(rp_pruned_set); + while let Some(response) = output_stream.next().await { let res: SyncUtxosResponse = response?; - debug!( - target: LOG_TARGET, - "UTXOs response received from sync peer: ({} outputs, {} deleted bitmaps)", - res.utxos.len(), - res.deleted_bitmaps.len() - ); - let (utxos, mut deleted_bitmaps) = (res.utxos, res.deleted_bitmaps.into_iter()); - for utxo in utxos { - if let Some(output) = utxo.output { + + if res.mmr_index > 0 && res.mmr_index != mmr_position { + return Err(HorizonSyncError::IncorrectResponse(format!( + "Expected MMR position of {} but got {}", + mmr_position, res.mmr_index, + ))); + } + + let txo = res + .utxo_or_deleted + .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?; + + match txo { + UtxoOrDeleted::Utxo(SyncUtxo { + utxo: Some(proto_sync_utxo::Utxo::Output(output)), + }) => { + trace!( + target: LOG_TARGET, + "UTXO {} received from sync peer for header #{}", + res.mmr_index, + current_header.height() + ); height_utxo_counter += 1; - let output: TransactionOutput = output.try_into().map_err(HorizonSyncError::ConversionError)?; + let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?; output_hashes.push(output.hash()); rp_hashes.push(output.proof().hash()); unpruned_outputs.push(output.clone()); - txn.insert_output_via_horizon_sync(output, current_header.hash().clone(), mmr_position as u32); - } else { + txn.insert_output_via_horizon_sync( + output, + current_header.hash().clone(), + u32::try_from(mmr_position)?, + ); + mmr_position += 1; + }, + UtxoOrDeleted::Utxo(SyncUtxo { + utxo: Some(proto_sync_utxo::Utxo::PrunedOutput(utxo)), + }) => { + trace!( + target: LOG_TARGET, + "UTXO {} (pruned) received from sync peer for header #{}", + res.mmr_index, + current_header.height() + ); height_txo_counter += 1; output_hashes.push(utxo.hash.clone()); rp_hashes.push(utxo.rangeproof_hash.clone()); @@ -327,31 +384,31 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { utxo.hash, utxo.rangeproof_hash, current_header.hash().clone(), - mmr_position as u32, + u32::try_from(mmr_position)?, ); - } + mmr_position += 1; + }, + UtxoOrDeleted::DeletedDiff(diff_bitmap) => { + if mmr_position != current_header.header.output_mmr_size { + return Err(HorizonSyncError::IncorrectResponse(format!( + "Peer unexpectedly sent a deleted bitmap. Expected at MMR index {} but it was sent at {}", + current_header.header.output_mmr_size, mmr_position + ))); + } - if mmr_position == current_header.header.output_mmr_size - 1 { - trace!( + debug!( target: LOG_TARGET, - "Checking header {}, added {} utxos, added {} txos)", + "UTXO: {} (Header #{}), added {} utxos, added {} txos", + mmr_position, current_header.header.height, height_utxo_counter, height_txo_counter ); + height_txo_counter = 0; height_utxo_counter = 0; - // Validate root - let block_data = self - .shared - .db - .fetch_block_accumulated_data(current_header.header.prev_hash.clone()) - .await?; - - let (_, output_pruned_set, rp_pruned_set, deleted) = block_data.dissolve(); - let mut output_mmr = MerkleMountainRange::::new(output_pruned_set); - let mut proof_mmr = MerkleMountainRange::::new(rp_pruned_set); + // Validate root for hash in output_hashes.drain(..) { output_mmr.push(hash)?; } @@ -360,18 +417,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { proof_mmr.push(hash)?; } - let deleted_diff = deleted_bitmaps.next(); - if deleted_diff.is_none() { - return Err(HorizonSyncError::IncorrectResponse(format!( - "No deleted bitmap was provided for the header at height:{}", - current_header.height() - ))); - } + // Add in the changes + let bitmap = Bitmap::deserialize(&diff_bitmap); + deleted.or_inplace(&bitmap); + deleted.run_optimize(); - let bitmap = Bitmap::deserialize(&deleted_diff.unwrap()); - let deleted = deleted.or(&bitmap); let pruned_output_set = output_mmr.get_pruned_hash_set()?; - let output_mmr = MutableMmr::::new(pruned_output_set.clone(), deleted)?; + let output_mmr = MutableMmr::::new(pruned_output_set.clone(), deleted.clone())?; let mmr_root = output_mmr.get_merkle_root()?; if mmr_root != current_header.header.output_mr { @@ -382,6 +434,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { actual_hex: mmr_root.to_hex(), }); } + let mmr_root = proof_mmr.get_merkle_root()?; if mmr_root != current_header.header.range_proof_mr { return Err(HorizonSyncError::InvalidMmrRoot { @@ -391,6 +444,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { actual_hex: mmr_root.to_hex(), }); } + // Validate rangeproofs if the MMR matches for o in unpruned_outputs.drain(..) { o.verify_range_proof(self.prover) @@ -406,21 +460,31 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { txn.update_deleted_with_diff(current_header.hash().clone(), output_mmr.deleted().clone()); txn.commit().await?; - if mmr_position < end - 1 { - current_header = self.shared.db.fetch_chain_header(current_header.height() + 1).await?; - } - } - mmr_position += 1; - if mmr_position % 100 == 0 || mmr_position == self.num_outputs { - let info = HorizonSyncInfo::new( - vec![self.sync_peer.peer_node_id().clone()], - HorizonSyncStatus::Outputs(mmr_position, self.num_outputs), + current_header = db.fetch_chain_header(current_header.height() + 1).await?; + debug!( + target: LOG_TARGET, + "Expecting to receive the next UTXO set for header #{}", + current_header.height() ); - self.shared.set_state_info(StateInfo::HorizonSync(info)); - } + }, + v => { + error!(target: LOG_TARGET, "Remote node returned an invalid response {:?}", v); + return Err(HorizonSyncError::IncorrectResponse( + "Invalid sync utxo returned".to_string(), + )); + }, + } + + if mmr_position % 100 == 0 || mmr_position == self.num_outputs { + let info = HorizonSyncInfo::new( + vec![self.sync_peer.peer_node_id().clone()], + HorizonSyncStatus::Outputs(mmr_position, self.num_outputs), + ); + self.shared.set_state_info(StateInfo::HorizonSync(info)); } } + if mmr_position != end { return Err(HorizonSyncError::IncorrectResponse( "Sync node did not send all utxos requested".to_string(), @@ -440,7 +504,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { ); self.shared.set_state_info(StateInfo::HorizonSync(info)); - let header = self.shared.db.fetch_chain_header(self.horizon_sync_height).await?; + let header = self.db().fetch_chain_header(self.horizon_sync_height).await?; let mut pruned_utxo_sum = HomomorphicCommitment::default(); let mut pruned_kernel_sum = HomomorphicCommitment::default(); @@ -457,7 +521,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { prev_mmr, curr_header.header.output_mmr_size - 1 ); - let utxos = self + let (utxos, _) = self .db() .fetch_utxos_by_mmr_position(prev_mmr, curr_header.header.output_mmr_size - 1, header.hash().clone()) .await?; @@ -476,9 +540,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let mut utxo_sum = HomomorphicCommitment::default(); debug!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len()); - debug!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.0.len()); + debug!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len()); let mut prune_counter = 0; - for u in utxos.0 { + for u in utxos { match u { PrunedOutput::NotPruned { output } => { utxo_sum = &output.commitment + &utxo_sum; diff --git a/base_layer/core/src/base_node/sync/rpc/mod.rs b/base_layer/core/src/base_node/sync/rpc/mod.rs index b2d31d8dd5..4762fc8afc 100644 --- a/base_layer/core/src/base_node/sync/rpc/mod.rs +++ b/base_layer/core/src/base_node/sync/rpc/mod.rs @@ -25,6 +25,8 @@ mod service; #[cfg(feature = "base_node")] pub use service::BaseNodeSyncRpcService; +// mod sync_utxos; + // TODO: Tests need to be rewritten // #[cfg(test)] // mod tests; @@ -40,7 +42,6 @@ use crate::{ SyncBlocksRequest, SyncHeadersRequest, SyncKernelsRequest, - SyncUtxos2Response, SyncUtxosRequest, SyncUtxosResponse, }, @@ -86,12 +87,8 @@ pub trait BaseNodeSyncService: Send + Sync + 'static { request: Request, ) -> Result, RpcStatus>; - #[rpc(method = 7)] - async fn sync_utxos(&self, request: Request) -> Result, RpcStatus>; - #[rpc(method = 8)] - async fn sync_utxos2(&self, request: Request) - -> Result, RpcStatus>; + async fn sync_utxos(&self, request: Request) -> Result, RpcStatus>; } #[cfg(feature = "base_node")] diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index 9f1c9267bb..3f6612ad6d 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -22,7 +22,8 @@ use crate::{ base_node::sync::rpc::BaseNodeSyncService, - chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, OrNotFound, PrunedOutput}, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, OrNotFound}, + crypto::tari_utilities::Hashable, iterators::NonOverlappingIntegerPairIter, proto, proto::base_node::{ @@ -32,17 +33,16 @@ use crate::{ SyncHeadersRequest, SyncKernelsRequest, SyncUtxo, - SyncUtxos2Response, SyncUtxosRequest, SyncUtxosResponse, }, }; use futures::{channel::mpsc, stream, SinkExt}; use log::*; -use std::cmp; +use std::{cmp, time::Instant}; use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_crypto::tari_utilities::hex::Hex; -use tokio::{task, time::Instant}; +use tokio::task; const LOG_TARGET: &str = "c::base_node::sync_rpc"; @@ -409,218 +409,184 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } async fn sync_utxos(&self, request: Request) -> Result, RpcStatus> { - let peer = request.context().peer_node_id().clone(); - let req = request.into_message(); - const UTXOS_PER_BATCH: usize = 100; - const BATCH_SIZE: usize = 100; - let (mut tx, rx) = mpsc::channel(BATCH_SIZE); - let db = self.db(); + let req = request.message(); + let peer = request.context().peer_node_id(); + debug!( + target: LOG_TARGET, + "Received sync_utxos request from {} (start = {}, include_pruned_utxos = {}, include_deleted_bitmaps = {})", + peer, + req.start, + req.include_pruned_utxos, + req.include_deleted_bitmaps + ); - task::spawn(async move { - let timer = Instant::now(); - let end_header = match db - .fetch_header_by_block_hash(req.end_header_hash.clone()) - .await - .or_not_found("BlockHeader", "hash", req.end_header_hash.to_hex()) - .map_err(RpcStatus::log_internal_error(LOG_TARGET)) - { - Ok(header) => header, - Err(err) => { + struct SyncUtxosTask { + db: AsyncBlockchainDb, + request: SyncUtxosRequest, + } + + impl SyncUtxosTask + where B: BlockchainBackend + 'static + { + pub fn new(db: AsyncBlockchainDb, request: SyncUtxosRequest) -> Self { + Self { db, request } + } + + pub async fn run(self, mut tx: mpsc::Sender>) { + if let Err(err) = self.start_streaming(&mut tx).await { let _ = tx.send(Err(err)).await; - return; - }, - }; + } + } - let iter = NonOverlappingIntegerPairIter::new(req.start, end_header.output_mmr_size, UTXOS_PER_BATCH); - let fetch_header_time = timer.elapsed().as_millis(); - let mut fetch_utxos_time = 0u128; - for (start, end) in iter { - let timer = Instant::now(); - if tx.is_closed() { - debug!( - target: LOG_TARGET, - "Exiting sync_utxos early because client ({}) has gone", peer - ); - break; + async fn start_streaming( + &self, + tx: &mut mpsc::Sender>, + ) -> Result<(), RpcStatus> + { + let end_header = self + .db + .fetch_header_by_block_hash(self.request.end_header_hash.clone()) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| { + RpcStatus::not_found(format!( + "End header hash {} is was not found", + self.request.end_header_hash.to_hex() + )) + })?; + let end_header_hash = end_header.hash(); + + if self.request.start > end_header.output_mmr_size - 1 { + return Err(RpcStatus::bad_request(format!( + "start index {} cannot be greater than the end header's output MMR size ({})", + self.request.start, end_header.output_mmr_size + ))); } - debug!(target: LOG_TARGET, "Streaming utxos {} to {}", start, end); - let res = db - .fetch_utxos_by_mmr_position(start, end, req.end_header_hash.clone()) + + let prev_header = self + .db + .fetch_header_containing_utxo_mmr(self.request.start) .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET)); - fetch_utxos_time += timer.elapsed().as_millis(); - match res { - Ok((utxos, deleted)) => { - if utxos.is_empty() { - break; - } - let response = SyncUtxosResponse { - utxos: utxos - .into_iter() - .map(|pruned_output| match pruned_output { - PrunedOutput::Pruned { - output_hash, - range_proof_hash, - } => SyncUtxo { - output: None, - hash: output_hash, - rangeproof_hash: range_proof_hash, - }, - PrunedOutput::NotPruned { output } => SyncUtxo { - output: Some(output.into()), - hash: vec![], - rangeproof_hash: vec![], - }, - }) - .collect(), - deleted_bitmaps: deleted.into_iter().map(|d| d.serialize()).collect(), - }; + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + let mut prev_header = prev_header.header; - // Ensure task stops if the peer prematurely stops their RPC session - if tx.send(Ok(response)).await.is_err() { - break; - } - }, - Err(err) => { - let _ = tx.send(Err(err)).await; - break; - }, + if prev_header.height > end_header.height { + return Err(RpcStatus::bad_request("start index is greater than end index")); } - debug!( - target: LOG_TARGET, - "Streamed utxos {} to {} in {:.2?}", - start, - end, - timer.elapsed() - ); - } - let send_utxos_time = timer.elapsed().as_millis() - fetch_header_time - fetch_utxos_time; - trace!( - target: LOG_TARGET, - "Timings - Fetch header info from db: {} ms, Fetch UTXOs from db: {} ms, RPC send UTXO stream: {} ms", - fetch_header_time, - fetch_utxos_time, - send_utxos_time, - ); - }); - Ok(Streaming::new(rx)) - } + loop { + let timer = Instant::now(); + if prev_header.height == end_header.height { + break; + } - async fn sync_utxos2( - &self, - request: Request, - ) -> Result, RpcStatus> - { - let peer = request.context().peer_node_id().clone(); - let req = request.into_message(); - const BATCH_SIZE: usize = 100; - let (mut tx, rx) = mpsc::channel(BATCH_SIZE); - let db = self.db(); + let current_header = self + .db + .fetch_header(prev_header.height + 1) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| { + RpcStatus::general(format!( + "Potential data consistency issue: header {} not found", + prev_header.height + 1 + )) + })?; - let end_header = db - .fetch_header_by_block_hash(req.end_header_hash.clone()) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))? - .ok_or_else(|| RpcStatus::not_found("end_header_hash was not found"))?; + debug!( + target: LOG_TARGET, + "previous header = {} ({}) current header = {} ({})", + prev_header.height, + prev_header.hash().to_hex(), + current_header.height, + current_header.hash().to_hex() + ); - debug!( - target: LOG_TARGET, - "Received sync_utxos request from {} (start = {}, include_pruned_utxos = {}, include_deleted_bitmaps = {})", - peer, - req.start, - req.include_pruned_utxos, - req.include_deleted_bitmaps - ); + let start = cmp::max(self.request.start, prev_header.output_mmr_size); + let end = current_header.output_mmr_size - 1; - if req.start > end_header.output_mmr_size { - return Err(RpcStatus::bad_request("start index is greater than end index")); - } + if tx.is_closed() { + debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",); + break; + } - task::spawn(async move { - let iter = NonOverlappingIntegerPairIter::new(req.start, end_header.output_mmr_size, BATCH_SIZE); - for (start, end) in iter { - let timer = Instant::now(); - if tx.is_closed() { debug!( target: LOG_TARGET, - "Exiting sync_utxos early because client ({}) has gone", peer + "Streaming UTXOs {}-{} ({}) for block #{}", + start, + end, + end.saturating_sub(start).saturating_add(1), + current_header.height + ); + let (utxos, deleted_diff) = self + .db + .fetch_utxos_by_mmr_position(start, end, end_header_hash.clone()) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + trace!( + target: LOG_TARGET, + "Loaded {} UTXO(s) and |deleted_diff| = {}", + utxos.len(), + deleted_diff.cardinality(), + ); + let mut utxos = stream::iter( + utxos + .into_iter() + .enumerate() + // Only include pruned UTXOs if include_pruned_utxos is true + .filter(|(_, utxo)| self.request.include_pruned_utxos || !utxo.is_pruned()) + .map(|(i, utxo)| { + SyncUtxosResponse { + utxo_or_deleted: Some(proto::base_node::sync_utxos_response::UtxoOrDeleted::Utxo( + SyncUtxo::from(utxo) + )), + mmr_index: start + i as u64, + } + }) + .map(Ok) + .map(Ok), ); - break; - } - debug!(target: LOG_TARGET, "Streaming utxos {} to {}", start, end); - let res = db - .fetch_utxos_by_mmr_position(start, end, req.end_header_hash.clone()) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET)); - debug!( - target: LOG_TARGET, - "Fetched {} utxos in {:.2?}", - end - start, - timer.elapsed() - ); - match res { - Ok((utxos, deleted)) => { - if utxos.is_empty() { - break; - } + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send_all(&mut utxos).await.is_err() { + break; + } - let mut utxos = stream::iter( - utxos - .into_iter() - .enumerate() - // Only include pruned UTXOs if include_pruned_utxos is true - .filter(|(_, utxo)| req.include_pruned_utxos || !utxo.is_pruned()) - .map(|(i, utxo)| { - let utxo = proto::base_node::SyncUtxo2::from(utxo); - proto::base_node::SyncUtxos2Response { - utxo_or_deleted: Some(proto::base_node::sync_utxos2_response::UtxoOrDeleted::Utxo( - utxo, - )), - mmr_index: start.saturating_add(i as u64), - } - }) - .map(Ok) - .map(Ok), - ); + if self.request.include_deleted_bitmaps { + let bitmaps = SyncUtxosResponse { + utxo_or_deleted: Some(proto::base_node::sync_utxos_response::UtxoOrDeleted::DeletedDiff( + deleted_diff.serialize(), + )), + mmr_index: 0, + }; - // Ensure task stops if the peer prematurely stops their RPC session - if tx.send_all(&mut utxos).await.is_err() { + if tx.send(Ok(bitmaps)).await.is_err() { break; } + } + debug!( + target: LOG_TARGET, + "Streamed utxos {} to {} in {:.2?} (including stream backpressure)", + start, + end, + timer.elapsed() + ); - if req.include_deleted_bitmaps { - let bitmaps = deleted.into_iter().map(|b| b.serialize()).collect(); - let bitmaps = proto::base_node::SyncUtxos2Response { - utxo_or_deleted: Some( - proto::base_node::sync_utxos2_response::UtxoOrDeleted::DeletedBitmaps( - proto::base_node::Bitmaps { bitmaps }, - ), - ), - mmr_index: 0, - }; - - if tx.send(Ok(bitmaps)).await.is_err() { - break; - } - } - }, - Err(err) => { - let _ = tx.send(Err(err)).await; - break; - }, + prev_header = current_header; } debug!( target: LOG_TARGET, - "Streamed utxos {} to {} in {:.2?} (including stream backpressure)", - start, - end, - timer.elapsed() + "UTXO sync completed to UTXO {} (Header hash = {})", + prev_header.output_mmr_size, + prev_header.hash().to_hex() ); + + Ok(()) } - }); + } + + let (tx, rx) = mpsc::channel(200); + task::spawn(SyncUtxosTask::new(self.db(), request.into_message()).run(tx)); Ok(Streaming::new(rx)) } diff --git a/base_layer/core/src/chain_storage/accumulated_data.rs b/base_layer/core/src/chain_storage/accumulated_data.rs index de82586fe5..21bb52277d 100644 --- a/base_layer/core/src/chain_storage/accumulated_data.rs +++ b/base_layer/core/src/chain_storage/accumulated_data.rs @@ -38,9 +38,12 @@ use serde::{ Serialize, Serializer, }; -use std::{fmt, fmt::Display}; +use std::{ + fmt, + fmt::{Display, Formatter}, +}; use tari_crypto::tari_utilities::hex::Hex; -use tari_mmr::pruned_hashset::PrunedHashSet; +use tari_mmr::{pruned_hashset::PrunedHashSet, ArrayLike}; const LOG_TARGET: &str = "c::bn::acc_data"; @@ -105,6 +108,19 @@ impl Default for BlockAccumulatedData { } } +impl Display for BlockAccumulatedData { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), fmt::Error> { + write!( + f, + "{} output(s), {} spent, {} kernel(s), {} rangeproof(s)", + self.outputs.len().unwrap_or(0), + self.deleted.deleted.cardinality(), + self.kernels.len().unwrap_or(0), + self.range_proofs.len().unwrap_or(0) + ) + } +} + impl Serialize for DeletedBitmap { fn serialize(&self, serializer: S) -> Result<::Ok, ::Error> where S: Serializer { diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index aac5ab4433..e8dcd31bdf 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -142,7 +142,7 @@ impl AsyncBlockchainDb { make_async_fn!(fetch_utxos(hashes: Vec, is_spent_as_of: Option) -> Vec>, "fetch_utxos"); - make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, end_header_hash: HashOutput) -> (Vec, Vec), "fetch_utxos_by_mmr_position"); + make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, end_header_hash: HashOutput) -> (Vec, Bitmap), "fetch_utxos_by_mmr_position"); //---------------------------------- Kernel --------------------------------------------// make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig"); diff --git a/base_layer/core/src/chain_storage/blockchain_backend.rs b/base_layer/core/src/chain_storage/blockchain_backend.rs index 8ae902cc15..09dfaa6a4d 100644 --- a/base_layer/core/src/chain_storage/blockchain_backend.rs +++ b/base_layer/core/src/chain_storage/blockchain_backend.rs @@ -101,7 +101,7 @@ pub trait BlockchainBackend: Send + Sync { start: u64, end: u64, deleted: &Bitmap, - ) -> Result<(Vec, Vec), ChainStorageError>; + ) -> Result<(Vec, Bitmap), ChainStorageError>; /// Fetch a specific output. Returns the output and the leaf index in the output MMR fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError>; diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 45410809d6..d81e498d05 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -409,7 +409,7 @@ where B: BlockchainBackend start: u64, end: u64, end_header_hash: HashOutput, - ) -> Result<(Vec, Vec), ChainStorageError> + ) -> Result<(Vec, Bitmap), ChainStorageError> { let db = self.db_read_access()?; let accum_data = db.fetch_block_accumulated_data(&end_header_hash).or_not_found( diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 2e5b0c4534..72ee5a961c 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -83,6 +83,7 @@ use lmdb_zero::{ConstTransaction, Database, Environment, ReadTransaction, WriteT use log::*; use serde::{Deserialize, Serialize}; use std::{ + convert::TryFrom, fmt, fs, fs::File, @@ -222,7 +223,7 @@ impl LMDBDatabase { DeleteBlock(hash) => { self.delete_block_body(&write_txn, hash)?; }, - WriteOperation::InsertMoneroSeedHeight(data, height) => { + InsertMoneroSeedHeight(data, height) => { self.insert_monero_seed_height(&write_txn, data, height)?; }, InsertChainOrphanBlock(chain_block) => { @@ -1436,96 +1437,107 @@ impl BlockchainBackend for LMDBDatabase { start: u64, end: u64, deleted: &Bitmap, - ) -> Result<(Vec, Vec), ChainStorageError> + ) -> Result<(Vec, Bitmap), ChainStorageError> { let txn = ReadTransaction::new(&*self.env)?; - if let Some(start_height) = lmdb_first_after(&txn, &self.output_mmr_size_index, &(start + 1).to_be_bytes())? { - let end_height: u64 = - lmdb_first_after(&txn, &self.output_mmr_size_index, &(end + 1).to_be_bytes())?.unwrap_or(start_height); - - let previous_mmr_count = if start_height == 0 { - 0 - } else { - let header: BlockHeader = - lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist"); - debug!(target: LOG_TARGET, "Previous header:{}", header); - header.output_mmr_size - }; + let start_height = lmdb_first_after(&txn, &self.output_mmr_size_index, &(start + 1).to_be_bytes())? + .ok_or_else(|| { + ChainStorageError::InvalidQuery(format!( + "Unable to find block height from start output MMR index {}", + start + )) + })?; + let end_height: u64 = + lmdb_first_after(&txn, &self.output_mmr_size_index, &(end + 1).to_be_bytes())?.unwrap_or(start_height); - let total_size = (end - start) as usize + 1; - let mut result = Vec::with_capacity(total_size); - let mut deleted_result = vec![]; + let previous_mmr_count = if start_height == 0 { + 0 + } else { + let header: BlockHeader = + lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist"); + debug!(target: LOG_TARGET, "Previous header:{}", header); + header.output_mmr_size + }; - let mut skip_amount = (start - previous_mmr_count) as usize; - debug!( - target: LOG_TARGET, - "Fetching outputs by MMR position. Start {}, end {}, starting in header at height {}, prev mmr \ - count: {}, skipping the first:{}", - start, - end, - start_height, - previous_mmr_count, - skip_amount - ); + let total_size = end + .checked_sub(start) + .and_then(|v| v.checked_add(1)) + .and_then(|v| usize::try_from(v).ok()) + .ok_or_else(|| { + ChainStorageError::InvalidQuery("fetch_utxos_by_mmr_position: end is less than start".to_string()) + })?; + let mut result = Vec::with_capacity(total_size); - for height in start_height..=end_height { - let accum_data = - lmdb_get::<_, BlockHeaderAccumulatedData>(&txn, &self.header_accumulated_data_db, &height)? - .ok_or_else(|| ChainStorageError::ValueNotFound { - entity: "BlockHeader".to_string(), - field: "height".to_string(), - value: height.to_string(), - })?; + let mut skip_amount = (start - previous_mmr_count) as usize; + debug!( + target: LOG_TARGET, + "Fetching outputs by MMR position. Start {}, end {}, starting in header at height {}, prev mmr count: \ + {}, skipping the first:{}", + start, + end, + start_height, + previous_mmr_count, + skip_amount + ); + let mut difference_bitmap = Bitmap::create(); - result.extend( - lmdb_fetch_keys_starting_with::( - accum_data.hash.to_hex().as_str(), - &txn, - &self.utxos_db, - )? - .into_iter() - .skip(skip_amount) - .take(total_size - result.len()) - .map(|row| { - if deleted.contains(row.mmr_position) { - return PrunedOutput::Pruned { - output_hash: row.hash, - range_proof_hash: row.range_proof_hash, - }; - } - if let Some(output) = row.output { - PrunedOutput::NotPruned { output } - } else { - PrunedOutput::Pruned { - output_hash: row.hash, - range_proof_hash: row.range_proof_hash, - } + for height in start_height..=end_height { + let accum_data = + lmdb_get::<_, BlockHeaderAccumulatedData>(&txn, &self.header_accumulated_data_db, &height)? + .ok_or_else(|| ChainStorageError::ValueNotFound { + entity: "BlockHeader".to_string(), + field: "height".to_string(), + value: height.to_string(), + })?; + + result.extend( + lmdb_fetch_keys_starting_with::( + accum_data.hash.to_hex().as_str(), + &txn, + &self.utxos_db, + )? + .into_iter() + .skip(skip_amount) + .take(total_size - result.len()) + .map(|row| { + if deleted.contains(row.mmr_position) { + return PrunedOutput::Pruned { + output_hash: row.hash, + range_proof_hash: row.range_proof_hash, + }; + } + if let Some(output) = row.output { + PrunedOutput::NotPruned { output } + } else { + PrunedOutput::Pruned { + output_hash: row.hash, + range_proof_hash: row.range_proof_hash, } - }), - ); - - let block_accum_data = self - .fetch_block_accumulated_data(&txn, height) - .or_not_found("BlockAccumulatedData", "height", height.to_string())? - .deleted() - .clone(); - let prev_block_accum_data = if height == 0 { - Bitmap::create() - } else { - self.fetch_block_accumulated_data(&txn, height - 1) - .or_not_found("BlockAccumulatedData", "height", height.to_string())? - .deleted() - .clone() - }; - let diff_bitmap = block_accum_data.xor(&prev_block_accum_data); - deleted_result.push(diff_bitmap); + } + }), + ); - skip_amount = 0; + // Builds a BitMap of the deleted UTXO MMR indexes that occurred at the current height + let mut diff_bitmap = self + .fetch_block_accumulated_data(&txn, height) + .or_not_found("BlockAccumulatedData", "height", height.to_string())? + .deleted() + .clone(); + if height > 0 { + let prev_accum = self.fetch_block_accumulated_data(&txn, height - 1).or_not_found( + "BlockAccumulatedData", + "height", + height.to_string(), + )?; + diff_bitmap.xor_inplace(prev_accum.deleted()); } - Ok((result, deleted_result)) - } else { - Ok((vec![], vec![])) + difference_bitmap.or_inplace(&diff_bitmap); + + skip_amount = 0; } + + difference_bitmap.run_optimize(); + Ok((result, difference_bitmap)) } fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index f6f42393eb..899d336fcd 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -245,7 +245,7 @@ impl BlockchainBackend for TempDatabase { start: u64, end: u64, deleted: &Bitmap, - ) -> Result<(Vec, Vec), ChainStorageError> + ) -> Result<(Vec, Bitmap), ChainStorageError> { self.db.fetch_utxos_by_mmr_position(start, end, deleted) } diff --git a/base_layer/wallet/src/tasks/wallet_recovery.rs b/base_layer/wallet/src/tasks/wallet_recovery.rs index dd1a09c165..9e7f7a9dc2 100644 --- a/base_layer/wallet/src/tasks/wallet_recovery.rs +++ b/base_layer/wallet/src/tasks/wallet_recovery.rs @@ -155,27 +155,14 @@ impl WalletRecoveryTask { async fn finalize(&self, total_scanned: u64, final_utxo_pos: u64, elapsed: Duration) -> Result<(), WalletError> { let num_recovered = self.get_metadata(RecoveryMetadataKey::NumUtxos).await?.unwrap_or(0); let total_amount = self - .wallet - .db - .get_client_key_from_str(RECOVERY_TOTAL_AMOUNT_KEY.to_string()) + .get_metadata(RecoveryMetadataKey::TotalAmount) .await? .unwrap_or_else(|| 0.into()); - let _ = self - .wallet - .db - .clear_client_value(RECOVERY_HEIGHT_KEY.to_string()) - .await?; - let _ = self - .wallet - .db - .clear_client_value(RECOVERY_NUM_UTXOS_KEY.to_string()) - .await?; - let _ = self - .wallet - .db - .clear_client_value(RECOVERY_TOTAL_AMOUNT_KEY.to_string()) - .await?; + self.clear_metadata(RecoveryMetadataKey::Height).await?; + self.clear_metadata(RecoveryMetadataKey::NumUtxos).await?; + self.clear_metadata(RecoveryMetadataKey::TotalAmount).await?; + self.clear_metadata(RecoveryMetadataKey::UtxoIndex).await?; self.publish_event(WalletRecoveryEvent::Progress(final_utxo_pos, final_utxo_pos)); self.publish_event(WalletRecoveryEvent::Completed( @@ -337,14 +324,14 @@ impl WalletRecoveryTask { include_deleted_bitmaps: false, }; - let utxo_stream = client.sync_utxos2(request).await.map_err(to_wallet_recovery_error)?; + let utxo_stream = client.sync_utxos(request).await.map_err(to_wallet_recovery_error)?; // We download in chunks just because rewind_outputs works with multiple outputs (and could parallelized // rewinding) let mut utxo_stream = utxo_stream.chunks(10); let mut last_utxo_index = 0u64; let mut iteration_count = 0u64; while let Some(response) = utxo_stream.next().await { - let response: Vec = response + let response: Vec = response .into_iter() .map(|v| v.map_err(to_wallet_recovery_error)) .collect::, _>>()?; @@ -417,11 +404,8 @@ impl WalletRecoveryTask { .await?; let current_num_utxos = self.get_metadata(RecoveryMetadataKey::NumUtxos).await?.unwrap_or(0u64); - self.set_metadata( - RecoveryMetadataKey::NumUtxos, - (current_num_utxos + num_recovered).to_string(), - ) - .await?; + self.set_metadata(RecoveryMetadataKey::NumUtxos, current_num_utxos + num_recovered) + .await?; let current_total_amount = self .get_metadata::(RecoveryMetadataKey::TotalAmount) @@ -432,7 +416,7 @@ impl WalletRecoveryTask { .await?; self.set_metadata( RecoveryMetadataKey::TotalAmount, - (current_total_amount + total_amount).as_u64().to_string(), + (current_total_amount + total_amount).as_u64(), ) .await?; @@ -462,6 +446,11 @@ impl WalletRecoveryTask { Ok(value) } + async fn clear_metadata(&self, key: RecoveryMetadataKey) -> Result<(), WalletError> { + self.wallet.db.clear_client_value(key.as_key_str().to_string()).await?; + Ok(()) + } + fn publish_event(&self, event: WalletRecoveryEvent) { let _ = self.event_sender.send(event); } diff --git a/comms/src/protocol/rpc/status.rs b/comms/src/protocol/rpc/status.rs index eef17c3b10..ffa4fa839f 100644 --- a/comms/src/protocol/rpc/status.rs +++ b/comms/src/protocol/rpc/status.rs @@ -63,6 +63,8 @@ impl RpcStatus { } } + /// Returns a general error. As with all other errors care should be taken not to leak sensitive data to remote + /// peers through error messages. pub fn general(details: T) -> Self { Self { code: RpcStatusCode::General, @@ -88,10 +90,12 @@ impl RpcStatus { } } + /// Returns a closure that logs the given error and returns a generic general error that does not leak any + /// potentially sensitive error information. Use this function with map_err to catch "miscellaneous" errors. pub fn log_internal_error<'a, E: std::error::Error + 'a>(target: &'a str) -> impl Fn(E) -> Self + 'a { move |err| { log::error!(target: target, "Internal error: {}", err); - Self::general(err.to_string()) + Self::general_default() } }