Skip to content

Commit

Permalink
refactor: mapping for deleted mmr position to height/hash for perf
Browse files Browse the repository at this point in the history
- add index to optimise query for block height from deleted mmr position
- update query_deleted rpc call to use new call
- check if database should be resynced to avoid confusion around
  db errors resulting from this change
- exit code message showing help for a required db resync
  • Loading branch information
sdbondi committed Sep 29, 2021
1 parent d440afd commit ed1b831
Show file tree
Hide file tree
Showing 13 changed files with 247 additions and 213 deletions.
25 changes: 25 additions & 0 deletions applications/tari_app_utilities/src/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum ExitCodes {
NoPassword,
#[error("Tor connection is offline")]
TorOffline,
#[error("Database is in inconsistent state: {0}")]
DbInconsistentState(String),
}

impl ExitCodes {
Expand All @@ -94,6 +96,29 @@ impl ExitCodes {
Self::ConversionError(_) => 111,
Self::IncorrectPassword | Self::NoPassword => 112,
Self::TorOffline => 113,
Self::DbInconsistentState(_) => 115,
}
}

pub fn eprint_details(&self) {
use ExitCodes::*;
match self {
TorOffline => {
eprintln!("Unable to connect to the Tor control port.");
eprintln!(
"Please check that you have the Tor proxy running and that access to the Tor control port is \
turned on.",
);
eprintln!("If you are unsure of what to do, use the following command to start the Tor proxy:");
eprintln!(
"tor --allow-missing-torrc --ignore-missing-torrc --clientonly 1 --socksport 9050 --controlport \
127.0.0.1:9051 --log \"notice stdout\" --clientuseipv6 1",
);
},

e => {
eprintln!("{}", e);
},
}
}
}
Expand Down
23 changes: 9 additions & 14 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use tari_app_utilities::{
};
use tari_common::{configuration::bootstrap::ApplicationType, ConfigBootstrap, GlobalConfig};
use tari_comms::{peer_manager::PeerFeatures, tor::HiddenServiceControllerError};
use tari_core::chain_storage::ChainStorageError;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
runtime,
Expand All @@ -128,7 +129,7 @@ const LOG_TARGET: &str = "base_node::app";
/// Application entry point
fn main() {
if let Err(exit_code) = main_inner() {
eprintln!("{:?}", exit_code);
exit_code.eprint_details();
error!(
target: LOG_TARGET,
"Exiting with code ({}): {:?}",
Expand Down Expand Up @@ -205,21 +206,15 @@ async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) ->
.await
.map_err(|err| {
for boxed_error in err.chain() {
if let Some(HiddenServiceControllerError::TorControlPortOffline) =
boxed_error.downcast_ref::<HiddenServiceControllerError>()
{
println!("Unable to connect to the Tor control port.");
println!(
"Please check that you have the Tor proxy running and that access to the Tor control port is \
turned on.",
);
println!("If you are unsure of what to do, use the following command to start the Tor proxy:");
println!(
"tor --allow-missing-torrc --ignore-missing-torrc --clientonly 1 --socksport 9050 --controlport \
127.0.0.1:9051 --log \"notice stdout\" --clientuseipv6 1",
);
if let Some(HiddenServiceControllerError::TorControlPortOffline) = boxed_error.downcast_ref() {
return ExitCodes::TorOffline;
}
if let Some(ChainStorageError::DatabaseResyncRequired(reason)) = boxed_error.downcast_ref() {
return ExitCodes::DbInconsistentState(format!(
"You may need to resync your database because {}",
reason
));
}

// todo: find a better way to do this
if boxed_error.to_string().contains("Invalid force sync peer") {
Expand Down
46 changes: 24 additions & 22 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo},
crypto::tari_utilities::Hashable,
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
proto::{
Expand All @@ -47,7 +46,7 @@ use crate::{
},
transactions::transaction::Transaction,
};
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use tari_common_types::types::Signature;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};

Expand Down Expand Up @@ -307,13 +306,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc

let db = self.db();
let mut res = Vec::with_capacity(message.output_hashes.len());
for (pruned_output, spent) in (db
let utxos = db
.fetch_utxos(message.output_hashes)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?)
.into_iter()
.flatten()
{
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.into_iter()
.flatten();
for (pruned_output, spent) in utxos {
if let PrunedOutput::NotPruned { output } = pruned_output {
if !spent {
res.push(output);
Expand Down Expand Up @@ -396,15 +395,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
}
}

let metadata = self
.db
.get_chain_metadata()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

let deleted_bitmap = self
.db
.fetch_complete_deleted_bitmap_at(metadata.best_block().clone())
.fetch_deleted_bitmap_at_tip()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

Expand All @@ -416,8 +409,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
// TODO: in future, bitmap may support higher than u32
return Err(RpcStatus::bad_request("position must fit into a u32"));
}
let pos = position.try_into().unwrap();
if deleted_bitmap.bitmap().contains(pos) {
let position = position as u32;
if deleted_bitmap.bitmap().contains(position) {
deleted_positions.push(position);
} else {
not_deleted_positions.push(position);
Expand All @@ -429,20 +422,29 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
if message.include_deleted_block_data {
let headers = self
.db
.fetch_headers_of_deleted_positions(deleted_positions.clone())
.fetch_header_hash_by_deleted_mmr_positions(deleted_positions.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
for header in headers.iter() {
heights_deleted_at.push(header.height);
blocks_deleted_in.push(header.hash());

heights_deleted_at.reserve(headers.len());
blocks_deleted_in.reserve(headers.len());
for (height, hash) in headers.into_iter().flatten() {
heights_deleted_at.push(height);
blocks_deleted_in.push(hash);
}
}

let metadata = self
.db
.get_chain_metadata()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

Ok(Response::new(QueryDeletedResponse {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().clone(),
deleted_positions,
not_deleted_positions,
deleted_positions: deleted_positions.into_iter().map(|v| v as u64).collect(),
not_deleted_positions: not_deleted_positions.into_iter().map(|v| v as u64).collect(),
blocks_deleted_in,
heights_deleted_at,
}))
Expand Down
5 changes: 4 additions & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
DbBasicStats,
DbTotalSizeStats,
DbTransaction,
DeletedBitmap,
HistoricalBlock,
HorizonData,
MmrTree,
Expand Down Expand Up @@ -235,7 +236,9 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_complete_deleted_bitmap_at(hash: HashOutput) -> CompleteDeletedBitmap, "fetch_deleted_bitmap");

make_async_fn!(fetch_headers_of_deleted_positions(mmr_position: Vec<u64>) -> Vec<BlockHeader>, "fetch_headers_of_deleted_positions");
make_async_fn!(fetch_deleted_bitmap_at_tip() -> DeletedBitmap, "fetch_deleted_bitmap_at_tip");

make_async_fn!(fetch_header_hash_by_deleted_mmr_positions(mmr_positions: Vec<u32>) -> Vec<Option<(u64, HashOutput)>>, "fetch_headers_of_deleted_positions");

make_async_fn!(get_stats() -> DbBasicStats, "get_stats");

Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,10 @@ pub trait BlockchainBackend: Send + Sync {
/// Returns total size information about each internal database. This call may be very slow and will obtain a read
/// lock for the duration.
fn fetch_total_size_stats(&self) -> Result<DbTotalSizeStats, ChainStorageError>;

/// Returns a (block height/hash) tuple for each mmr position of the height it was spent, or None if it is not spent
fn fetch_header_hash_by_deleted_mmr_positions(
&self,
mmr_positions: Vec<u32>,
) -> Result<Vec<Option<(u64, HashOutput)>>, ChainStorageError>;
}
56 changes: 10 additions & 46 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
ChainHeader,
DbBasicStats,
DbTotalSizeStats,
DeletedBitmap,
HistoricalBlock,
HorizonData,
MmrTree,
Expand Down Expand Up @@ -945,58 +946,21 @@ where B: BlockchainBackend
))
}

// TODO Update this method to make use of a (mmr_position, block_height) index in the lmdb backend instead of a
// linear search
pub fn fetch_headers_of_deleted_positions(
&self,
mut mmr_positions: Vec<u64>,
) -> Result<Vec<BlockHeader>, ChainStorageError> {
pub fn fetch_deleted_bitmap_at_tip(&self) -> Result<DeletedBitmap, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_deleted_bitmap()
}

pub fn fetch_header_hash_by_deleted_mmr_positions(
&self,
mmr_positions: Vec<u32>,
) -> Result<Vec<Option<(u64, HashOutput)>>, ChainStorageError> {
if mmr_positions.is_empty() {
return Ok(Vec::new());
}

let chain_metadata = db.fetch_chain_metadata()?;
let mut height = chain_metadata.height_of_longest_chain();

mmr_positions.sort_unstable();

let mut headers = Vec::with_capacity(mmr_positions.len());

let mut target = mmr_positions.pop().expect("mmr_positions cannot be empty here");

loop {
if target > u32::MAX as u64 {
// TODO: in future, bitmap may support higher than u32
return Err(ChainStorageError::InvalidArguments {
func: "fetch_header_of_deleted_position",
arg: "mmr_positions",
message: "mmr_positions should fit into u32".into(),
});
}
if db
.fetch_block_accumulated_data_by_height(height)
.or_not_found("BlockAccumulatedData", "height", height.to_string())?
.deleted()
.contains(target as u32)
{
headers.push(fetch_header(&(*db), height)?);
if let Some(pos) = mmr_positions.pop() {
target = pos;
} else {
break;
}
}

if height > 0 {
height -= 1;
} else {
break;
}
}

Ok(headers)
let db = self.db_read_access()?;
db.fetch_header_hash_by_deleted_mmr_positions(mmr_positions)
}

pub fn get_stats(&self) -> Result<DbBasicStats, ChainStorageError> {
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ pub enum ChainStorageError {
DbResizeRequired,
#[error("DB transaction was too large ({0} operations)")]
DbTransactionTooLarge(usize),
#[error("DB needs to be resynced: {0}")]
DatabaseResyncRequired(&'static str),
}

impl ChainStorageError {
Expand Down
Loading

0 comments on commit ed1b831

Please sign in to comment.