Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sql query to obtain balance #3446

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
sync::Arc,
};
use tari_common_types::types::{BlindingFactor, Commitment, HashOutput, PrivateKey};
use tari_core::transactions::{tari_amount::MicroTari, transaction::TransactionOutput};
use tari_core::transactions::transaction::TransactionOutput;

const LOG_TARGET: &str = "wallet::output_manager_service::database";

Expand All @@ -51,7 +51,6 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
/// Modify the state the of the backend with a write operation
fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError>;
fn fetch_pending_incoming_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
fn fetch_pending_outgoing_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;

fn set_received_output_mined_height(
&self,
Expand Down Expand Up @@ -119,6 +118,8 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
fn set_coinbase_abandoned(&self, tx_id: TxId, abandoned: bool) -> Result<(), OutputManagerStorageError>;
/// Reinstate a cancelled inbound output
fn reinstate_cancelled_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError>;
/// Return the available, time locked, pending incoming and pending outgoing balance
fn get_balance(&self, tip: Option<u64>) -> Result<Balance, OutputManagerStorageError>;
}

/// Holds the state of the KeyManager being used by the Output Manager Service
Expand Down Expand Up @@ -276,69 +277,9 @@ where T: OutputManagerBackend + 'static

pub async fn get_balance(&self, current_chain_tip: Option<u64>) -> Result<Balance, OutputManagerStorageError> {
let db_clone = self.db.clone();
let db_clone2 = self.db.clone();
let db_clone3 = self.db.clone();
let db_clone4 = self.db.clone();

let unspent_outputs = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::UnspentOutputs) {
Ok(None) => log_error(
DbKey::UnspentOutputs,
OutputManagerStorageError::UnexpectedResult("Could not retrieve unspent outputs".to_string()),
),
Ok(Some(DbValue::UnspentOutputs(uo))) => Ok(uo),
Ok(Some(other)) => unexpected_result(DbKey::UnspentOutputs, other),
Err(e) => log_error(DbKey::UnspentOutputs, e),
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let pending_incoming_outputs = tokio::task::spawn_blocking(move || db_clone2.fetch_pending_incoming_outputs())
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let pending_outgoing_outputs = tokio::task::spawn_blocking(move || db_clone3.fetch_pending_outgoing_outputs())
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let time_locked_balance = if let Some(tip) = current_chain_tip {
let time_locked_outputs = tokio::task::spawn_blocking(move || {
db_clone4.fetch(&DbKey::TimeLockedUnspentOutputs(tip))?.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Time-locked Outputs cannot be retrieved".to_string())
})
})
tokio::task::spawn_blocking(move || db_clone.get_balance(current_chain_tip))
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
if let DbValue::UnspentOutputs(time_locked_uo) = time_locked_outputs {
Some(
time_locked_uo
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value),
)
} else {
None
}
} else {
None
};

let available_balance = unspent_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

let pending_incoming = pending_incoming_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

let pending_outgoing = pending_outgoing_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

Ok(Balance {
available_balance,
time_locked_balance,
pending_incoming_balance: pending_incoming,
pending_outgoing_balance: pending_outgoing,
})
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))?
}

/// This method is called when a transaction is built to be sent. It will encumber unspent outputs against a pending
Expand Down
125 changes: 106 additions & 19 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use crate::{
output_manager_service::{
error::OutputManagerStorageError,
service::Balance,
storage::{
database::{DbKey, DbKeyValuePair, DbValue, KeyManagerState, OutputManagerBackend, WriteOperation},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
Expand All @@ -38,7 +39,7 @@ use crate::{
};
use aes_gcm::{aead::Error as AeadError, Aes256Gcm, Error};
use chrono::{NaiveDateTime, Utc};
use diesel::{prelude::*, result::Error as DieselError, SqliteConnection};
use diesel::{prelude::*, result::Error as DieselError, sql_query, SqliteConnection};
use log::*;
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -330,24 +331,6 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.collect::<Result<Vec<_>, _>>()
}

fn fetch_pending_outgoing_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

let mut outputs = OutputSql::index_status(OutputStatus::EncumberedToBeSpent, &conn)?;
outputs.extend(OutputSql::index_status(
OutputStatus::ShortTermEncumberedToBeSpent,
&conn,
)?);
outputs.extend(OutputSql::index_status(OutputStatus::SpentMinedUnconfirmed, &conn)?);
for o in outputs.iter_mut() {
self.decrypt_if_necessary(o)?;
}
outputs
.iter()
.map(|o| DbUnblindedOutput::try_from(o.clone()))
.collect::<Result<Vec<_>, _>>()
}

fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -650,6 +633,12 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
}
}

fn get_balance(&self, tip: Option<u64>) -> Result<Balance, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

OutputSql::get_balance(tip, &(*conn))
}

fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -1047,6 +1036,104 @@ impl OutputSql {
.first::<OutputSql>(conn)?)
}

/// Return the available, time locked, pending incoming and pending outgoing balance
pub fn get_balance(tip: Option<u64>, conn: &SqliteConnection) -> Result<Balance, OutputManagerStorageError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be clearer what the tip argument is for if we rename it to something like tip_for_timelock_calculation. Currently it is not clear what the tip is for and that it actually affects whether the timelock balance is returned or not. The comment should also make the behaviour of the function clear when the tip is provided or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make work these two comments into a new small PR after the merge.

#[derive(QueryableByName, Clone)]
struct BalanceQueryResult {
#[sql_type = "diesel::sql_types::BigInt"]
amount: i64,
#[sql_type = "diesel::sql_types::Text"]
category: String,
}
let balance_query_result = if let Some(val) = tip {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the generic val here makes reading the code difficult below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

let balance_query = sql_query(
"SELECT coalesce(sum(value), 0) as amount, 'available_balance' as category \
FROM outputs WHERE status = ? \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should take the mined heights into account when the tip is specified

Suggested change
FROM outputs WHERE status = ? \
FROM outputs WHERE status = ? and mined_height < $val and deleted_at_height > $val\

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a job for the validation code though, I think it is cleaner that this function is based purely on the status.

UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'time_locked_balance' as category \
FROM outputs WHERE status = ? AND maturity > ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_incoming_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_outgoing_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ?",
)
// available_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::Unspent as i32)
// time_locked_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::Unspent as i32)
.bind::<diesel::sql_types::BigInt, _>(val as i64)
// pending_incoming_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::UnspentMinedUnconfirmed as i32)
// pending_outgoing_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::SpentMinedUnconfirmed as i32);
balance_query.load::<BalanceQueryResult>(conn)?
} else {
let balance_query = sql_query(
"SELECT coalesce(sum(value), 0) as amount, 'available_balance' as category \
FROM outputs WHERE status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_incoming_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_outgoing_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ?",
)
// available_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::Unspent as i32)
// pending_incoming_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::UnspentMinedUnconfirmed as i32)
// pending_outgoing_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::SpentMinedUnconfirmed as i32);
balance_query.load::<BalanceQueryResult>(conn)?
};
let mut available_balance = None;
let mut time_locked_balance = Some(None);
let mut pending_incoming_balance = None;
let mut pending_outgoing_balance = None;
for balance in balance_query_result.clone() {
match balance.category.as_str() {
"available_balance" => available_balance = Some(MicroTari::from(balance.amount as u64)),
"time_locked_balance" => time_locked_balance = Some(Some(MicroTari::from(balance.amount as u64))),
"pending_incoming_balance" => pending_incoming_balance = Some(MicroTari::from(balance.amount as u64)),
"pending_outgoing_balance" => pending_outgoing_balance = Some(MicroTari::from(balance.amount as u64)),
_ => {
return Err(OutputManagerStorageError::UnexpectedResult(
"Unexpected category in balance query".to_string(),
))
},
}
}

Ok(Balance {
available_balance: available_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Available balance could not be calculated".to_string())
})?,
time_locked_balance: time_locked_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Time locked balance could not be calculated".to_string())
})?,
pending_incoming_balance: pending_incoming_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult(
"Pending incoming balance could not be calculated".to_string(),
)
})?,
pending_outgoing_balance: pending_outgoing_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult(
"Pending outgoing balance could not be calculated".to_string(),
)
})?,
})
}

pub fn find_by_commitment(
commitment: &[u8],
conn: &SqliteConnection,
Expand Down
15 changes: 15 additions & 0 deletions base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ async fn test_get_balance() {
let balance = oms.get_balance().await.unwrap();

assert_eq!(output_val, balance.available_balance);
assert_eq!(output_val, balance.time_locked_balance.unwrap());
assert_eq!(recv_value + change_val, balance.pending_incoming_balance);
assert_eq!(output_val, balance.pending_outgoing_balance);
}
Expand All @@ -776,6 +777,10 @@ async fn sending_transaction_with_short_term_clear() {
let (_ti, uo) = make_input(&mut OsRng.clone(), available_balance, &factories.commitment);
oms.add_output(uo).await.unwrap();

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, available_balance);
assert_eq!(balance.time_locked_balance.unwrap(), available_balance);

// Check that funds are encumbered and then unencumbered if the pending tx is not confirmed before restart
let _stp = oms
.prepare_transaction_to_send(
Expand All @@ -790,13 +795,16 @@ async fn sending_transaction_with_short_term_clear() {
.unwrap();

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, MicroTari::from(0));
assert_eq!(balance.time_locked_balance.unwrap(), MicroTari::from(0));
assert_eq!(balance.pending_outgoing_balance, available_balance);

drop(oms);
let (mut oms, _, _shutdown, _, _, _, _, _) = setup_output_manager_service(backend.clone(), true).await;

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, available_balance);
assert_eq!(balance.time_locked_balance.unwrap(), available_balance);

// Check that is the pending tx is confirmed that the encumberance persists after restart
let stp = oms
Expand All @@ -817,6 +825,8 @@ async fn sending_transaction_with_short_term_clear() {
let (mut oms, _, _shutdown, _, _, _, _, _) = setup_output_manager_service(backend, true).await;

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, MicroTari::from(0));
assert_eq!(balance.time_locked_balance.unwrap(), MicroTari::from(0));
assert_eq!(balance.pending_outgoing_balance, available_balance);
}

Expand Down Expand Up @@ -1080,6 +1090,7 @@ async fn test_txo_validation() {
balance.available_balance,
MicroTari::from(output2_value) + MicroTari::from(output3_value)
);
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());
assert_eq!(balance.pending_outgoing_balance, MicroTari::from(output1_value));
assert_eq!(
balance.pending_incoming_balance,
Expand Down Expand Up @@ -1179,6 +1190,7 @@ async fn test_txo_validation() {
balance.available_balance,
MicroTari::from(output2_value) + MicroTari::from(output3_value)
);
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());

assert_eq!(oms.get_unspent_outputs().await.unwrap().len(), 2);

Expand Down Expand Up @@ -1226,6 +1238,7 @@ async fn test_txo_validation() {
);
assert_eq!(balance.pending_outgoing_balance, MicroTari::from(0));
assert_eq!(balance.pending_incoming_balance, MicroTari::from(0));
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());

// Trigger another validation and only Output3 should be checked
oms.validate_txos().await.unwrap();
Expand Down Expand Up @@ -1331,6 +1344,7 @@ async fn test_txo_validation() {
balance.pending_incoming_balance,
MicroTari::from(output1_value) - MicroTari::from(900_300)
);
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());

// Now we will update the mined_height in the responses so that the outputs on the reorged chain are confirmed
// Output 1: Spent in Block 5 - Confirmed
Expand Down Expand Up @@ -1390,6 +1404,7 @@ async fn test_txo_validation() {
);
assert_eq!(balance.pending_outgoing_balance, MicroTari::from(0));
assert_eq!(balance.pending_incoming_balance, MicroTari::from(0));
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());
}

#[tokio::test]
Expand Down
9 changes: 9 additions & 0 deletions base_layer/wallet/tests/output_manager_service/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ pub fn test_db_backend<T: OutputManagerBackend + 'static>(backend: T) {
assert_eq!(time_locked_outputs.len(), 0);
let time_locked_balance = unspent_outputs[4].unblinded_output.value;

for i in 0..4usize {
let balance = runtime.block_on(db.get_balance(Some(i as u64))).unwrap();
let mut sum = MicroTari::from(0);
for output in unspent_outputs.iter().take(5).skip(i + 1) {
sum += output.unblinded_output.value;
}
assert_eq!(balance.time_locked_balance.unwrap(), sum);
}

unspent_outputs.sort();

let outputs = runtime.block_on(db.fetch_sorted_unspent_outputs()).unwrap();
Expand Down