Skip to content

Commit

Permalink
feat!: integration of new transaction and output validation
Browse files Browse the repository at this point in the history
This PR cleans up and updates the new Output and Transaction validation strategies.

@mikethetike provided the first draft of this new validation approach for both Outputs and Transactions but the work needed some love to fully integrate into the wallet code. Many edge cases were caught and some bigger changes are made in this PR to make the process reliable.

Some major changes are:
- Output validation now supports checking and reporting on number of confirmations
- The Pending Transaction database table is fully removed and this process is now handled fully within the Outputs table
- Validations are now triggered on receiving a new block from the base node rather than at an interval
- Base node `query_deleted` call is updated to return the block data of where a deletion occured
  • Loading branch information
philipr-za committed Sep 17, 2021
1 parent 7822c72 commit 66fb85c
Show file tree
Hide file tree
Showing 57 changed files with 3,046 additions and 6,553 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ pub enum StatusOutput {
Full,
}

pub enum StatusOutput {
Log,
Full,
}

pub struct CommandHandler {
executor: runtime::Handle,
config: Arc<GlobalConfig>,
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ mod status_line;
mod utils;

use crate::command_handler::{CommandHandler, StatusOutput};
use futures::{future::Fuse, pin_mut, FutureExt};
use futures::{pin_mut, FutureExt};
use log::*;
use opentelemetry::{self, global, KeyValue};
use parser::Parser;
Expand All @@ -119,7 +119,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
runtime,
task,
time::{self, Delay},
time::{self},
};
use tonic::transport::Server;
use tracing_subscriber::{layer::SubscriberExt, Registry};
Expand Down
51 changes: 9 additions & 42 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,9 @@ use tari_shutdown::ShutdownSignal;
use tari_wallet::{
base_node_service::config::BaseNodeServiceConfig,
error::{WalletError, WalletStorageError},
output_manager_service::{config::OutputManagerServiceConfig, TxoValidationType},
output_manager_service::config::OutputManagerServiceConfig,
storage::{database::WalletDatabase, sqlite_utilities::initialize_sqlite_database_backends},
transaction_service::{
config::{TransactionRoutingMechanism, TransactionServiceConfig},
tasks::start_transaction_validation_and_broadcast_protocols::start_transaction_validation_and_broadcast_protocols,
},
types::ValidationRetryStrategy,
transaction_service::config::{TransactionRoutingMechanism, TransactionServiceConfig},
Wallet,
WalletConfig,
WalletSqlite,
Expand Down Expand Up @@ -391,6 +387,7 @@ pub async fn init_wallet(
prevent_fee_gt_amount: config.prevent_fee_gt_amount,
event_channel_size: config.output_manager_event_channel_size,
base_node_update_publisher_channel_size: config.base_node_update_publisher_channel_size,
num_confirmations_required: config.transaction_num_confirmations_required,
..Default::default()
}),
config.network.into(),
Expand Down Expand Up @@ -500,12 +497,7 @@ pub async fn start_wallet(
if let Err(e) = wallet.transaction_service.restart_transaction_protocols().await {
error!(target: LOG_TARGET, "Problem restarting transaction protocols: {}", e);
}
if let Err(e) = start_transaction_validation_and_broadcast_protocols(
wallet.transaction_service.clone(),
ValidationRetryStrategy::UntilSuccess,
)
.await
{
if let Err(e) = wallet.transaction_service.validate_transactions().await {
error!(
target: LOG_TARGET,
"Problem validating and restarting transaction protocols: {}", e
Expand All @@ -521,37 +513,12 @@ pub async fn start_wallet(
async fn validate_txos(wallet: &mut WalletSqlite) -> Result<(), ExitCodes> {
debug!(target: LOG_TARGET, "Starting TXO validations.");

// Unspent TXOs
wallet
.output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Unspent TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;

// Spent TXOs
wallet
.output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Spent TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;

// Invalid TXOs
wallet
.output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Invalid TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;
wallet.output_manager_service.validate_txos().await.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Unspent TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;

debug!(target: LOG_TARGET, "TXO validations completed.");
debug!(target: LOG_TARGET, "TXO validations started.");

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl TransactionsTab {
format!("{}", local_time.format("%Y-%m-%d %H:%M:%S")),
Style::default().fg(text_color),
)));
let status = if t.cancelled && t.status == TransactionStatus::Coinbase {
let status = if (t.cancelled || !t.valid) && t.status == TransactionStatus::Coinbase {
"Abandoned".to_string()
} else if t.cancelled {
"Cancelled".to_string()
Expand Down
29 changes: 4 additions & 25 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ use tari_wallet::{
base_node_service::{handle::BaseNodeEventReceiver, service::BaseNodeState},
connectivity_service::WalletConnectivityHandle,
contacts_service::storage::database::Contact,
output_manager_service::{handle::OutputManagerEventReceiver, service::Balance, TxId, TxoValidationType},
output_manager_service::{handle::OutputManagerEventReceiver, service::Balance, TxId},
transaction_service::{
handle::TransactionEventReceiver,
storage::models::{CompletedTransaction, TransactionStatus},
},
types::ValidationRetryStrategy,
WalletSqlite,
};

Expand Down Expand Up @@ -324,7 +323,7 @@ impl AppState {
self.cached_data
.completed_txs
.iter()
.filter(|tx| !(tx.cancelled && tx.status == TransactionStatus::Coinbase))
.filter(|tx| !((tx.cancelled || !tx.valid) && tx.status == TransactionStatus::Coinbase))
.collect()
} else {
self.cached_data.completed_txs.iter().collect()
Expand Down Expand Up @@ -794,33 +793,13 @@ impl AppStateInner {
let mut output_manager_service = self.wallet.output_manager_service.clone();

task::spawn(async move {
if let Err(e) = txn_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
if let Err(e) = txn_service.validate_transactions().await {
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
{
if let Err(e) = output_manager_service.validate_txos().await {
error!(target: LOG_TARGET, "Problem validating UTXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating STXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating Invalid TXOs: {}", e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl WalletEventMonitor {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Output Manager Service Callback Handler event {:?}", msg);
if let OutputManagerEvent::TxoValidationSuccess(_,_) = &*msg {
if let OutputManagerEvent::TxoValidationSuccess(_) = &*msg {
self.trigger_balance_refresh().await;
}
},
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_console_wallet/src/ui/widgets/list_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl WindowedListState {
let i = match self.selected {
Some(i) => {
if i >= self.num_items - 1 {
i
0
} else {
i + 1
}
Expand All @@ -101,7 +101,7 @@ impl WindowedListState {
let i = match self.selected {
Some(i) => {
if i == 0 {
i
self.num_items - 1
} else {
i - 1
}
Expand Down
3 changes: 3 additions & 0 deletions base_layer/core/src/base_node/proto/wallet_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ message FetchUtxosResponse {
message QueryDeletedRequest{
repeated uint64 mmr_positions = 1;
google.protobuf.BytesValue chain_must_include_header = 2;
bool include_deleted_block_data = 3;
}

message QueryDeletedResponse {
repeated uint64 deleted_positions = 1;
repeated uint64 not_deleted_positions = 2;
bytes best_block = 3;
uint64 height_of_longest_chain = 4;
repeated bytes blocks_deleted_in = 5;
repeated uint64 heights_deleted_at = 6;
}

message UtxoQueryRequest{
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/base_node/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ pub trait BaseNodeWalletService: Send + Sync + 'static {
&self,
request: Request<QueryDeletedRequest>,
) -> Result<Response<QueryDeletedResponse>, RpcStatus>;

#[rpc(method = 9)]
async fn get_header_by_height(
&self,
request: Request<u64>,
) -> Result<Response<proto::core::BlockHeader>, RpcStatus>;
}

#[cfg(feature = "base_node")]
Expand Down
39 changes: 33 additions & 6 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo},
crypto::tari_utilities::Hashable,
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
proto::{
Expand Down Expand Up @@ -351,12 +352,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

// let deleted = self
// .db
// .fetch_complete_deleted_bitmap_at(metadata.best_block().clone())
// .await
// .map_err(RpcStatus::log_internal_error(LOG_TARGET))?
// .into_bytes();
Ok(Response::new(UtxoQueryResponses {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().clone(),
Expand Down Expand Up @@ -428,11 +423,28 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
not_deleted_positions.push(position);
}
}

let mut blocks_deleted_in = Vec::new();
let mut heights_deleted_at = Vec::new();
if message.include_deleted_block_data {
let headers = self
.db
.fetch_headers_of_deleted_positions(deleted_positions.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
for header in headers.iter() {
heights_deleted_at.push(header.height);
blocks_deleted_in.push(header.hash());
}
}

Ok(Response::new(QueryDeletedResponse {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().clone(),
deleted_positions,
not_deleted_positions,
blocks_deleted_in,
heights_deleted_at,
}))
}

Expand Down Expand Up @@ -467,4 +479,19 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc

Ok(Response::new(header.into()))
}

async fn get_header_by_height(
&self,
request: Request<u64>,
) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
let height = request.into_message();
let header = self
.db()
.fetch_header(height)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("Header not found at height {}", height)))?;

Ok(Response::new(header.into()))
}
}
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_complete_deleted_bitmap_at(hash: HashOutput) -> CompleteDeletedBitmap, "fetch_deleted_bitmap");

make_async_fn!(fetch_headers_of_deleted_positions(mmr_position: Vec<u64>) -> Vec<BlockHeader>, "fetch_headers_of_deleted_positions");

make_async_fn!(get_stats() -> DbBasicStats, "get_stats");

make_async_fn!(fetch_total_size_stats() -> DbTotalSizeStats, "fetch_total_size_stats");
Expand Down
54 changes: 54 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,60 @@ where B: BlockchainBackend
))
}

// TODO Update this method to make use of a (mmr_position, block_height) index in the lmdb backend instead of a
// linear search
pub fn fetch_headers_of_deleted_positions(
&self,
mut mmr_positions: Vec<u64>,
) -> Result<Vec<BlockHeader>, ChainStorageError> {
let db = self.db_read_access()?;

if mmr_positions.is_empty() {
return Ok(Vec::new());
}

let chain_metadata = db.fetch_chain_metadata()?;
let mut height = chain_metadata.height_of_longest_chain();

mmr_positions.sort_unstable();

let mut headers = Vec::with_capacity(mmr_positions.len());

let mut target = mmr_positions.pop().expect("mmr_positions cannot be empty here");

loop {
if target > u32::MAX as u64 {
// TODO: in future, bitmap may support higher than u32
return Err(ChainStorageError::InvalidArguments {
func: "fetch_header_of_deleted_position",
arg: "mmr_positions",
message: "mmr_positions should fit into u32".into(),
});
}
if db
.fetch_block_accumulated_data_by_height(height)
.or_not_found("BlockAccumulatedData", "height", height.to_string())?
.deleted()
.contains(target as u32)
{
headers.push(fetch_header(&(*db), height)?);
if let Some(pos) = mmr_positions.pop() {
target = pos;
} else {
break;
}
}

if height > 0 {
height -= 1;
} else {
break;
}
}

Ok(headers)
}

pub fn get_stats(&self) -> Result<DbBasicStats, ChainStorageError> {
let lock = self.db_read_access()?;
lock.get_stats()
Expand Down
Loading

0 comments on commit 66fb85c

Please sign in to comment.