From 3129ce8dd066ea16900ee8add4e608c1890c6545 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com> Date: Thu, 7 Dec 2023 16:54:37 +0200 Subject: [PATCH] feat: add wallet ffi shutdown tests (#6007) Description --- - Added wallet FFI shutdown tests to prove that all services have been terminated when the FFI `wallet_destroy` command has been run. - Added additional safeguards in the FFI `wallet_destroy` method to ensure the wallet has shut down. Motivation and Context --- See #5984 How Has This Been Tested? --- Added `pub fn test_wallet_shutdown()` in the wallet FFI library. What process can a PR reviewer use to test or verify this change? --- - Review test code - Run the test Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify --- .../contacts/src/contacts_service/service.rs | 2 +- base_layer/wallet_ffi/Cargo.toml | 1 + base_layer/wallet_ffi/src/lib.rs | 308 ++++++++++++++++++ 3 files changed, 310 insertions(+), 1 deletion(-) diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index eed8d93cfe..955990abb1 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -199,7 +199,7 @@ where T: ContactsBackend + 'static let shutdown = self .shutdown_signal .take() - .expect("Output Manager Service initialized without shutdown signal"); + .expect("Contacts Service initialized without shutdown signal"); pin_mut!(shutdown); // Add all contacts as monitored peers to the liveness service diff --git a/base_layer/wallet_ffi/Cargo.toml b/base_layer/wallet_ffi/Cargo.toml index 325a0e3496..906a342424 100644 --- a/base_layer/wallet_ffi/Cargo.toml +++ b/base_layer/wallet_ffi/Cargo.toml @@ -48,6 +48,7 @@ tari_key_manager = { path = "../key_manager" } tari_common_types = { path = "../../base_layer/common_types"} tari_test_utils = { path = "../../infrastructure/test_utils"} tari_service_framework = { path = "../../base_layer/service_framework" } +tari_core = { path = "../../base_layer/core", default-features = false, features = ["base_node"] } borsh = "1.2" [build-dependencies] diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 8b3dc14e86..ae525c3ef8 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -8249,8 +8249,22 @@ pub unsafe extern "C" fn emoji_set_destroy(emoji_set: *mut EmojiSet) { pub unsafe extern "C" fn wallet_destroy(wallet: *mut TariWallet) { if !wallet.is_null() { let mut w = Box::from_raw(wallet); + let wallet_comms = w.wallet.comms.clone(); w.shutdown.trigger(); w.runtime.block_on(w.wallet.wait_until_shutdown()); + // The wallet should be shutdown by now; these are just additional confirmations + loop { + if w.shutdown.is_triggered() && + wallet_comms.shutdown_signal().is_triggered() && + w.runtime + .block_on(wallet_comms.connectivity().get_connectivity_status()) + .is_err() + { + break; + }; + w.runtime + .block_on(async { tokio::time::sleep(Duration::from_millis(250)).await }); + } } } @@ -8575,6 +8589,7 @@ mod test { use once_cell::sync::Lazy; use tari_common_types::{emoji, transaction::TransactionStatus, types::PrivateKey}; use tari_comms::peer_manager::PeerFeatures; + use tari_contacts::contacts_service::types::{Direction, Message, MessageMetadata}; use tari_core::{ covenant, transactions::{ @@ -8583,6 +8598,7 @@ mod test { }, }; use tari_key_manager::{mnemonic::MnemonicLanguage, mnemonic_wordlists}; + use tari_p2p::initialization::MESSAGING_PROTOCOL_ID; use tari_script::script; use tari_test_utils::random; use tempfile::tempdir; @@ -11046,4 +11062,296 @@ mod test { let _spending_key = Box::from_raw(spending_key_ptr); } } + + #[test] + #[allow(clippy::too_many_lines)] + pub fn test_wallet_shutdown() { + unsafe { + let mut error = 0; + let error_ptr = &mut error as *mut c_int; + let mut recovery_in_progress = false; + let recovery_in_progress_ptr = &mut recovery_in_progress as *mut bool; + + // Create a new wallet for Alice + let db_name = CString::new(random::string(8).as_str()).unwrap(); + let alice_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char; + let temp_dir = tempdir().unwrap(); + let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap(); + let alice_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char; + let alice_transport_type = transport_memory_create(); + let address = transport_memory_get_address(alice_transport_type, error_ptr); + let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned(); + let alice_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char; + let network = CString::new(NETWORK_STRING).unwrap(); + let alice_network_str: *const c_char = CString::into_raw(network) as *const c_char; + + let alice_config = comms_config_create( + alice_address_str, + alice_transport_type, + alice_db_name_str, + alice_db_path_str, + 20, + 10800, + error_ptr, + ); + let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char; + let alice_wallet_ptr = wallet_create( + alice_config, + ptr::null(), + 0, + 0, + passphrase, + ptr::null(), + alice_network_str, + received_tx_callback, + received_tx_reply_callback, + received_tx_finalized_callback, + broadcast_callback, + mined_callback, + mined_unconfirmed_callback, + scanned_callback, + scanned_unconfirmed_callback, + transaction_send_result_callback, + tx_cancellation_callback, + txo_validation_complete_callback, + contacts_liveness_data_updated_callback, + balance_updated_callback, + transaction_validation_complete_callback, + saf_messages_received_callback, + connectivity_status_callback, + base_node_state_callback, + recovery_in_progress_ptr, + error_ptr, + ); + assert_eq!(error, 0); + string_destroy(alice_network_str as *mut c_char); + string_destroy(alice_db_name_str as *mut c_char); + string_destroy(alice_db_path_str as *mut c_char); + string_destroy(alice_address_str as *mut c_char); + transport_config_destroy(alice_transport_type); + comms_config_destroy(alice_config); + + // Create a new wallet for bob + let db_name = CString::new(random::string(8).as_str()).unwrap(); + let bob_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char; + let temp_dir = tempdir().unwrap(); + let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap(); + let bob_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char; + let bob_transport_type = transport_memory_create(); + let address = transport_memory_get_address(bob_transport_type, error_ptr); + let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned(); + let bob_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char; + let network = CString::new(NETWORK_STRING).unwrap(); + let bob_network_str: *const c_char = CString::into_raw(network) as *const c_char; + + let bob_config = comms_config_create( + bob_address_str, + bob_transport_type, + bob_db_name_str, + bob_db_path_str, + 20, + 10800, + error_ptr, + ); + let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char; + let bob_wallet_ptr = wallet_create( + bob_config, + ptr::null(), + 0, + 0, + passphrase, + ptr::null(), + bob_network_str, + received_tx_callback, + received_tx_reply_callback, + received_tx_finalized_callback, + broadcast_callback, + mined_callback, + mined_unconfirmed_callback, + scanned_callback, + scanned_unconfirmed_callback, + transaction_send_result_callback, + tx_cancellation_callback, + txo_validation_complete_callback, + contacts_liveness_data_updated_callback, + balance_updated_callback, + transaction_validation_complete_callback, + saf_messages_received_callback, + connectivity_status_callback, + base_node_state_callback, + recovery_in_progress_ptr, + error_ptr, + ); + assert_eq!(error, 0); + string_destroy(bob_network_str as *mut c_char); + string_destroy(bob_db_name_str as *mut c_char); + string_destroy(bob_db_path_str as *mut c_char); + string_destroy(bob_address_str as *mut c_char); + transport_config_destroy(bob_transport_type); + comms_config_destroy(bob_config); + + // Add some peers + // - Wallet peer for Alice (add Bob as a base node peer; not how it will be done in production but good + // enough for the test as we just need to make sure the wallet can connect to a peer) + let bob_wallet_comms = (*bob_wallet_ptr).wallet.comms.clone(); + let bob_node_identity = bob_wallet_comms.node_identity(); + let bob_peer_public_key_ptr = Box::into_raw(Box::new(bob_node_identity.public_key().clone())); + let bob_peer_address_ptr = + CString::into_raw(CString::new(bob_node_identity.first_public_address().unwrap().to_string()).unwrap()) + as *const c_char; + wallet_add_base_node_peer( + alice_wallet_ptr, + bob_peer_public_key_ptr, + bob_peer_address_ptr, + error_ptr, + ); + string_destroy(bob_peer_address_ptr as *mut c_char); + let _destroyed = Box::from_raw(bob_peer_public_key_ptr); + // - Wallet peer for Bob (add Alice as a base node peer; same as above) + let alice_wallet_comms = (*alice_wallet_ptr).wallet.comms.clone(); + let alice_node_identity = alice_wallet_comms.node_identity(); + let alice_peer_public_key_ptr = Box::into_raw(Box::new(alice_node_identity.public_key().clone())); + let alice_peer_address_ptr = CString::into_raw( + CString::new(alice_node_identity.first_public_address().unwrap().to_string()).unwrap(), + ) as *const c_char; + wallet_add_base_node_peer( + bob_wallet_ptr, + alice_peer_public_key_ptr, + alice_peer_address_ptr, + error_ptr, + ); + string_destroy(alice_peer_address_ptr as *mut c_char); + let _destroyed = Box::from_raw(alice_peer_public_key_ptr); + + // Add some contacts + // - Contact for Alice + let bob_wallet_address = TariWalletAddress::new(bob_node_identity.public_key().clone(), Network::LocalNet); + let alice_contact_alias_ptr: *const c_char = + CString::into_raw(CString::new("bob").unwrap()) as *const c_char; + let alice_contact_address_ptr = Box::into_raw(Box::new(bob_wallet_address.clone())); + let alice_contact_ptr = contact_create(alice_contact_alias_ptr, alice_contact_address_ptr, true, error_ptr); + tari_address_destroy(alice_contact_address_ptr); + assert!(wallet_upsert_contact(alice_wallet_ptr, alice_contact_ptr, error_ptr)); + contact_destroy(alice_contact_ptr); + // - Contact for Bob + let alice_wallet_address = + TariWalletAddress::new(alice_node_identity.public_key().clone(), Network::LocalNet); + let bob_contact_alias_ptr: *const c_char = + CString::into_raw(CString::new("alice").unwrap()) as *const c_char; + let bob_contact_address_ptr = Box::into_raw(Box::new(alice_wallet_address.clone())); + let bob_contact_ptr = contact_create(bob_contact_alias_ptr, bob_contact_address_ptr, true, error_ptr); + tari_address_destroy(bob_contact_address_ptr); + assert!(wallet_upsert_contact(bob_wallet_ptr, bob_contact_ptr, error_ptr)); + contact_destroy(bob_contact_ptr); + + // Use comms service - do `dial_peer` for both wallets (we do not 'assert!' here to not make the test flaky) + // Note: This loop is just to make sure we actually connect as the first attempts do not always succeed + let alice_wallet_runtime = &(*alice_wallet_ptr).runtime; + let bob_wallet_runtime = &(*bob_wallet_ptr).runtime; + let mut alice_dialed_bob = false; + let mut bob_dialed_alice = false; + let mut dial_count = 0; + loop { + dial_count += 1; + if !alice_dialed_bob { + alice_dialed_bob = alice_wallet_runtime + .block_on( + alice_wallet_comms + .connectivity() + .dial_peer(bob_node_identity.node_id().clone()), + ) + .is_ok(); + } + if !bob_dialed_alice { + bob_dialed_alice = bob_wallet_runtime + .block_on( + bob_wallet_comms + .connectivity() + .dial_peer(alice_node_identity.node_id().clone()), + ) + .is_ok(); + } + if alice_dialed_bob && bob_dialed_alice || dial_count > 10 { + break; + } + // Wait a bit before the next attempt + alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await }); + } + + // Use contacts service - send some messages for both wallets + let mut alice_wallet_contacts_service = (*alice_wallet_ptr).wallet.contacts_service.clone(); + let mut bob_wallet_contacts_service = (*bob_wallet_ptr).wallet.contacts_service.clone(); + let mut alice_msg_count = 0; + let mut bob_msg_count = 0; + // Note: This loop is just to make sure we actually send a couple of messages as the first attempts do not + // always succeed (we do not 'assert!' here to not make the test flaky) + for i in 0..60 { + if alice_msg_count < 5 { + let alice_message_result = + alice_wallet_runtime.block_on(alice_wallet_contacts_service.send_message(Message { + body: vec![i], + metadata: vec![MessageMetadata::default()], + address: bob_wallet_address.clone(), + direction: Direction::Outbound, + stored_at: u64::from(i), + delivery_confirmation_at: None, + read_confirmation_at: None, + message_id: vec![i], + })); + if alice_message_result.is_ok() { + alice_msg_count += 1; + } + } + if bob_msg_count < 5 { + let bob_message_result = + bob_wallet_runtime.block_on(bob_wallet_contacts_service.send_message(Message { + body: vec![i], + metadata: vec![MessageMetadata::default()], + address: alice_wallet_address.clone(), + direction: Direction::Outbound, + stored_at: u64::from(i), + delivery_confirmation_at: None, + read_confirmation_at: None, + message_id: vec![i], + })); + if bob_message_result.is_ok() { + bob_msg_count += 1; + } + } + if alice_msg_count >= 5 && bob_msg_count >= 5 { + break; + } + // Wait a bit before the next attempt + alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await }); + } + + // Trigger Alice wallet shutdown (same as `pub unsafe extern "C" fn wallet_destroy(wallet: *mut TariWallet)` + wallet_destroy(alice_wallet_ptr); + + // Bob's peer connection to Alice will still be active for a short while until Bob figures out Alice is + // gone, and a 'dial_peer' command to Alice from Bob may return the previous connection state, but it + // should not be possible to do anything with the connection. + let bob_comms_dial_peer = bob_wallet_runtime.block_on( + bob_wallet_comms + .connectivity() + .dial_peer(alice_node_identity.node_id().clone()), + ); + if let Ok(mut connection_to_alice) = bob_comms_dial_peer { + if bob_wallet_runtime + .block_on(connection_to_alice.open_substream(&MESSAGING_PROTOCOL_ID.clone())) + .is_ok() + { + panic!("Connection to Alice should not be active!"); + } + } + + // - Bob can still retrieve messages Alice sent + let bob_contacts_get_messages = + bob_wallet_runtime.block_on(bob_wallet_contacts_service.get_messages(alice_wallet_address, 1, 1)); + assert!(bob_contacts_get_messages.is_ok()); + + // Cleanup + wallet_destroy(bob_wallet_ptr); + } + } }