Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize pending transactions inbound query #3500

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ pub enum TransactionStorageError {
AeadError(String),
#[error("Transaction (TxId: '{0}') is not mined")]
TransactionNotMined(TxId),
#[error("Conversion error: `{0}`")]
ByteArrayError(#[from] ByteArrayError),
}

/// This error type is used to return TransactionServiceErrors from inside a Transaction Service protocol but also
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,9 +1478,9 @@ where
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError> {
let inbound_txs = self.db.get_pending_inbound_transactions().await?;
for (tx_id, tx) in inbound_txs {
self.restart_receive_transaction_protocol(tx_id, tx.source_public_key.clone(), join_handles);
let inbound_txs = self.db.get_pending_inbound_transaction_sender_info().await?;
for txn in inbound_txs {
self.restart_receive_transaction_protocol(txn.tx_id, txn.source_public_key, join_handles);
}

Ok(())
Expand Down
23 changes: 20 additions & 3 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use aes_gcm::Aes256Gcm;
use chrono::Utc;
use log::*;

use crate::transaction_service::storage::models::WalletTransaction;
use crate::transaction_service::storage::{models::WalletTransaction, sqlite_db::InboundTransactionSenderInfo};
use std::{
collections::HashMap,
fmt,
Expand Down Expand Up @@ -123,11 +123,14 @@ pub trait TransactionBackend: Send + Sync + Clone {
num_confirmations: u64,
is_confirmed: bool,
) -> Result<(), TransactionStorageError>;

/// Clears the mined block and height of a transaction
fn set_transaction_as_unmined(&self, tx_id: TxId) -> Result<(), TransactionStorageError>;

/// Mark all transactions as unvalidated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe Clean Code would call this comment redundant, it is literally the name of the method.

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError>;
/// Get transaction sender info for all pending inbound transactions
fn get_pending_inbound_transaction_sender_info(
&self,
) -> Result<Vec<InboundTransactionSenderInfo>, TransactionStorageError>;
}

#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -793,6 +796,20 @@ where T: TransactionBackend + 'static
.map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

pub async fn get_pending_inbound_transaction_sender_info(
&self,
) -> Result<Vec<InboundTransactionSenderInfo>, TransactionStorageError> {
let db_clone = self.db.clone();

let t = tokio::task::spawn_blocking(move || match db_clone.get_pending_inbound_transaction_sender_info() {
Ok(v) => Ok(v),
Err(e) => log_error(DbKey::PendingInboundTransactions, e),
})
.await
.map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(t)
}
}

impl Display for DbKey {
Expand Down
84 changes: 81 additions & 3 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,67 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
);
Ok(())
}

fn get_pending_inbound_transaction_sender_info(
&self,
) -> Result<Vec<InboundTransactionSenderInfo>, TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let mut sender_info: Vec<InboundTransactionSenderInfo> = vec![];
match InboundTransactionSenderInfoSql::get_pending_inbound_transaction_sender_info(&(*conn)) {
Ok(info) => {
for item in info {
sender_info.push(InboundTransactionSenderInfo::try_from(item)?);
}
},
Err(e) => return Err(e),
}
trace!(
target: LOG_TARGET,
"sqlite profile - get_pending_inbound_transaction_sender_info: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);
Ok(sender_info)
}
}

#[derive(Debug, PartialEq)]
pub struct InboundTransactionSenderInfo {
pub(crate) tx_id: u64,
pub(crate) source_public_key: CommsPublicKey,
}

impl TryFrom<InboundTransactionSenderInfoSql> for InboundTransactionSenderInfo {
type Error = TransactionStorageError;

fn try_from(i: InboundTransactionSenderInfoSql) -> Result<Self, Self::Error> {
Ok(Self {
tx_id: i.tx_id as u64,
source_public_key: CommsPublicKey::from_bytes(&*i.source_public_key)
.map_err(TransactionStorageError::ByteArrayError)?,
})
}
}

#[derive(Clone, Queryable)]
pub struct InboundTransactionSenderInfoSql {
pub tx_id: i64,
pub source_public_key: Vec<u8>,
}

impl InboundTransactionSenderInfoSql {
pub fn get_pending_inbound_transaction_sender_info(
conn: &SqliteConnection,
) -> Result<Vec<InboundTransactionSenderInfoSql>, TransactionStorageError> {
let query_result = inbound_transactions::table
.select((inbound_transactions::tx_id, inbound_transactions::source_public_key))
.filter(inbound_transactions::cancelled.eq(false as i32))
.load::<InboundTransactionSenderInfoSql>(conn)?;
Ok(query_result)
}
}

#[derive(Clone, Debug, Queryable, Insertable, PartialEq)]
Expand Down Expand Up @@ -1796,6 +1857,7 @@ mod test {
models::{CompletedTransaction, InboundTransaction, OutboundTransaction},
sqlite_db::{
CompletedTransactionSql,
InboundTransactionSenderInfo,
InboundTransactionSql,
OutboundTransactionSql,
TransactionServiceSqliteDatabase,
Expand Down Expand Up @@ -2400,7 +2462,7 @@ mod test {
}

#[test]
fn test_get_tranactions_to_be_rebroadcast() {
fn test_customized_transactional_queries() {
let db_name = format!("{}.sqlite3", string(8).as_str());
let temp_dir = tempdir().unwrap();
let db_folder = temp_dir.path().to_str().unwrap().to_string();
Expand All @@ -2411,6 +2473,7 @@ mod test {

embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed");

let mut info_list_reference: Vec<InboundTransactionSenderInfo> = vec![];
for i in 0..1000 {
let (valid, cancelled, status, coinbase_block_height) = match i % 13 {
0 => (true, i % 3 == 0, TransactionStatus::Completed, None),
Expand Down Expand Up @@ -2452,20 +2515,35 @@ mod test {
mined_height: None,
mined_in_block: None,
};
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap();
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx.clone()).unwrap();
completed_tx_sql.commit(&conn).unwrap();

let inbound_tx = InboundTransaction::from(completed_tx);
let inbound_tx_sql = InboundTransactionSql::try_from(inbound_tx.clone()).unwrap();
inbound_tx_sql.commit(&conn).unwrap();

if !cancelled {
info_list_reference.push(InboundTransactionSenderInfo {
tx_id: inbound_tx.tx_id,
source_public_key: inbound_tx.source_public_key,
})
}
}

let connection = WalletDbConnection::new(conn, None);
let db1 = TransactionServiceSqliteDatabase::new(connection, None);

let txn_list = db1.get_transactions_to_be_broadcast().unwrap();
assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185);
assert_eq!(txn_list.len(), 185);
for txn in &txn_list {
assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast);
assert!(txn.valid);
assert!(!txn.cancelled);
assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0));
}

let info_list = db1.get_pending_inbound_transaction_sender_info().unwrap();
assert_eq!(info_list.len(), 941);
assert_eq!(info_list, info_list_reference);
}
}