Skip to content

Commit

Permalink
refactor: use wallet connectivity in wallet services (#3391)
Browse files Browse the repository at this point in the history
* refactor: reintroduce generic substream in client rpc

Generic substream was removed in order to more closely test RPC/yamux
interaction but that was incorrect. The tests should have just been
updated without removing the generic. This PR reintroduces them so that
an RPC client mock can more easily be created.

* refactor: use wallet connectivity in wallet services

- use wallet connectivity in broadcast protocol
- remove base node update code in multiple services
- use wallet connectivity in transaction validation protocol
- use wallet connectivity in TXO validation task
- update/fix tests
  • Loading branch information
sdbondi authored Sep 28, 2021
1 parent 6774c70 commit d440afd
Show file tree
Hide file tree
Showing 57 changed files with 890 additions and 1,097 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ default-features = false
features = ["crossterm"]

[features]
avx2 = []
avx2 = []
1 change: 0 additions & 1 deletion applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ pub async fn init_wallet(
base_node_query_timeout: config.base_node_query_timeout,
prevent_fee_gt_amount: config.prevent_fee_gt_amount,
event_channel_size: config.output_manager_event_channel_size,
base_node_update_publisher_channel_size: config.base_node_update_publisher_channel_size,
num_confirmations_required: config.transaction_num_confirmations_required,
..Default::default()
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::{components::Component, state::AppState};
use tari_wallet::connectivity_service::OnlineStatus;
use tari_wallet::connectivity_service::{OnlineStatus, WalletConnectivityInterface};
use tui::{
backend::Backend,
layout::Rect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use tari_comms::{connectivity::ConnectivityEvent, peer_manager::Peer};
use tari_wallet::{
base_node_service::{handle::BaseNodeEvent, service::BaseNodeState},
connectivity_service::WalletConnectivityInterface,
output_manager_service::{handle::OutputManagerEvent, TxId},
transaction_service::handle::TransactionEvent,
};
Expand Down Expand Up @@ -62,6 +63,7 @@ impl WalletEventMonitor {
let mut connectivity_events = self.app_state_inner.read().await.get_connectivity_event_stream();
let wallet_connectivity = self.app_state_inner.read().await.get_wallet_connectivity();
let mut connectivity_status = wallet_connectivity.get_connectivity_status_watch();
let mut base_node_changed = wallet_connectivity.get_current_base_node_watcher();

let mut base_node_events = self.app_state_inner.read().await.get_base_node_event_stream();
let mut software_update_notif = self
Expand Down Expand Up @@ -166,6 +168,13 @@ impl WalletEventMonitor {
Err(broadcast::error::RecvError::Closed) => {}
}
},
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer).await;
self.trigger_balance_refresh();
}
}
result = base_node_events.recv() => {
match result {
Ok(msg) => {
Expand All @@ -174,10 +183,6 @@ impl WalletEventMonitor {
BaseNodeEvent::BaseNodeStateChanged(state) => {
self.trigger_base_node_state_refresh(state).await;
}
BaseNodeEvent::BaseNodePeerSet(peer) => {
self.trigger_base_node_peer_refresh(*peer).await;
self.trigger_balance_refresh();
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub fn tui_mode(config: WalletModeConfig, mut wallet: WalletSqlite) -> Result<()
base_node_config.base_node_custom = base_node_custom.clone();
if let Some(peer) = base_node_custom {
base_node_selected = peer;
} else if let Some(peer) = handle.block_on(wallet.get_base_node_peer())? {
} else if let Some(peer) = handle.block_on(wallet.get_base_node_peer()) {
base_node_selected = peer;
}

Expand Down
15 changes: 8 additions & 7 deletions base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,28 @@ tari_shutdown = { version = "^0.10", path = "../../infrastructure/shutdown" }
tari_storage = { version = "^0.10", path = "../../infrastructure/storage" }

aes-gcm = "^0.8"
async-trait = "0.1.50"
bincode = "1.3.1"
blake2 = "0.9.0"
chrono = { version = "0.4.6", features = ["serde"] }
crossbeam-channel = "0.3.8"
digest = "0.9.0"
diesel = { version = "1.4.7", features = ["sqlite", "serde_json", "chrono"] }
diesel_migrations = "1.4.0"
libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optional = true }
digest = "0.9.0"
fs2 = "0.3.0"
futures = { version = "^0.3.1", features = ["compat", "std"] }
libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optional = true }
lmdb-zero = "0.4.4"
log = "0.4.6"
log4rs = { version = "1.0.0", features = ["console_appender", "file_appender", "yaml_format"] }
lmdb-zero = "0.4.4"
rand = "0.8"
serde = { version = "1.0.89", features = ["derive"] }
serde_json = "1.0.39"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.3.0-alpha.2"
tempfile = "3.1.0"
time = { version = "0.1.39" }
thiserror = "1.0.26"
bincode = "1.3.1"
time = { version = "0.1.39" }
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.3.0-alpha.2"

[dependencies.tari_core]
path = "../../base_layer/core"
Expand Down
24 changes: 0 additions & 24 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use super::{error::BaseNodeServiceError, service::BaseNodeState};
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;
Expand All @@ -34,22 +33,17 @@ pub type BaseNodeEventReceiver = broadcast::Receiver<Arc<BaseNodeEvent>>;
#[derive(Debug)]
pub enum BaseNodeServiceRequest {
GetChainMetadata,
SetBaseNodePeer(Box<Peer>),
GetBaseNodePeer,
GetBaseNodeLatency,
}
/// API Response enum
#[derive(Debug)]
pub enum BaseNodeServiceResponse {
ChainMetadata(Option<ChainMetadata>),
BaseNodePeerSet,
BaseNodePeer(Option<Box<Peer>>),
Latency(Option<Duration>),
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
BaseNodePeerSet(Box<Peer>),
}

/// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running
Expand Down Expand Up @@ -82,24 +76,6 @@ impl BaseNodeServiceHandle {
}
}

pub async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
match self
.handle
.call(BaseNodeServiceRequest::SetBaseNodePeer(Box::new(peer)))
.await??
{
BaseNodeServiceResponse::BaseNodePeerSet => Ok(()),
_ => Err(BaseNodeServiceError::UnexpectedApiResponse),
}
}

pub async fn get_base_node_peer(&mut self) -> Result<Option<Peer>, BaseNodeServiceError> {
match self.handle.call(BaseNodeServiceRequest::GetBaseNodePeer).await?? {
BaseNodeServiceResponse::BaseNodePeer(peer) => Ok(peer.map(|p| *p)),
_ => Err(BaseNodeServiceError::UnexpectedApiResponse),
}
}

pub async fn get_base_node_latency(&mut self) -> Result<Option<Duration>, BaseNodeServiceError> {
match self.handle.call(BaseNodeServiceRequest::GetBaseNodeLatency).await?? {
BaseNodeServiceResponse::Latency(latency) => Ok(latency),
Expand Down
12 changes: 0 additions & 12 deletions base_layer/wallet/src/base_node_service/mock_base_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,12 @@ impl MockBaseNodeService {
}
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.base_node_peer = Some(peer);
}

/// This handler is called when requests arrive from the various streams
fn handle_request(
&mut self,
request: BaseNodeServiceRequest,
) -> Result<BaseNodeServiceResponse, BaseNodeServiceError> {
match request {
BaseNodeServiceRequest::SetBaseNodePeer(peer) => {
self.set_base_node_peer(*peer);
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.base_node_peer.clone();
Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new)))
},
BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata(
self.state.chain_metadata.clone(),
)),
Expand Down
18 changes: 11 additions & 7 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
handle::{BaseNodeEvent, BaseNodeEventSender},
service::BaseNodeState,
},
connectivity_service::WalletConnectivityHandle,
connectivity_service::WalletConnectivityInterface,
error::WalletStorageError,
storage::database::{WalletBackend, WalletDatabase},
};
Expand All @@ -42,20 +42,24 @@ use tokio::{sync::RwLock, time};

const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";

pub struct BaseNodeMonitor<T> {
pub struct BaseNodeMonitor<TBackend, TWalletConnectivity> {
interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
wallet_connectivity: WalletConnectivityHandle,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
}

impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
impl<TBackend, TWalletConnectivity> BaseNodeMonitor<TBackend, TWalletConnectivity>
where
TBackend: WalletBackend + 'static,
TWalletConnectivity: WalletConnectivityInterface,
{
pub fn new(
interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
wallet_connectivity: WalletConnectivityHandle,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
) -> Self {
Self {
Expand Down
27 changes: 1 addition & 26 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use super::{
config::BaseNodeServiceConfig,
error::BaseNodeServiceError,
handle::{BaseNodeEvent, BaseNodeEventSender, BaseNodeServiceRequest, BaseNodeServiceResponse},
handle::{BaseNodeEventSender, BaseNodeServiceRequest, BaseNodeServiceResponse},
};
use crate::{
base_node_service::monitor::BaseNodeMonitor,
Expand All @@ -35,7 +35,6 @@ use futures::{future, StreamExt};
use log::*;
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::Receiver;
use tari_shutdown::ShutdownSignal;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -153,12 +152,6 @@ where T: WalletBackend + 'static
Ok(())
}

async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
self.wallet_connectivity.set_base_node(peer.clone()).await?;
self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer)));
Ok(())
}

/// This handler is called when requests arrive from the various streams
async fn handle_request(
&mut self,
Expand All @@ -169,14 +162,6 @@ where T: WalletBackend + 'static
"Handling Wallet Base Node Service Request: {:?}", request
);
match request {
BaseNodeServiceRequest::SetBaseNodePeer(peer) => {
self.set_base_node_peer(*peer).await?;
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new);
Ok(BaseNodeServiceResponse::BaseNodePeer(peer))
},
BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() {
Some(metadata) => Ok(BaseNodeServiceResponse::ChainMetadata(Some(metadata))),
None => {
Expand All @@ -190,14 +175,4 @@ where T: WalletBackend + 'static
},
}
}

fn publish_event(&self, event: BaseNodeEvent) {
trace!(target: LOG_TARGET, "Publishing event: {:?}", event);
let _ = self.event_publisher.send(Arc::new(event)).map_err(|_| {
trace!(
target: LOG_TARGET,
"Could not publish BaseNodeEvent as there are no subscribers"
)
});
}
}
35 changes: 25 additions & 10 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::service::OnlineStatus;
use crate::connectivity_service::{error::WalletConnectivityError, watch::Watch};
use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch};
use tari_comms::{
peer_manager::{NodeId, Peer},
protocol::rpc::RpcClientLease,
types::CommsPublicKey,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::{mpsc, oneshot, watch};
Expand Down Expand Up @@ -53,10 +54,16 @@ impl WalletConnectivityHandle {
online_status_rx,
}
}
}

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
self.base_node_watch.send(Some(base_node_peer));
}

pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.base_node_watch.broadcast(Some(base_node_peer));
Ok(())
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>> {
self.base_node_watch.get_receiver()
}

/// Obtain a BaseNodeWalletRpcClient.
Expand All @@ -65,7 +72,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeWalletRpcClient RPC session.
pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
// Under what conditions do the (1) mpsc channel and (2) oneshot channel error?
// (1) when the receiver has been dropped
Expand All @@ -88,7 +95,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeSyncRpcClient RPC session.
pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(WalletConnectivityRequest::ObtainBaseNodeSyncRpcClient(reply_tx))
Expand All @@ -98,19 +105,27 @@ impl WalletConnectivityHandle {
reply_rx.await.ok()
}

pub fn get_connectivity_status(&mut self) -> OnlineStatus {
fn get_connectivity_status(&mut self) -> OnlineStatus {
*self.online_status_rx.borrow()
}

pub fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
self.online_status_rx.clone()
}

pub fn get_current_base_node_peer(&self) -> Option<Peer> {
fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().clone()
}

pub fn get_current_base_node_id(&self) -> Option<NodeId> {
fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}

fn is_base_node_set(&self) -> bool {
self.base_node_watch.borrow().is_some()
}
}
8 changes: 6 additions & 2 deletions base_layer/wallet/src/connectivity_service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::{handle::WalletConnectivityHandle, service::WalletConnectivityService, watch::Watch};
use crate::{base_node_service::config::BaseNodeServiceConfig, connectivity_service::service::OnlineStatus};
use super::{handle::WalletConnectivityHandle, service::WalletConnectivityService};
use crate::{
base_node_service::config::BaseNodeServiceConfig,
connectivity_service::service::OnlineStatus,
util::watch::Watch,
};
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::mpsc;

Expand Down
Loading

0 comments on commit d440afd

Please sign in to comment.