Skip to content

Commit

Permalink
fix(wallet): handle receiver cancelling an inbound transaction that i…
Browse files Browse the repository at this point in the history
…s later received (#3177)
  • Loading branch information
stringhandler committed Aug 17, 2021
2 parents 23a7d64 + a16d04b commit c79e53c
Show file tree
Hide file tree
Showing 19 changed files with 563 additions and 127 deletions.
10 changes: 10 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ service Wallet {
rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse);
// List currently connected peers
rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse);
// Cancel pending transaction
rpc CancelTransaction (CancelTransactionRequest) returns (CancelTransactionResponse);
}

message GetVersionRequest { }
Expand Down Expand Up @@ -185,3 +187,11 @@ message ImportUtxosResponse {
repeated uint64 tx_ids = 1;
}

message CancelTransactionRequest {
uint64 tx_id = 1;
}

message CancelTransactionResponse {
bool is_success = 1;
string failure_message = 2;
}
27 changes: 27 additions & 0 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,33 @@ impl wallet_server::Wallet for WalletGrpcServer {

Ok(Response::new(resp))
}

async fn cancel_transaction(
&self,
request: Request<tari_rpc::CancelTransactionRequest>,
) -> Result<Response<tari_rpc::CancelTransactionResponse>, Status> {
let message = request.into_inner();
debug!(
target: LOG_TARGET,
"Incoming gRPC request to Cancel Transaction (TxId: {})", message.tx_id,
);
let mut transaction_service = self.get_transaction_service();

match transaction_service.cancel_transaction(message.tx_id).await {
Ok(_) => {
return Ok(Response::new(tari_rpc::CancelTransactionResponse {
is_success: true,
failure_message: "".to_string(),
}))
},
Err(e) => {
return Ok(Response::new(tari_rpc::CancelTransactionResponse {
is_success: false,
failure_message: e.to_string(),
}))
},
}
}
}

fn convert_wallet_transaction_into_transaction_info(
Expand Down
14 changes: 14 additions & 0 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum OutputManagerRequest {
ScanForRecoverableOutputs(Vec<TransactionOutput>),
ScanOutputs(Vec<TransactionOutput>),
AddKnownOneSidedPaymentScript(KnownOneSidedPaymentScript),
ReinstateCancelledInboundTx(TxId),
}

impl fmt::Display for OutputManagerRequest {
Expand Down Expand Up @@ -115,6 +116,7 @@ impl fmt::Display for OutputManagerRequest {
ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"),
ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"),
AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"),
ReinstateCancelledInboundTx(_) => write!(f, "ReinstateCancelledInboundTx"),
}
}
}
Expand Down Expand Up @@ -149,6 +151,7 @@ pub enum OutputManagerResponse {
RewoundOutputs(Vec<UnblindedOutput>),
ScanOutputs(Vec<UnblindedOutput>),
AddKnownOneSidedPaymentScript,
ReinstatedCancelledInboundTx,
}

pub type OutputManagerEventSender = broadcast::Sender<Arc<OutputManagerEvent>>;
Expand Down Expand Up @@ -545,4 +548,15 @@ impl OutputManagerHandle {
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}

pub async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> {
match self
.handle
.call(OutputManagerRequest::ReinstateCancelledInboundTx(tx_id))
.await??
{
OutputManagerResponse::ReinstatedCancelledInboundTx => Ok(()),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
}
26 changes: 26 additions & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::{
types::{HashDigest, ValidationRetryStrategy},
};
use blake2::Digest;
use chrono::Utc;
use diesel::result::{DatabaseErrorKind, Error as DieselError};
use futures::{pin_mut, StreamExt};
use log::*;
Expand Down Expand Up @@ -344,6 +345,10 @@ where TBackend: OutputManagerBackend + 'static
.add_known_script(known_script)
.await
.map(|_| OutputManagerResponse::AddKnownOneSidedPaymentScript),
OutputManagerRequest::ReinstateCancelledInboundTx(tx_id) => self
.reinstate_cancelled_inbound_transaction(tx_id)
.await
.map(|_| OutputManagerResponse::ReinstatedCancelledInboundTx),
}
}

Expand Down Expand Up @@ -897,6 +902,27 @@ where TBackend: OutputManagerBackend + 'static
Ok(self.resources.db.cancel_pending_transaction_outputs(tx_id).await?)
}

/// Restore the pending transaction encumberance and output for an inbound transaction that was previously
/// cancelled.
async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> {
self.resources.db.reinstate_inbound_output(tx_id).await?;

self.resources
.db
.add_pending_transaction_outputs(PendingTransactionOutputs {
tx_id,
outputs_to_be_spent: Vec::new(),
outputs_to_be_received: Vec::new(),
timestamp: Utc::now().naive_utc(),
coinbase_block_height: None,
})
.await?;

self.confirm_encumberance(tx_id).await?;

Ok(())
}

/// Go through the pending transaction and if any have existed longer than the specified duration, cancel them
async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
Ok(self.resources.db.timeout_pending_transaction_outputs(period).await?)
Expand Down
49 changes: 48 additions & 1 deletion base_layer/wallet/src/output_manager_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::output_manager_service::{
error::OutputManagerStorageError,
service::Balance,
storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript},
storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
TxId,
};
use aes_gcm::Aes256Gcm;
Expand Down Expand Up @@ -135,6 +135,7 @@ pub enum DbKey {
KeyManagerState,
InvalidOutputs,
KnownOneSidedPaymentScripts,
OutputsByTxIdAndStatus(TxId, OutputStatus),
}

#[derive(Debug)]
Expand All @@ -149,6 +150,7 @@ pub enum DbValue {
KeyManagerState(KeyManagerState),
KnownOneSidedPaymentScripts(Vec<KnownOneSidedPaymentScript>),
AnyOutput(Box<DbUnblindedOutput>),
AnyOutputs(Vec<DbUnblindedOutput>),
}

pub enum DbKeyValuePair {
Expand All @@ -158,6 +160,7 @@ pub enum DbKeyValuePair {
PendingTransactionOutputs(TxId, Box<PendingTransactionOutputs>),
KeyManagerState(KeyManagerState),
KnownOneSidedPaymentScripts(KnownOneSidedPaymentScript),
UpdateOutputStatus(Commitment, OutputStatus),
}

pub enum WriteOperation {
Expand Down Expand Up @@ -710,6 +713,48 @@ where T: OutputManagerBackend + 'static
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

/// Check if a single cancelled inbound output exists that matches this TxID, if it does then return its status to
/// EncumberedToBeReceived
pub async fn reinstate_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
let db_clone = self.db.clone();
let outputs = tokio::task::spawn_blocking(move || {
match db_clone.fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) {
Ok(None) => Err(OutputManagerStorageError::ValueNotFound),
Ok(Some(DbValue::AnyOutputs(o))) => Ok(o),
Ok(Some(other)) => unexpected_result(
DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound),
other,
),
Err(e) => log_error(DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), e),
}
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?;

if outputs.len() != 1 {
return Err(OutputManagerStorageError::UnexpectedResult(
"There should be only 1 output for a cancelled inbound transaction but more were found".to_string(),
));
}
let db_clone2 = self.db.clone();

tokio::task::spawn_blocking(move || {
db_clone2.write(WriteOperation::Insert(DbKeyValuePair::UpdateOutputStatus(
outputs
.first()
.expect("Must be only one element in outputs")
.commitment
.clone(),
OutputStatus::EncumberedToBeReceived,
)))
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

Ok(())
}
}

fn unexpected_result<T>(req: DbKey, res: DbValue) -> Result<T, OutputManagerStorageError> {
Expand All @@ -734,6 +779,7 @@ impl Display for DbKey {
DbKey::TimeLockedUnspentOutputs(_t) => f.write_str(&"Timelocked Outputs"),
DbKey::KnownOneSidedPaymentScripts => f.write_str(&"Known claiming scripts"),
DbKey::AnyOutputByCommitment(_) => f.write_str(&"AnyOutputByCommitment"),
DbKey::OutputsByTxIdAndStatus(_, _) => f.write_str(&"OutputsByTxIdAndStatus"),
}
}
}
Expand All @@ -751,6 +797,7 @@ impl Display for DbValue {
DbValue::InvalidOutputs(_) => f.write_str("Invalid Outputs"),
DbValue::KnownOneSidedPaymentScripts(_) => f.write_str(&"Known claiming scripts"),
DbValue::AnyOutput(_) => f.write_str(&"Any Output"),
DbValue::AnyOutputs(_) => f.write_str(&"Any Outputs"),
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions base_layer/wallet/src/output_manager_service/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,14 @@ impl PartialEq for KnownOneSidedPaymentScript {
self.script_hash == other.script_hash
}
}

/// The status of a given output
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum OutputStatus {
Unspent,
Spent,
EncumberedToBeReceived,
EncumberedToBeSpent,
Invalid,
CancelledInbound,
}
54 changes: 42 additions & 12 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
PendingTransactionOutputs,
WriteOperation,
},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
},
TxId,
},
Expand Down Expand Up @@ -105,6 +105,7 @@ impl OutputManagerSqliteDatabase {
}
}
impl OutputManagerBackend for OutputManagerSqliteDatabase {
#[allow(clippy::cognitive_complexity)]
fn fetch(&self, key: &DbKey) -> Result<Option<DbValue>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -135,6 +136,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
None
},
},

DbKey::AnyOutputByCommitment(commitment) => {
match OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn)) {
Ok(mut o) => {
Expand Down Expand Up @@ -173,6 +175,18 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
None
},
},
DbKey::OutputsByTxIdAndStatus(tx_id, status) => {
let mut outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &(*conn))?;
for o in outputs.iter_mut() {
self.decrypt_if_necessary(o)?;
}
Some(DbValue::AnyOutputs(
outputs
.iter()
.map(|o| DbUnblindedOutput::try_from(o.clone()))
.collect::<Result<Vec<_>, _>>()?,
))
},
DbKey::UnspentOutputs => {
let mut outputs = OutputSql::index_status(OutputStatus::Unspent, &(*conn))?;
for o in outputs.iter_mut() {
Expand Down Expand Up @@ -273,6 +287,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
Ok(result)
}

#[allow(clippy::cognitive_complexity)]
fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -337,6 +352,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
self.encrypt_if_necessary(&mut script_sql)?;
script_sql.commit(&(*conn))?
},
DbKeyValuePair::UpdateOutputStatus(commitment, status) => {
let output = OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn))?;
output.update(
UpdateOutput {
status: Some(status),
tx_id: None,
spending_key: None,
script_private_key: None,
metadata_signature_nonce: None,
metadata_signature_u_key: None,
},
&(*conn),
)?;
},
},
WriteOperation::Remove(k) => match k {
DbKey::SpentOutput(s) => match OutputSql::find_status(&s.to_vec(), OutputStatus::Spent, &(*conn)) {
Expand Down Expand Up @@ -409,6 +438,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
DbKey::InvalidOutputs => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::TimeLockedUnspentOutputs(_) => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::KnownOneSidedPaymentScripts => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::OutputsByTxIdAndStatus(_, _) => return Err(OutputManagerStorageError::OperationNotSupported),
},
}

Expand Down Expand Up @@ -840,17 +870,6 @@ fn pending_transaction_outputs_from_sql_outputs(
})
}

/// The status of a given output
#[derive(PartialEq)]
enum OutputStatus {
Unspent,
Spent,
EncumberedToBeReceived,
EncumberedToBeSpent,
Invalid,
CancelledInbound,
}

impl TryFrom<i32> for OutputStatus {
type Error = OutputManagerStorageError;

Expand Down Expand Up @@ -1011,6 +1030,17 @@ impl OutputSql {
Ok(request.first::<OutputSql>(conn)?)
}

pub fn find_by_tx_id_and_status(
tx_id: TxId,
status: OutputStatus,
conn: &SqliteConnection,
) -> Result<Vec<OutputSql>, OutputManagerStorageError> {
Ok(outputs::table
.filter(outputs::tx_id.eq(Some(tx_id as i64)))
.filter(outputs::status.eq(status as i32))
.load(conn)?)
}

/// Find outputs via tx_id that are encumbered. Any outputs that are encumbered cannot be marked as spent.
pub fn find_by_tx_id_and_encumbered(
tx_id: TxId,
Expand Down
Loading

0 comments on commit c79e53c

Please sign in to comment.