Skip to content

Commit

Permalink
fix(wallet): detect base node change during long-running protocols (#…
Browse files Browse the repository at this point in the history
…4610)

Description
---

Interrupt txo_validation_protocol and txo_validation_task if the base node is changed.

Motivation and Context
---
These long-running tasks continue to run using the same base node even if it has changed. This PR checks for base node changes and interrupts the tasks at the correct points.

Other tasks may also need to be interrupted in a similar way.

Ref #4599 - this may fix this issue, but more info is needed to confirm

How Has This Been Tested?
---
Tested manually by changing the base node and checking that the tasks end.
  • Loading branch information
sdbondi authored Sep 6, 2022
1 parent c6c47fc commit 2a2a8b6
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 90 deletions.
5 changes: 5 additions & 0 deletions base_layer/wallet/src/connectivity_service/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ impl WalletConnectivityMock {
self.base_node_watch.send(Some(base_node_peer));
}

/// Waits until the base node watch reports a change, then returns a clone of
/// the peer the watch currently holds (`None` when the watch holds no peer).
pub async fn base_node_changed(&mut self) -> Option<Peer> {
    let watch = &mut self.base_node_watch;
    watch.changed().await;
    // Keep the borrow guard in a local and clone the peer out of it so the
    // caller receives an owned value.
    let latest = watch.borrow();
    latest.as_ref().cloned()
}

pub fn send_shutdown(&self) {
self.base_node_wallet_rpc_client.send(None);
self.base_node_sync_rpc_client.send(None);
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum OutputManagerError {
ServiceError(String),
#[error("Base node is not synced")]
BaseNodeNotSynced,
#[error("Base node changed")]
BaseNodeChanged,
#[error("Invalid Sender Message Type")]
InvalidSenderMessage,
#[error("Coinbase build error: `{0}`")]
Expand Down
76 changes: 53 additions & 23 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,40 +534,70 @@ where
}

fn validate_outputs(&mut self) -> Result<u64, OutputManagerError> {
if !self.resources.connectivity.is_base_node_set() {
return Err(OutputManagerError::NoBaseNodeKeysProvided);
}
let current_base_node = self
.resources
.connectivity
.get_current_base_node_id()
.ok_or(OutputManagerError::NoBaseNodeKeysProvided)?;
let id = OsRng.next_u64();
let utxo_validation = TxoValidationTask::new(
let txo_validation = TxoValidationTask::new(
id,
self.resources.db.clone(),
self.resources.connectivity.clone(),
self.resources.event_publisher.clone(),
self.resources.config.clone(),
);

let shutdown = self.resources.shutdown_signal.clone();
let mut shutdown = self.resources.shutdown_signal.clone();
let mut base_node_watch = self.resources.connectivity.get_current_base_node_watcher();
let event_publisher = self.resources.event_publisher.clone();
tokio::spawn(async move {
match utxo_validation.execute(shutdown).await {
Ok(id) => {
info!(
target: LOG_TARGET,
"UTXO Validation Protocol (Id: {}) completed successfully", id
);
},
Err(OutputManagerProtocolError { id, error }) => {
warn!(
target: LOG_TARGET,
"Error completing UTXO Validation Protocol (Id: {}): {:?}", id, error
);
if let Err(e) = event_publisher.send(Arc::new(OutputManagerEvent::TxoValidationFailure(id))) {
debug!(
target: LOG_TARGET,
"Error sending event because there are no subscribers: {:?}", e
);
let exec_fut = txo_validation.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
match result {
Ok(id) => {
info!(
target: LOG_TARGET,
"UTXO Validation Protocol (Id: {}) completed successfully", id
);
return;
},
Err(OutputManagerProtocolError { id, error }) => {
warn!(
target: LOG_TARGET,
"Error completing UTXO Validation Protocol (Id: {}): {:?}", id, error
);
if let Err(e) = event_publisher.send(Arc::new(OutputManagerEvent::TxoValidationFailure(id))) {
debug!(
target: LOG_TARGET,
"Error sending event because there are no subscribers: {:?}", e
);
}

return;
},
}
},
_ = shutdown.wait() => {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) shutting down because the system is shutting down", id);
return;
},
_ = base_node_watch.changed() => {
if let Some(peer) = base_node_watch.borrow().as_ref() {
if peer.node_id != current_base_node {
debug!(
target: LOG_TARGET,
"TXO Validation Protocol (Id: {}) cancelled because base node changed", id
);
return;
}
}

}
},
}
}
});
Ok(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use std::{

use log::*;
use tari_common_types::types::{BlockHash, FixedHash};
use tari_comms::protocol::rpc::RpcError::RequestFailed;
use tari_comms::{peer_manager::Peer, protocol::rpc::RpcError::RequestFailed};
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
proto::base_node::{QueryDeletedRequest, UtxoQueryRequest},
};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::sync::watch;

use crate::{
connectivity_service::WalletConnectivityInterface,
Expand All @@ -54,6 +54,7 @@ const LOG_TARGET: &str = "wallet::output_service::txo_validation_task";
pub struct TxoValidationTask<TBackend, TWalletConnectivity> {
operation_id: u64,
db: OutputManagerDatabase<TBackend>,
base_node_watch: watch::Receiver<Option<Peer>>,
connectivity: TWalletConnectivity,
event_publisher: OutputManagerEventSender,
config: OutputManagerServiceConfig,
Expand All @@ -74,23 +75,30 @@ where
Self {
operation_id,
db,
base_node_watch: connectivity.get_current_base_node_watcher(),
connectivity,
event_publisher,
config,
}
}

pub async fn execute(mut self, _shutdown: ShutdownSignal) -> Result<u64, OutputManagerProtocolError> {
pub async fn execute(mut self) -> Result<u64, OutputManagerProtocolError> {
let mut base_node_client = self
.connectivity
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(OutputManagerError::Shutdown)
.for_protocol(self.operation_id)?;

let base_node_peer = self
.base_node_watch
.borrow()
.as_ref()
.map(|p| p.node_id.clone())
.ok_or_else(|| OutputManagerProtocolError::new(self.operation_id, OutputManagerError::BaseNodeChanged))?;
debug!(
target: LOG_TARGET,
"Starting TXO validation protocol (Id: {})", self.operation_id,
"Starting TXO validation protocol with peer {} (Id: {})", base_node_peer, self.operation_id,
);

let last_mined_header = self.check_for_reorgs(&mut base_node_client).await?;
Expand All @@ -99,10 +107,11 @@ where

self.update_spent_outputs(&mut base_node_client, last_mined_header)
.await?;

self.publish_event(OutputManagerEvent::TxoValidationSuccess(self.operation_id));
debug!(
target: LOG_TARGET,
"Finished TXO validation protocol (Id: {})", self.operation_id,
"Finished TXO validation protocol from base node {} (Id: {})", base_node_peer, self.operation_id,
);
Ok(self.operation_id)
}
Expand Down Expand Up @@ -233,6 +242,7 @@ where
batch.len(),
self.operation_id
);

let (mined, unmined, tip_height) = self
.query_base_node_for_outputs(batch, wallet_client)
.await
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum TransactionServiceError {
AttemptedToBroadcastCoinbaseTransaction(TxId),
#[error("No Base Node public keys are provided for Base chain broadcast and monitoring")]
NoBaseNodeKeysProvided,
#[error("Base node changed during {task_name}")]
BaseNodeChanged { task_name: &'static str },
#[error("Error sending data to Protocol via registered channels")]
ProtocolChannelError,
#[error("Transaction detected as rejected by mempool")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
use log::*;
use tari_common_types::{
transaction::{TransactionStatus, TxId},
types::BlockHash,
types::{BlockHash, Signature},
};
use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
use tari_core::{
Expand All @@ -51,6 +51,7 @@ use crate::{
handle::{TransactionEvent, TransactionEventSender},
storage::{
database::{TransactionBackend, TransactionDatabase},
models::TxCancellationReason,
sqlite_db::UnconfirmedTransactionInfo,
},
},
Expand All @@ -67,9 +68,6 @@ pub struct TransactionValidationProtocol<TTransactionBackend, TWalletConnectivit
event_publisher: TransactionEventSender,
output_manager_handle: OutputManagerHandle,
}
use tari_common_types::types::Signature;

use crate::transaction_service::storage::models::TxCancellationReason;

#[allow(unused_variables)]
impl<TTransactionBackend, TWalletConnectivity> TransactionValidationProtocol<TTransactionBackend, TWalletConnectivity>
Expand Down Expand Up @@ -504,10 +502,6 @@ where
tx_id: TxId,
status: &TransactionStatus,
) -> Result<(), TransactionServiceProtocolError<OperationId>> {
self.db
.set_transaction_as_unmined(tx_id)
.for_protocol(self.operation_id)?;

if *status == TransactionStatus::Coinbase {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx_id, false).await {
warn!(
Expand All @@ -520,6 +514,10 @@ where
};
}

self.db
.set_transaction_as_unmined(tx_id)
.for_protocol(self.operation_id)?;

self.publish_event(TransactionEvent::TransactionBroadcast(tx_id));
Ok(())
}
Expand Down
33 changes: 29 additions & 4 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2180,9 +2180,12 @@ where
JoinHandle<Result<OperationId, TransactionServiceProtocolError<OperationId>>>,
>,
) -> Result<OperationId, TransactionServiceError> {
if !self.connectivity().is_base_node_set() {
return Err(TransactionServiceError::NoBaseNodeKeysProvided);
}
let current_base_node = self
.resources
.connectivity
.get_current_base_node_id()
.ok_or(TransactionServiceError::NoBaseNodeKeysProvided)?;

trace!(target: LOG_TARGET, "Starting transaction validation protocol");
let id = OperationId::new_random();

Expand All @@ -2195,7 +2198,29 @@ where
self.resources.output_manager_service.clone(),
);

let join_handle = tokio::spawn(protocol.execute());
let mut base_node_watch = self.connectivity().get_current_base_node_watcher();

let join_handle = tokio::spawn(async move {
let exec_fut = protocol.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
return result;
},
_ = base_node_watch.changed() => {
if let Some(peer) = base_node_watch.borrow().as_ref() {
if peer.node_id != current_base_node {
debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol");
return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged {
task_name: "transaction validation_protocol",
}));
}
}
}
}
}
});
join_handles.push(join_handle);

Ok(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,20 @@ async fn setup_output_manager_service<T: OutputManagerBackend + 'static, U: KeyM
mock_base_node_service.set_default_base_node_state();
task::spawn(mock_base_node_service.run());

let wallet_connectivity_mock = create_wallet_connectivity_mock();
let mut wallet_connectivity_mock = create_wallet_connectivity_mock();
// let (connectivity, connectivity_mock) = create_connectivity_mock();
// let connectivity_mock_state = connectivity_mock.get_shared_state();
// task::spawn(connectivity_mock.run());
let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE);

wallet_connectivity_mock.notify_base_node_set(server_node_identity.to_peer());
wallet_connectivity_mock.base_node_changed().await;

let service = BaseNodeWalletRpcMockService::new();
let rpc_service_state = service.get_state();

let server = BaseNodeWalletRpcServer::new(service);
let protocol_name = server.as_protocol_name();
let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE);

let mut mock_server = MockRpcServer::new(server, server_node_identity.clone());
mock_server.serve();
Expand Down Expand Up @@ -1301,7 +1304,6 @@ async fn test_txo_validation() {

let mut oms = setup_output_manager_service(backend, ks_backend, true).await;

oms.wallet_connectivity_mock.notify_base_node_set(oms.node_id.to_peer());
// Now we add the connection
let mut connection = oms
.mock_rpc_service
Expand Down Expand Up @@ -1852,7 +1854,6 @@ async fn test_txo_revalidation() {

let mut oms = setup_output_manager_service(backend, ks_backend, true).await;

oms.wallet_connectivity_mock.notify_base_node_set(oms.node_id.to_peer());
// Now we add the connection
let mut connection = oms
.mock_rpc_service
Expand Down
Loading

0 comments on commit 2a2a8b6

Please sign in to comment.