Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: integration of new transaction and output validation #3352

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ pub enum StatusOutput {
Full,
}

pub enum StatusOutput {
Log,
Full,
}

pub struct CommandHandler {
executor: runtime::Handle,
config: Arc<GlobalConfig>,
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ mod status_line;
mod utils;

use crate::command_handler::{CommandHandler, StatusOutput};
use futures::{future::Fuse, pin_mut, FutureExt};
use futures::{pin_mut, FutureExt};
use log::*;
use opentelemetry::{self, global, KeyValue};
use parser::Parser;
Expand All @@ -119,7 +119,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
runtime,
task,
time::{self, Delay},
time::{self},
};
use tonic::transport::Server;
use tracing_subscriber::{layer::SubscriberExt, Registry};
Expand Down
51 changes: 9 additions & 42 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,9 @@ use tari_shutdown::ShutdownSignal;
use tari_wallet::{
base_node_service::config::BaseNodeServiceConfig,
error::{WalletError, WalletStorageError},
output_manager_service::{config::OutputManagerServiceConfig, TxoValidationType},
output_manager_service::config::OutputManagerServiceConfig,
storage::{database::WalletDatabase, sqlite_utilities::initialize_sqlite_database_backends},
transaction_service::{
config::{TransactionRoutingMechanism, TransactionServiceConfig},
tasks::start_transaction_validation_and_broadcast_protocols::start_transaction_validation_and_broadcast_protocols,
},
types::ValidationRetryStrategy,
transaction_service::config::{TransactionRoutingMechanism, TransactionServiceConfig},
Wallet,
WalletConfig,
WalletSqlite,
Expand Down Expand Up @@ -391,6 +387,7 @@ pub async fn init_wallet(
prevent_fee_gt_amount: config.prevent_fee_gt_amount,
event_channel_size: config.output_manager_event_channel_size,
base_node_update_publisher_channel_size: config.base_node_update_publisher_channel_size,
num_confirmations_required: config.transaction_num_confirmations_required,
..Default::default()
}),
config.network.into(),
Expand Down Expand Up @@ -500,12 +497,7 @@ pub async fn start_wallet(
if let Err(e) = wallet.transaction_service.restart_transaction_protocols().await {
error!(target: LOG_TARGET, "Problem restarting transaction protocols: {}", e);
}
if let Err(e) = start_transaction_validation_and_broadcast_protocols(
wallet.transaction_service.clone(),
ValidationRetryStrategy::UntilSuccess,
)
.await
{
if let Err(e) = wallet.transaction_service.validate_transactions().await {
error!(
target: LOG_TARGET,
"Problem validating and restarting transaction protocols: {}", e
Expand All @@ -521,37 +513,12 @@ pub async fn start_wallet(
async fn validate_txos(wallet: &mut WalletSqlite) -> Result<(), ExitCodes> {
debug!(target: LOG_TARGET, "Starting TXO validations.");

// Unspent TXOs
wallet
.output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Unspent TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;

// Spent TXOs
wallet
.output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Spent TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;

// Invalid TXOs
wallet
.output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Invalid TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;
wallet.output_manager_service.validate_txos().await.map_err(|e| {
error!(target: LOG_TARGET, "Error validating Unspent TXOs: {}", e);
ExitCodes::WalletError(e.to_string())
})?;

debug!(target: LOG_TARGET, "TXO validations completed.");
debug!(target: LOG_TARGET, "TXO validations started.");

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl TransactionsTab {
format!("{}", local_time.format("%Y-%m-%d %H:%M:%S")),
Style::default().fg(text_color),
)));
let status = if t.cancelled && t.status == TransactionStatus::Coinbase {
let status = if (t.cancelled || !t.valid) && t.status == TransactionStatus::Coinbase {
"Abandoned".to_string()
} else if t.cancelled {
"Cancelled".to_string()
Expand Down
29 changes: 4 additions & 25 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ use tari_wallet::{
base_node_service::{handle::BaseNodeEventReceiver, service::BaseNodeState},
connectivity_service::WalletConnectivityHandle,
contacts_service::storage::database::Contact,
output_manager_service::{handle::OutputManagerEventReceiver, service::Balance, TxId, TxoValidationType},
output_manager_service::{handle::OutputManagerEventReceiver, service::Balance, TxId},
transaction_service::{
handle::TransactionEventReceiver,
storage::models::{CompletedTransaction, TransactionStatus},
},
types::ValidationRetryStrategy,
WalletSqlite,
};

Expand Down Expand Up @@ -324,7 +323,7 @@ impl AppState {
self.cached_data
.completed_txs
.iter()
.filter(|tx| !(tx.cancelled && tx.status == TransactionStatus::Coinbase))
.filter(|tx| !((tx.cancelled || !tx.valid) && tx.status == TransactionStatus::Coinbase))
.collect()
} else {
self.cached_data.completed_txs.iter().collect()
Expand Down Expand Up @@ -794,33 +793,13 @@ impl AppStateInner {
let mut output_manager_service = self.wallet.output_manager_service.clone();

task::spawn(async move {
if let Err(e) = txn_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
if let Err(e) = txn_service.validate_transactions().await {
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
{
if let Err(e) = output_manager_service.validate_txos().await {
error!(target: LOG_TARGET, "Problem validating UTXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating STXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating Invalid TXOs: {}", e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl WalletEventMonitor {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Output Manager Service Callback Handler event {:?}", msg);
if let OutputManagerEvent::TxoValidationSuccess(_,_) = &*msg {
if let OutputManagerEvent::TxoValidationSuccess(_) = &*msg {
self.trigger_balance_refresh().await;
}
},
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_console_wallet/src/ui/widgets/list_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl WindowedListState {
let i = match self.selected {
Some(i) => {
if i >= self.num_items - 1 {
i
0
} else {
i + 1
}
Expand All @@ -101,7 +101,7 @@ impl WindowedListState {
let i = match self.selected {
Some(i) => {
if i == 0 {
i
self.num_items - 1
} else {
i - 1
}
Expand Down
3 changes: 3 additions & 0 deletions base_layer/core/src/base_node/proto/wallet_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ message FetchUtxosResponse {
message QueryDeletedRequest{
repeated uint64 mmr_positions = 1;
google.protobuf.BytesValue chain_must_include_header = 2;
bool include_deleted_block_data = 3;
}

message QueryDeletedResponse {
repeated uint64 deleted_positions = 1;
repeated uint64 not_deleted_positions = 2;
bytes best_block = 3;
uint64 height_of_longest_chain = 4;
repeated bytes blocks_deleted_in = 5;
repeated uint64 heights_deleted_at = 6;
}

message UtxoQueryRequest{
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/base_node/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ pub trait BaseNodeWalletService: Send + Sync + 'static {
&self,
request: Request<QueryDeletedRequest>,
) -> Result<Response<QueryDeletedResponse>, RpcStatus>;

#[rpc(method = 9)]
async fn get_header_by_height(
&self,
request: Request<u64>,
) -> Result<Response<proto::core::BlockHeader>, RpcStatus>;
}

#[cfg(feature = "base_node")]
Expand Down
39 changes: 33 additions & 6 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo},
crypto::tari_utilities::Hashable,
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
proto::{
Expand Down Expand Up @@ -351,12 +352,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

// let deleted = self
// .db
// .fetch_complete_deleted_bitmap_at(metadata.best_block().clone())
// .await
// .map_err(RpcStatus::log_internal_error(LOG_TARGET))?
// .into_bytes();
Ok(Response::new(UtxoQueryResponses {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().clone(),
Expand Down Expand Up @@ -428,11 +423,28 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
not_deleted_positions.push(position);
}
}

let mut blocks_deleted_in = Vec::new();
let mut heights_deleted_at = Vec::new();
if message.include_deleted_block_data {
let headers = self
.db
.fetch_headers_of_deleted_positions(deleted_positions.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
for header in headers.iter() {
heights_deleted_at.push(header.height);
blocks_deleted_in.push(header.hash());
}
}

Ok(Response::new(QueryDeletedResponse {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().clone(),
deleted_positions,
not_deleted_positions,
blocks_deleted_in,
heights_deleted_at,
}))
}

Expand Down Expand Up @@ -467,4 +479,19 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc

Ok(Response::new(header.into()))
}

async fn get_header_by_height(
&self,
request: Request<u64>,
) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
let height = request.into_message();
let header = self
.db()
.fetch_header(height)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("Header not found at height {}", height)))?;

Ok(Response::new(header.into()))
}
}
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_complete_deleted_bitmap_at(hash: HashOutput) -> CompleteDeletedBitmap, "fetch_deleted_bitmap");

make_async_fn!(fetch_headers_of_deleted_positions(mmr_position: Vec<u64>) -> Vec<BlockHeader>, "fetch_headers_of_deleted_positions");

make_async_fn!(get_stats() -> DbBasicStats, "get_stats");

make_async_fn!(fetch_total_size_stats() -> DbTotalSizeStats, "fetch_total_size_stats");
Expand Down
54 changes: 54 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,60 @@ where B: BlockchainBackend
))
}

// TODO Update this method to make use of a (mmr_position, block_height) index in the lmdb backend instead of a
// linear search
pub fn fetch_headers_of_deleted_positions(
&self,
mut mmr_positions: Vec<u64>,
) -> Result<Vec<BlockHeader>, ChainStorageError> {
let db = self.db_read_access()?;

if mmr_positions.is_empty() {
return Ok(Vec::new());
}

let chain_metadata = db.fetch_chain_metadata()?;
let mut height = chain_metadata.height_of_longest_chain();

mmr_positions.sort_unstable();

let mut headers = Vec::with_capacity(mmr_positions.len());

let mut target = mmr_positions.pop().expect("mmr_positions cannot be empty here");

loop {
if target > u32::MAX as u64 {
// TODO: in future, bitmap may support higher than u32
return Err(ChainStorageError::InvalidArguments {
func: "fetch_header_of_deleted_position",
arg: "mmr_positions",
message: "mmr_positions should fit into u32".into(),
});
}
if db
.fetch_block_accumulated_data_by_height(height)
.or_not_found("BlockAccumulatedData", "height", height.to_string())?
.deleted()
.contains(target as u32)
{
headers.push(fetch_header(&(*db), height)?);
if let Some(pos) = mmr_positions.pop() {
target = pos;
} else {
break;
}
}

if height > 0 {
height -= 1;
} else {
break;
}
}

Ok(headers)
}

pub fn get_stats(&self) -> Result<DbBasicStats, ChainStorageError> {
let lock = self.db_read_access()?;
lock.get_stats()
Expand Down
Loading