Skip to content

Commit

Permalink
Optimize output manager db operations
Browse files Browse the repository at this point in the history
Optimized output manager database operations to do as much work
as possible inside the SQL database instead of inside Rust code.
  • Loading branch information
hansieodendaal committed Sep 19, 2022
1 parent 92ceef9 commit c53a61f
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 120 deletions.
119 changes: 66 additions & 53 deletions base_layer/wallet/src/contacts_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,43 @@ impl ContactsBackend for ContactsServiceSqliteDatabase {

match op {
WriteOperation::Upsert(kvp) => match *kvp {
DbKeyValuePair::Contact(k, c) => match ContactSql::find_by_public_key(&k.to_vec(), &conn) {
Ok(found_c) => {
let _contact_sql = found_c.update(
UpdateContact {
alias: Some(c.alias),
last_seen: None,
latency: None,
},
&conn,
)?;
},
Err(_) => {
DbKeyValuePair::Contact(k, c) => {
if ContactSql::find_by_public_key_and_update(&conn, &k.to_vec(), UpdateContact {
alias: Some(c.clone().alias),
last_seen: None,
latency: None,
})
.is_err()
{
ContactSql::from(c).commit(&conn)?;
},
}
},
DbKeyValuePair::LastSeen(..) => return Err(ContactsServiceStorageError::OperationNotSupported),
},
WriteOperation::UpdateLastSeen(kvp) => match *kvp {
DbKeyValuePair::LastSeen(node_id, date_time, latency) => {
match ContactSql::find_by_node_id(&node_id.to_vec(), &conn) {
Ok(found_c) => {
let contact = found_c.update(
UpdateContact {
alias: None,
last_seen: Some(Some(date_time)),
latency: Some(latency),
},
&conn,
)?;
return Ok(Some(DbValue::PublicKey(Box::new(
PublicKey::from_vec(&contact.public_key)
.map_err(|_| ContactsServiceStorageError::ConversionError)?,
))));
},
Err(e) => return Err(e),
}
let contact = ContactSql::find_by_node_id_and_update(&conn, &node_id.to_vec(), UpdateContact {
alias: None,
last_seen: Some(Some(date_time)),
latency: Some(latency),
})?;
return Ok(Some(DbValue::PublicKey(Box::new(
PublicKey::from_vec(&contact.public_key)
.map_err(|_| ContactsServiceStorageError::ConversionError)?,
))));
},
DbKeyValuePair::Contact(..) => return Err(ContactsServiceStorageError::OperationNotSupported),
},
WriteOperation::Remove(k) => match k {
DbKey::Contact(k) => match ContactSql::find_by_public_key(&k.to_vec(), &conn) {
DbKey::Contact(k) => match ContactSql::find_by_public_key_and_delete(&conn, &k.to_vec()) {
Ok(c) => {
c.delete(&conn)?;
return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?))));
},
Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => (),
Err(e) => return Err(e),
},
DbKey::ContactId(id) => match ContactSql::find_by_node_id(&id.to_vec(), &conn) {
DbKey::ContactId(id) => match ContactSql::find_by_node_id_and_delete(&conn, &id.to_vec()) {
Ok(c) => {
c.delete(&conn)?;
return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?))));
},
Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => (),
Expand Down Expand Up @@ -186,28 +172,58 @@ impl ContactSql {
.first::<ContactSql>(conn)?)
}

pub fn delete(&self, conn: &SqliteConnection) -> Result<(), ContactsServiceStorageError> {
let num_deleted =
diesel::delete(contacts::table.filter(contacts::public_key.eq(&self.public_key))).execute(conn)?;
/// Find a particular Contact by their public key, and update it if it exists, returning the affected record
pub fn find_by_public_key_and_update(
conn: &SqliteConnection,
public_key: &[u8],
updated_contact: UpdateContact,
) -> Result<ContactSql, ContactsServiceStorageError> {
// Note: `get_result` not implemented for SQLite
diesel::update(contacts::table.filter(contacts::public_key.eq(public_key)))
.set(updated_contact)
.execute(conn)
.num_rows_affected_or_not_found(1)?;
ContactSql::find_by_public_key(public_key, conn)
}

if num_deleted == 0 {
/// Find a particular Contact by their public key, and delete it if it exists, returning the affected record
pub fn find_by_public_key_and_delete(
conn: &SqliteConnection,
public_key: &[u8],
) -> Result<ContactSql, ContactsServiceStorageError> {
// Note: `get_result` not implemented for SQLite
let contact = ContactSql::find_by_public_key(public_key, conn)?;
if diesel::delete(contacts::table.filter(contacts::public_key.eq(public_key))).execute(conn)? == 0 {
return Err(ContactsServiceStorageError::ValuesNotFound);
}

Ok(())
Ok(contact)
}

pub fn update(
&self,
updated_contact: UpdateContact,
/// Find a particular Contact by their node ID, and update it if it exists, returning the affected record
pub fn find_by_node_id_and_update(
conn: &SqliteConnection,
node_id: &[u8],
updated_contact: UpdateContact,
) -> Result<ContactSql, ContactsServiceStorageError> {
diesel::update(contacts::table.filter(contacts::public_key.eq(&self.public_key)))
// Note: `get_result` not implemented for SQLite
diesel::update(contacts::table.filter(contacts::node_id.eq(node_id)))
.set(updated_contact)
.execute(conn)
.num_rows_affected_or_not_found(1)?;
ContactSql::find_by_node_id(node_id, conn)
}

ContactSql::find_by_public_key(&self.public_key, conn)
/// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record
pub fn find_by_node_id_and_delete(
conn: &SqliteConnection,
node_id: &[u8],
) -> Result<ContactSql, ContactsServiceStorageError> {
// Note: `get_result` not implemented for SQLite
let contact = ContactSql::find_by_node_id(node_id, conn)?;
if diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))).execute(conn)? == 0 {
return Err(ContactsServiceStorageError::ValuesNotFound);
}
Ok(contact)
}
}

Expand Down Expand Up @@ -306,7 +322,7 @@ mod test {
.unwrap()
);

ContactSql::from(contacts[0].clone()).delete(&conn).unwrap();
ContactSql::find_by_public_key_and_delete(&conn, &contacts[0].public_key.clone().to_vec()).unwrap();

let retrieved_contacts = ContactSql::index(&conn).unwrap();
assert_eq!(retrieved_contacts.len(), 2);
Expand All @@ -315,16 +331,13 @@ mod test {
.iter()
.any(|v| v == &ContactSql::from(contacts[0].clone())));

let c = ContactSql::find_by_public_key(&contacts[1].public_key.to_vec(), &conn).unwrap();
c.update(
UpdateContact {
let _c =
ContactSql::find_by_public_key_and_update(&conn, &contacts[1].public_key.to_vec(), UpdateContact {
alias: Some("Fred".to_string()),
last_seen: None,
latency: None,
},
&conn,
)
.unwrap();
})
.unwrap();

let c_updated = ContactSql::find_by_public_key(&contacts[1].public_key.to_vec(), &conn).unwrap();
assert_eq!(c_updated.alias, "Fred".to_string());
Expand Down
134 changes: 71 additions & 63 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ use std::{
use chacha20poly1305::XChaCha20Poly1305;
use chrono::NaiveDateTime;
use derivative::Derivative;
use diesel::{prelude::*, result::Error as DieselError, SqliteConnection};
use diesel::{
prelude::*,
r2d2::{ConnectionManager, PooledConnection},
result::Error as DieselError,
SqliteConnection,
};
use log::*;
pub use new_output_sql::NewOutputSql;
pub use output_sql::OutputSql;
Expand Down Expand Up @@ -664,6 +669,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {

let mut outputs_to_be_spent = Vec::with_capacity(outputs_to_send.len());

let mut commitments = Vec::new();
for output in outputs_to_send.clone() {
commitments.push(output.commitment.as_bytes());
}
if !OutputSql::find_by_commitments_and_exclude_status(commitments, OutputStatus::Unspent, &conn)?.is_empty() {
return Err(OutputManagerStorageError::OutputAlreadySpent);
};

for i in outputs_to_send {
let output = OutputSql::find_by_commitment_and_cancelled(i.commitment.as_bytes(), false, &conn)?;
if output.status != (OutputStatus::Unspent as i32) {
Expand Down Expand Up @@ -715,29 +728,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

let outputs_to_be_received =
OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeReceived, &conn)?;
for o in &outputs_to_be_received {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeReceived),
..Default::default()
},
&conn,
)?;
}
update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::ShortTermEncumberedToBeReceived,
OutputStatus::EncumberedToBeReceived,
)?;

update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::ShortTermEncumberedToBeSpent,
OutputStatus::EncumberedToBeSpent,
)?;

let outputs_to_be_spent =
OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeSpent, &conn)?;
for o in &outputs_to_be_spent {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeSpent),
..Default::default()
},
&conn,
)?;
}
if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
Expand All @@ -757,27 +761,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

let outputs_to_be_received = OutputSql::index_status(OutputStatus::ShortTermEncumberedToBeReceived, &conn)?;
for o in &outputs_to_be_received {
o.update(
UpdateOutput {
status: Some(OutputStatus::CancelledInbound),
..Default::default()
},
&conn,
)?;
}
diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32)))
.set((outputs::status.eq(OutputStatus::CancelledInbound as i32),))
.execute(&conn)?;

diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32)))
.set((outputs::status.eq(OutputStatus::Unspent as i32),))
.execute(&conn)?;

let outputs_to_be_spent = OutputSql::index_status(OutputStatus::ShortTermEncumberedToBeSpent, &conn)?;
for o in &outputs_to_be_spent {
o.update(
UpdateOutput {
status: Some(OutputStatus::Unspent),
..Default::default()
},
&conn,
)?;
}
if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -922,12 +913,16 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
Ok(())
}

// This is typically used by a receiver after the finalized transaction has been broadcast/returned by the sender
// as the sender has to finalize the signature that was partially constructed by the receiver
fn update_output_metadata_signature(&self, output: &TransactionOutput) -> Result<(), OutputManagerStorageError> {
let start = Instant::now();
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();
let db_output = OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, &conn)?;
db_output.update(
// Note: Only the `nonce` and `u` portion needs to be updated at this time as the `v` portion is already
// correct
UpdateOutput {
metadata_signature_nonce: Some(output.metadata_signature.public_nonce().to_vec()),
metadata_signature_u_key: Some(output.metadata_signature.u().to_vec()),
Expand Down Expand Up @@ -1095,16 +1090,12 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.execute(&conn)
.num_rows_affected_or_not_found(1)?;
} else {
let output = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::AbandonedCoinbase, &conn)?;
for o in output {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeReceived),
..Default::default()
},
&conn,
)?;
}
update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::AbandonedCoinbase,
OutputStatus::EncumberedToBeReceived,
)?;
};
if start.elapsed().as_millis() > 0 {
trace!(
Expand All @@ -1123,17 +1114,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let start = Instant::now();
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();
let outputs = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::CancelledInbound, &conn)?;

for o in outputs {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeReceived),
..Default::default()
},
&conn,
)?;
}
update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::CancelledInbound,
OutputStatus::EncumberedToBeReceived,
)?;

if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1233,6 +1221,26 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
}
}

fn update_outputs_with_tx_id_and_status_to_new_status(
conn: &PooledConnection<ConnectionManager<SqliteConnection>>,
tx_id: TxId,
from_status: OutputStatus,
to_status: OutputStatus,
) -> Result<(), OutputManagerStorageError> {
diesel::update(
outputs::table
.filter(
outputs::received_in_tx_id
.eq(Some(tx_id.as_u64() as i64))
.or(outputs::spent_in_tx_id.eq(Some(tx_id.as_u64() as i64))),
)
.filter(outputs::status.eq(from_status as i32)),
)
.set(outputs::status.eq(to_status as i32))
.execute(conn)?;
Ok(())
}

/// These are the fields that can be updated for an Output
#[derive(Default)]
pub struct UpdateOutput {
Expand Down
Loading

0 comments on commit c53a61f

Please sign in to comment.