Skip to content

Commit

Permalink
feat: implement dht pooled db connection (#3596)
Browse files Browse the repository at this point in the history
Description
---
Implemented the pooled SQLite db connection for the DHT storage in favour of the Rust based async read-write lock, which makes use of the much faster SQLite3 Write-ahead Log (WAL) mode.

Motivation and Context
---
Recent changes to the common implementation for SQLite pooled connections managed in the background via Diesel and SQLite3 enabled this improvement.

How Has This Been Tested?
---
- Unit tests
- Cucumber tests
- System-level tests [_**Edit:** Running a twice more intensive stress test than before (2x sender wallets with 3x 1000 transactions on one computer to 3x receiver wallets on another computer vs. 1x sender wallet with 3x 1000 transactions to 3x receiver wallets) the disk resource utilization was 90+% down from previous levels - ~100% disk utilization on both systems during all phases of the transactions life cycle down to <~5% with occasional spikes up to ~80%. All transactions reached mined confirmed in all 5 participating wallets.)_
  • Loading branch information
hansieodendaal authored Nov 24, 2021
1 parent 970f811 commit 2ac0757
Show file tree
Hide file tree
Showing 18 changed files with 336 additions and 432 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions base_layer/core/tests/base_node_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
use std::{convert::TryFrom, sync::Arc, time::Duration};

use randomx_rs::RandomXFlag;
use tari_crypto::tari_utilities::epoch_time::EpochTime;
use tempfile::{tempdir, TempDir};

use tari_common::configuration::Network;
Expand Down Expand Up @@ -303,8 +304,7 @@ async fn test_get_height_at_time() {
let (_, service, base_node, request_mock, consensus_manager, block0, _utxo0, _temp_dir) = setup().await;

let mut prev_block = block0.clone();
let mut times = Vec::new();
times.push(prev_block.header().timestamp);
let mut times: Vec<EpochTime> = vec![prev_block.header().timestamp];
for _ in 0..10 {
tokio::time::sleep(Duration::from_secs(2)).await;
let new_block = base_node
Expand Down
4 changes: 2 additions & 2 deletions base_layer/key_manager/src/mnemonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ mod test {
"abandon".to_string(),
"tipico".to_string(),
];
assert_eq!(MnemonicLanguage::detect_language(&words2).is_err(), true);
assert!(MnemonicLanguage::detect_language(&words2).is_err());

// bounds check (last word is invalid)
let words3 = vec![
Expand All @@ -360,7 +360,7 @@ mod test {
"abandon".to_string(),
"topazio".to_string(),
];
assert_eq!(MnemonicLanguage::detect_language(&words3).is_err(), true);
assert!(MnemonicLanguage::detect_language(&words3).is_err());

// building up a word list: English/French + French -> French
let mut words = Vec::with_capacity(3);
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/contacts_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use diesel::result::Error as DieselError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;

#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
pub enum ContactsServiceError {
#[error("Contact is not found")]
Expand All @@ -38,7 +38,7 @@ pub enum ContactsServiceError {
TransportChannelError(#[from] TransportChannelError),
}

#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error)]
pub enum ContactsServiceStorageError {
#[error("This write operation is not supported for provided DbKey")]
OperationNotSupported,
Expand Down
6 changes: 0 additions & 6 deletions base_layer/wallet/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,3 @@ impl From<WalletStorageError> for ExitCodes {
}
}
}

impl PartialEq for WalletStorageError {
fn eq(&self, other: &Self) -> bool {
self == other
}
}
2 changes: 1 addition & 1 deletion base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub enum OutputManagerError {
InvalidMessageError(String),
}

#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error)]
pub enum OutputManagerStorageError {
#[error("Tried to insert an output that already exists in the database")]
DuplicateOutput,
Expand Down
27 changes: 15 additions & 12 deletions base_layer/wallet/tests/contacts_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,21 @@ pub fn test_contacts_service() {
let (_secret_key, public_key) = PublicKey::random_keypair(&mut OsRng);

let contact = runtime.block_on(contacts_service.get_contact(public_key.clone()));
assert_eq!(
contact,
Err(ContactsServiceError::ContactsServiceStorageError(
ContactsServiceStorageError::ValueNotFound(DbKey::Contact(public_key.clone()))
))
);
assert_eq!(
runtime.block_on(contacts_service.remove_contact(public_key.clone())),
Err(ContactsServiceError::ContactsServiceStorageError(
ContactsServiceStorageError::ValueNotFound(DbKey::Contact(public_key))
))
);
match contact {
Ok(_) => panic!("There should be an error here"),
Err(ContactsServiceError::ContactsServiceStorageError(ContactsServiceStorageError::ValueNotFound(val))) => {
assert_eq!(val, DbKey::Contact(public_key.clone()))
},
_ => panic!("There should be a specific error here"),
}
let result = runtime.block_on(contacts_service.remove_contact(public_key.clone()));
match result {
Ok(_) => panic!("There should be an error here"),
Err(ContactsServiceError::ContactsServiceStorageError(ContactsServiceStorageError::ValueNotFound(val))) => {
assert_eq!(val, DbKey::Contact(public_key))
},
_ => panic!("There should be a specific error here"),
}

let _ = runtime
.block_on(contacts_service.remove_contact(contacts[0].public_key.clone()))
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/tests/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ use tari_wallet::{
handle::TransactionEvent,
storage::sqlite_db::TransactionServiceSqliteDatabase,
},
utxo_scanner_service::utxo_scanning::UtxoScannerService,
Wallet,
WalletConfig,
WalletSqlite,
Expand Down
6 changes: 0 additions & 6 deletions common_sqlite/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,3 @@ pub enum SqliteStorageError {
#[error("Diesel R2d2 error")]
DieselR2d2Error(String),
}

impl PartialEq for SqliteStorageError {
fn eq(&self, other: &Self) -> bool {
self == other
}
}
1 change: 1 addition & 0 deletions comms/dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tari_crypto = { git = "https://github.com/tari-project/tari-crypto.git", branch
tari_utilities = { version = "^0.3" }
tari_shutdown = { version = "^0.21", path = "../../infrastructure/shutdown" }
tari_storage = { version = "^0.21", path = "../../infrastructure/storage" }
tari_common_sqlite = { path = "../../common_sqlite" }

anyhow = "1.0.32"
bitflags = "1.2.0"
Expand Down
26 changes: 11 additions & 15 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ impl DhtActor {
let offline_ts = self
.database
.get_metadata_value::<DateTime<Utc>>(DhtMetadataKey::OfflineTimestamp)
.await
.ok()
.flatten();
info!(
Expand Down Expand Up @@ -284,25 +283,24 @@ impl DhtActor {
},

_ = dedup_cache_trim_ticker.tick() => {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries().await {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries() {
error!(target: LOG_TARGET, "Error when trimming message dedup cache: {:?}", err);
}
},

_ = self.shutdown_signal.wait() => {
info!(target: LOG_TARGET, "DhtActor is shutting down because it received a shutdown signal.");
self.mark_shutdown_time().await;
self.mark_shutdown_time();
break Ok(());
},
}
}
}

async fn mark_shutdown_time(&self) {
fn mark_shutdown_time(&self) {
if let Err(err) = self
.database
.set_metadata_value(DhtMetadataKey::OfflineTimestamp, Utc::now())
.await
{
warn!(target: LOG_TARGET, "Failed to mark offline time: {:?}", err);
}
Expand All @@ -323,7 +321,7 @@ impl DhtActor {
} => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
match msg_hash_cache.add_body_hash(message_hash, received_from).await {
match msg_hash_cache.add_body_hash(message_hash, received_from) {
Ok(hit_count) => {
let _ = reply_tx.send(hit_count);
},
Expand All @@ -341,7 +339,7 @@ impl DhtActor {
GetMsgHashHitCount(hash, reply_tx) => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
let hit_count = msg_hash_cache.get_hit_count(hash).await?;
let hit_count = msg_hash_cache.get_hit_count(hash)?;
let _ = reply_tx.send(hit_count);
Ok(())
})
Expand All @@ -366,14 +364,14 @@ impl DhtActor {
GetMetadata(key, reply_tx) => {
let db = self.database.clone();
Box::pin(async move {
let _ = reply_tx.send(db.get_metadata_value_bytes(key).await.map_err(Into::into));
let _ = reply_tx.send(db.get_metadata_value_bytes(key).map_err(Into::into));
Ok(())
})
},
SetMetadata(key, value, reply_tx) => {
let db = self.database.clone();
Box::pin(async move {
match db.set_metadata_value_bytes(key, value).await {
match db.set_metadata_value_bytes(key, value) {
Ok(_) => {
debug!(target: LOG_TARGET, "Dht metadata '{}' set", key);
let _ = reply_tx.send(Ok(()));
Expand Down Expand Up @@ -727,8 +725,8 @@ mod test {
use tari_test_utils::random;

async fn db_connection() -> DbConnection {
let conn = DbConnection::connect_memory(random::string(8)).await.unwrap();
conn.migrate().await.unwrap();
let conn = DbConnection::connect_memory(random::string(8)).unwrap();
conn.migrate().unwrap();
conn
}

Expand Down Expand Up @@ -838,7 +836,6 @@ mod test {
let num_hits = actor
.msg_hash_dedup_cache
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert_eq!(num_hits, 1);
}
Expand All @@ -847,15 +844,14 @@ mod test {
let num_hits = actor
.msg_hash_dedup_cache
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert_eq!(num_hits, 2);
}

let dedup_cache_db = actor.msg_hash_dedup_cache.clone();
// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire fairly soon after the
// task is running on a thread. To remove this race condition, we trim the cache in the test.
let num_trimmed = dedup_cache_db.trim_entries().await.unwrap();
let num_trimmed = dedup_cache_db.trim_entries().unwrap();
assert_eq!(num_trimmed, 10);
actor.spawn();

Expand All @@ -877,7 +873,7 @@ mod test {
}

// Trim the database of excess entries
dedup_cache_db.trim_entries().await.unwrap();
dedup_cache_db.trim_entries().unwrap();

// Verify that the last half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
Expand Down
Loading

0 comments on commit 2ac0757

Please sign in to comment.