From 25c4d99699438526725701dff167e3c608af7ad5 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com> Date: Tue, 20 Sep 2022 09:08:04 +0200 Subject: [PATCH] feat: wallet optimize output manager db operations (#4663) Description --- Optimized output manager and contacts liveness database operations to do as much work as possible inside the SQL database instead of inside Rust code. These changes are most noticeable where a query would return a number of outputs and followed by updating each output one by one. Motivation and Context --- See #4621 How Has This Been Tested? --- - Passed unit tests - Passed cucumber tests - System-level tests --- .../src/contacts_service/storage/sqlite_db.rs | 119 ++++++------ .../storage/sqlite_db/mod.rs | 172 +++++++++--------- .../storage/sqlite_db/output_sql.rs | 23 +++ .../protocols/transaction_receive_protocol.rs | 2 +- common_sqlite/src/connection_options.rs | 6 +- 5 files changed, 182 insertions(+), 140 deletions(-) diff --git a/base_layer/wallet/src/contacts_service/storage/sqlite_db.rs b/base_layer/wallet/src/contacts_service/storage/sqlite_db.rs index b1b4889fff..a599c6cc02 100644 --- a/base_layer/wallet/src/contacts_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/contacts_service/storage/sqlite_db.rs @@ -80,57 +80,43 @@ impl ContactsBackend for ContactsServiceSqliteDatabase { match op { WriteOperation::Upsert(kvp) => match *kvp { - DbKeyValuePair::Contact(k, c) => match ContactSql::find_by_public_key(&k.to_vec(), &conn) { - Ok(found_c) => { - let _contact_sql = found_c.update( - UpdateContact { - alias: Some(c.alias), - last_seen: None, - latency: None, - }, - &conn, - )?; - }, - Err(_) => { + DbKeyValuePair::Contact(k, c) => { + if ContactSql::find_by_public_key_and_update(&conn, &k.to_vec(), UpdateContact { + alias: Some(c.clone().alias), + last_seen: None, + latency: None, + }) + .is_err() + { ContactSql::from(c).commit(&conn)?; - }, + } }, DbKeyValuePair::LastSeen(..) => return Err(ContactsServiceStorageError::OperationNotSupported), }, WriteOperation::UpdateLastSeen(kvp) => match *kvp { DbKeyValuePair::LastSeen(node_id, date_time, latency) => { - match ContactSql::find_by_node_id(&node_id.to_vec(), &conn) { - Ok(found_c) => { - let contact = found_c.update( - UpdateContact { - alias: None, - last_seen: Some(Some(date_time)), - latency: Some(latency), - }, - &conn, - )?; - return Ok(Some(DbValue::PublicKey(Box::new( - PublicKey::from_vec(&contact.public_key) - .map_err(|_| ContactsServiceStorageError::ConversionError)?, - )))); - }, - Err(e) => return Err(e), - } + let contact = ContactSql::find_by_node_id_and_update(&conn, &node_id.to_vec(), UpdateContact { + alias: None, + last_seen: Some(Some(date_time)), + latency: Some(latency), + })?; + return Ok(Some(DbValue::PublicKey(Box::new( + PublicKey::from_vec(&contact.public_key) + .map_err(|_| ContactsServiceStorageError::ConversionError)?, + )))); }, DbKeyValuePair::Contact(..) => return Err(ContactsServiceStorageError::OperationNotSupported), }, WriteOperation::Remove(k) => match k { - DbKey::Contact(k) => match ContactSql::find_by_public_key(&k.to_vec(), &conn) { + DbKey::Contact(k) => match ContactSql::find_by_public_key_and_delete(&conn, &k.to_vec()) { Ok(c) => { - c.delete(&conn)?; return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?)))); }, Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => (), Err(e) => return Err(e), }, - DbKey::ContactId(id) => match ContactSql::find_by_node_id(&id.to_vec(), &conn) { + DbKey::ContactId(id) => match ContactSql::find_by_node_id_and_delete(&conn, &id.to_vec()) { Ok(c) => { - c.delete(&conn)?; return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?)))); }, Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => (), @@ -186,28 +172,58 @@ impl ContactSql { .first::(conn)?) } - pub fn delete(&self, conn: &SqliteConnection) -> Result<(), ContactsServiceStorageError> { - let num_deleted = - diesel::delete(contacts::table.filter(contacts::public_key.eq(&self.public_key))).execute(conn)?; + /// Find a particular Contact by their public key, and update it if it exists, returning the affected record + pub fn find_by_public_key_and_update( + conn: &SqliteConnection, + public_key: &[u8], + updated_contact: UpdateContact, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(contacts::table.filter(contacts::public_key.eq(public_key))) + .set(updated_contact) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + ContactSql::find_by_public_key(public_key, conn) + } - if num_deleted == 0 { + /// Find a particular Contact by their public key, and delete it if it exists, returning the affected record + pub fn find_by_public_key_and_delete( + conn: &SqliteConnection, + public_key: &[u8], + ) -> Result { + // Note: `get_result` not implemented for SQLite + let contact = ContactSql::find_by_public_key(public_key, conn)?; + if diesel::delete(contacts::table.filter(contacts::public_key.eq(public_key))).execute(conn)? == 0 { return Err(ContactsServiceStorageError::ValuesNotFound); } - - Ok(()) + Ok(contact) } - pub fn update( - &self, - updated_contact: UpdateContact, + /// Find a particular Contact by their node ID, and update it if it exists, returning the affected record + pub fn find_by_node_id_and_update( conn: &SqliteConnection, + node_id: &[u8], + updated_contact: UpdateContact, ) -> Result { - diesel::update(contacts::table.filter(contacts::public_key.eq(&self.public_key))) + // Note: `get_result` not implemented for SQLite + diesel::update(contacts::table.filter(contacts::node_id.eq(node_id))) .set(updated_contact) .execute(conn) .num_rows_affected_or_not_found(1)?; + ContactSql::find_by_node_id(node_id, conn) + } - ContactSql::find_by_public_key(&self.public_key, conn) + /// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record + pub fn find_by_node_id_and_delete( + conn: &SqliteConnection, + node_id: &[u8], + ) -> Result { + // Note: `get_result` not implemented for SQLite + let contact = ContactSql::find_by_node_id(node_id, conn)?; + if diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))).execute(conn)? == 0 { + return Err(ContactsServiceStorageError::ValuesNotFound); + } + Ok(contact) } } @@ -306,7 +322,7 @@ mod test { .unwrap() ); - ContactSql::from(contacts[0].clone()).delete(&conn).unwrap(); + ContactSql::find_by_public_key_and_delete(&conn, &contacts[0].public_key.clone().to_vec()).unwrap(); let retrieved_contacts = ContactSql::index(&conn).unwrap(); assert_eq!(retrieved_contacts.len(), 2); @@ -315,16 +331,13 @@ mod test { .iter() .any(|v| v == &ContactSql::from(contacts[0].clone()))); - let c = ContactSql::find_by_public_key(&contacts[1].public_key.to_vec(), &conn).unwrap(); - c.update( - UpdateContact { + let _c = + ContactSql::find_by_public_key_and_update(&conn, &contacts[1].public_key.to_vec(), UpdateContact { alias: Some("Fred".to_string()), last_seen: None, latency: None, - }, - &conn, - ) - .unwrap(); + }) + .unwrap(); let c_updated = ContactSql::find_by_public_key(&contacts[1].public_key.to_vec(), &conn).unwrap(); assert_eq!(c_updated.alias, "Fred".to_string()); diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index 80d9e289a3..ad13c578a5 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -28,7 +28,12 @@ use std::{ use chacha20poly1305::XChaCha20Poly1305; use chrono::NaiveDateTime; use derivative::Derivative; -use diesel::{prelude::*, result::Error as DieselError, SqliteConnection}; +use diesel::{ + prelude::*, + r2d2::{ConnectionManager, PooledConnection}, + result::Error as DieselError, + SqliteConnection, +}; use log::*; pub use new_output_sql::NewOutputSql; pub use output_sql::OutputSql; @@ -662,28 +667,34 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let mut outputs_to_be_spent = Vec::with_capacity(outputs_to_send.len()); - - for i in outputs_to_send { - let output = OutputSql::find_by_commitment_and_cancelled(i.commitment.as_bytes(), false, &conn)?; - if output.status != (OutputStatus::Unspent as i32) { - return Err(OutputManagerStorageError::OutputAlreadySpent); - } - if output.status == (OutputStatus::EncumberedToBeSpent as i32) { - return Err(OutputManagerStorageError::OutputAlreadyEncumbered); - } - outputs_to_be_spent.push(output); + let mut commitments = Vec::with_capacity(outputs_to_send.len()); + for output in outputs_to_send { + commitments.push(output.commitment.as_bytes()); } + // Any output in the list without the `Unspent` status will invalidate the encumberance + if !OutputSql::find_by_commitments_excluding_status(commitments.clone(), OutputStatus::Unspent, &conn)? + .is_empty() + { + return Err(OutputManagerStorageError::OutputAlreadySpent); + }; - for o in outputs_to_be_spent { - o.update( - UpdateOutput { - status: Some(OutputStatus::ShortTermEncumberedToBeSpent), - spent_in_tx_id: Some(Some(tx_id)), - ..Default::default() - }, - &conn, - )?; + let count = OutputSql::update_by_commitments( + commitments, + UpdateOutput { + status: Some(OutputStatus::ShortTermEncumberedToBeSpent), + spent_in_tx_id: Some(Some(tx_id)), + ..Default::default() + }, + &conn, + )?; + if count != outputs_to_send.len() { + let msg = format!( + "Inconsistent short term encumbering! Lengths do not match - {} vs {}", + count, + outputs_to_send.len() + ); + error!(target: LOG_TARGET, "{}", msg,); + return Err(OutputManagerStorageError::UnexpectedResult(msg)); } for co in outputs_to_receive { @@ -715,29 +726,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let outputs_to_be_received = - OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeReceived, &conn)?; - for o in &outputs_to_be_received { - o.update( - UpdateOutput { - status: Some(OutputStatus::EncumberedToBeReceived), - ..Default::default() - }, - &conn, - )?; - } + update_outputs_with_tx_id_and_status_to_new_status( + &conn, + tx_id, + OutputStatus::ShortTermEncumberedToBeReceived, + OutputStatus::EncumberedToBeReceived, + )?; + + update_outputs_with_tx_id_and_status_to_new_status( + &conn, + tx_id, + OutputStatus::ShortTermEncumberedToBeSpent, + OutputStatus::EncumberedToBeSpent, + )?; - let outputs_to_be_spent = - OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeSpent, &conn)?; - for o in &outputs_to_be_spent { - o.update( - UpdateOutput { - status: Some(OutputStatus::EncumberedToBeSpent), - ..Default::default() - }, - &conn, - )?; - } if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, @@ -757,27 +759,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let outputs_to_be_received = OutputSql::index_status(OutputStatus::ShortTermEncumberedToBeReceived, &conn)?; - for o in &outputs_to_be_received { - o.update( - UpdateOutput { - status: Some(OutputStatus::CancelledInbound), - ..Default::default() - }, - &conn, - )?; - } + diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32))) + .set((outputs::status.eq(OutputStatus::CancelledInbound as i32),)) + .execute(&conn)?; + + diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32))) + .set((outputs::status.eq(OutputStatus::Unspent as i32),)) + .execute(&conn)?; - let outputs_to_be_spent = OutputSql::index_status(OutputStatus::ShortTermEncumberedToBeSpent, &conn)?; - for o in &outputs_to_be_spent { - o.update( - UpdateOutput { - status: Some(OutputStatus::Unspent), - ..Default::default() - }, - &conn, - )?; - } if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, @@ -922,12 +911,16 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { Ok(()) } + // This is typically used by a receiver after the finalized transaction has been broadcast/returned by the sender + // as the sender has to finalize the signature that was partially constructed by the receiver fn update_output_metadata_signature(&self, output: &TransactionOutput) -> Result<(), OutputManagerStorageError> { let start = Instant::now(); let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); let db_output = OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, &conn)?; db_output.update( + // Note: Only the `nonce` and `u` portion needs to be updated at this time as the `v` portion is already + // correct UpdateOutput { metadata_signature_nonce: Some(output.metadata_signature.public_nonce().to_vec()), metadata_signature_u_key: Some(output.metadata_signature.u().to_vec()), @@ -1095,16 +1088,12 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { .execute(&conn) .num_rows_affected_or_not_found(1)?; } else { - let output = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::AbandonedCoinbase, &conn)?; - for o in output { - o.update( - UpdateOutput { - status: Some(OutputStatus::EncumberedToBeReceived), - ..Default::default() - }, - &conn, - )?; - } + update_outputs_with_tx_id_and_status_to_new_status( + &conn, + tx_id, + OutputStatus::AbandonedCoinbase, + OutputStatus::EncumberedToBeReceived, + )?; }; if start.elapsed().as_millis() > 0 { trace!( @@ -1123,17 +1112,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { let start = Instant::now(); let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let outputs = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::CancelledInbound, &conn)?; - for o in outputs { - o.update( - UpdateOutput { - status: Some(OutputStatus::EncumberedToBeReceived), - ..Default::default() - }, - &conn, - )?; - } + update_outputs_with_tx_id_and_status_to_new_status( + &conn, + tx_id, + OutputStatus::CancelledInbound, + OutputStatus::EncumberedToBeReceived, + )?; + if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, @@ -1233,6 +1219,26 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { } } +fn update_outputs_with_tx_id_and_status_to_new_status( + conn: &PooledConnection>, + tx_id: TxId, + from_status: OutputStatus, + to_status: OutputStatus, +) -> Result<(), OutputManagerStorageError> { + diesel::update( + outputs::table + .filter( + outputs::received_in_tx_id + .eq(Some(tx_id.as_u64() as i64)) + .or(outputs::spent_in_tx_id.eq(Some(tx_id.as_u64() as i64))), + ) + .filter(outputs::status.eq(from_status as i32)), + ) + .set(outputs::status.eq(to_status as i32)) + .execute(conn)?; + Ok(()) +} + /// These are the fields that can be updated for an Output #[derive(Default)] pub struct UpdateOutput { diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs index 1157e7b173..0357c70fa4 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs @@ -483,6 +483,29 @@ impl OutputSql { .first::(conn)?) } + pub fn find_by_commitments_excluding_status( + commitments: Vec<&[u8]>, + status: OutputStatus, + conn: &SqliteConnection, + ) -> Result, OutputManagerStorageError> { + Ok(outputs::table + .filter(outputs::commitment.eq_any(commitments)) + .filter(outputs::status.ne(status as i32)) + .load(conn)?) + } + + pub fn update_by_commitments( + commitments: Vec<&[u8]>, + updated_output: UpdateOutput, + conn: &SqliteConnection, + ) -> Result { + Ok( + diesel::update(outputs::table.filter(outputs::commitment.eq_any(commitments))) + .set(UpdateOutputSql::from(updated_output)) + .execute(conn)?, + ) + } + pub fn find_by_commitment_and_cancelled( commitment: &[u8], cancelled: bool, diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs index 4eb3559efe..ff1d55ef7f 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs @@ -390,7 +390,7 @@ where let finalized_outputs = finalized_transaction.body.outputs(); - // Update output metadata signature if not valid + // Update output metadata signature if not valid (typically the receiver after the sender finalized) match finalized_outputs .iter() .find(|output| output.hash() == rtp_output.hash()) diff --git a/common_sqlite/src/connection_options.rs b/common_sqlite/src/connection_options.rs index fdac3e8213..5fdbb3f98b 100644 --- a/common_sqlite/src/connection_options.rs +++ b/common_sqlite/src/connection_options.rs @@ -48,15 +48,15 @@ impl ConnectionOptions { impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { (|| { + if let Some(d) = self.busy_timeout { + conn.batch_execute(&format!("PRAGMA busy_timeout = {};", d.as_millis()))?; + } if self.enable_wal { conn.batch_execute("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")?; } if self.enable_foreign_keys { conn.batch_execute("PRAGMA foreign_keys = ON;")?; } - if let Some(d) = self.busy_timeout { - conn.batch_execute(&format!("PRAGMA busy_timeout = {};", d.as_millis()))?; - } Ok(()) })() .map_err(diesel::r2d2::Error::QueryError)