Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use wallet connectivity in wallet services #3391

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ default-features = false
features = ["crossterm"]

[features]
avx2 = []
avx2 = []
1 change: 0 additions & 1 deletion applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ pub async fn init_wallet(
base_node_query_timeout: config.base_node_query_timeout,
prevent_fee_gt_amount: config.prevent_fee_gt_amount,
event_channel_size: config.output_manager_event_channel_size,
base_node_update_publisher_channel_size: config.base_node_update_publisher_channel_size,
num_confirmations_required: config.transaction_num_confirmations_required,
..Default::default()
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::{components::Component, state::AppState};
use tari_wallet::connectivity_service::OnlineStatus;
use tari_wallet::connectivity_service::{OnlineStatus, WalletConnectivityInterface};
use tui::{
backend::Backend,
layout::Rect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use tari_comms::{connectivity::ConnectivityEvent, peer_manager::Peer};
use tari_wallet::{
base_node_service::{handle::BaseNodeEvent, service::BaseNodeState},
connectivity_service::WalletConnectivityInterface,
output_manager_service::{handle::OutputManagerEvent, TxId},
transaction_service::handle::TransactionEvent,
};
Expand Down Expand Up @@ -62,6 +63,7 @@ impl WalletEventMonitor {
let mut connectivity_events = self.app_state_inner.read().await.get_connectivity_event_stream();
let wallet_connectivity = self.app_state_inner.read().await.get_wallet_connectivity();
let mut connectivity_status = wallet_connectivity.get_connectivity_status_watch();
let mut base_node_changed = wallet_connectivity.get_current_base_node_watcher();

let mut base_node_events = self.app_state_inner.read().await.get_base_node_event_stream();
let mut software_update_notif = self
Expand Down Expand Up @@ -166,6 +168,13 @@ impl WalletEventMonitor {
Err(broadcast::error::RecvError::Closed) => {}
}
},
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer).await;
self.trigger_balance_refresh();
}
}
result = base_node_events.recv() => {
match result {
Ok(msg) => {
Expand All @@ -174,10 +183,6 @@ impl WalletEventMonitor {
BaseNodeEvent::BaseNodeStateChanged(state) => {
self.trigger_base_node_state_refresh(state).await;
}
BaseNodeEvent::BaseNodePeerSet(peer) => {
self.trigger_base_node_peer_refresh(*peer).await;
self.trigger_balance_refresh();
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub fn tui_mode(config: WalletModeConfig, mut wallet: WalletSqlite) -> Result<()
base_node_config.base_node_custom = base_node_custom.clone();
if let Some(peer) = base_node_custom {
base_node_selected = peer;
} else if let Some(peer) = handle.block_on(wallet.get_base_node_peer())? {
} else if let Some(peer) = handle.block_on(wallet.get_base_node_peer()) {
base_node_selected = peer;
}

Expand Down
15 changes: 8 additions & 7 deletions base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,28 @@ tari_shutdown = { version = "^0.10", path = "../../infrastructure/shutdown" }
tari_storage = { version = "^0.10", path = "../../infrastructure/storage" }

aes-gcm = "^0.8"
async-trait = "0.1.50"
bincode = "1.3.1"
blake2 = "0.9.0"
chrono = { version = "0.4.6", features = ["serde"] }
crossbeam-channel = "0.3.8"
digest = "0.9.0"
diesel = { version = "1.4.7", features = ["sqlite", "serde_json", "chrono"] }
diesel_migrations = "1.4.0"
libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optional = true }
digest = "0.9.0"
fs2 = "0.3.0"
futures = { version = "^0.3.1", features = ["compat", "std"] }
libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optional = true }
lmdb-zero = "0.4.4"
log = "0.4.6"
log4rs = { version = "1.0.0", features = ["console_appender", "file_appender", "yaml_format"] }
lmdb-zero = "0.4.4"
rand = "0.8"
serde = { version = "1.0.89", features = ["derive"] }
serde_json = "1.0.39"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.3.0-alpha.2"
tempfile = "3.1.0"
time = { version = "0.1.39" }
thiserror = "1.0.26"
bincode = "1.3.1"
time = { version = "0.1.39" }
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.3.0-alpha.2"

[dependencies.tari_core]
path = "../../base_layer/core"
Expand Down
24 changes: 0 additions & 24 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use super::{error::BaseNodeServiceError, service::BaseNodeState};
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;
Expand All @@ -34,22 +33,17 @@ pub type BaseNodeEventReceiver = broadcast::Receiver<Arc<BaseNodeEvent>>;
#[derive(Debug)]
pub enum BaseNodeServiceRequest {
GetChainMetadata,
SetBaseNodePeer(Box<Peer>),
GetBaseNodePeer,
GetBaseNodeLatency,
}
/// API Response enum
#[derive(Debug)]
pub enum BaseNodeServiceResponse {
ChainMetadata(Option<ChainMetadata>),
BaseNodePeerSet,
BaseNodePeer(Option<Box<Peer>>),
Latency(Option<Duration>),
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
BaseNodePeerSet(Box<Peer>),
}

/// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running
Expand Down Expand Up @@ -82,24 +76,6 @@ impl BaseNodeServiceHandle {
}
}

pub async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
match self
.handle
.call(BaseNodeServiceRequest::SetBaseNodePeer(Box::new(peer)))
.await??
{
BaseNodeServiceResponse::BaseNodePeerSet => Ok(()),
_ => Err(BaseNodeServiceError::UnexpectedApiResponse),
}
}

pub async fn get_base_node_peer(&mut self) -> Result<Option<Peer>, BaseNodeServiceError> {
match self.handle.call(BaseNodeServiceRequest::GetBaseNodePeer).await?? {
BaseNodeServiceResponse::BaseNodePeer(peer) => Ok(peer.map(|p| *p)),
_ => Err(BaseNodeServiceError::UnexpectedApiResponse),
}
}

pub async fn get_base_node_latency(&mut self) -> Result<Option<Duration>, BaseNodeServiceError> {
match self.handle.call(BaseNodeServiceRequest::GetBaseNodeLatency).await?? {
BaseNodeServiceResponse::Latency(latency) => Ok(latency),
Expand Down
12 changes: 0 additions & 12 deletions base_layer/wallet/src/base_node_service/mock_base_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,12 @@ impl MockBaseNodeService {
}
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.base_node_peer = Some(peer);
}

/// This handler is called when requests arrive from the various streams
fn handle_request(
&mut self,
request: BaseNodeServiceRequest,
) -> Result<BaseNodeServiceResponse, BaseNodeServiceError> {
match request {
BaseNodeServiceRequest::SetBaseNodePeer(peer) => {
self.set_base_node_peer(*peer);
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.base_node_peer.clone();
Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new)))
},
BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata(
self.state.chain_metadata.clone(),
)),
Expand Down
18 changes: 11 additions & 7 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
handle::{BaseNodeEvent, BaseNodeEventSender},
service::BaseNodeState,
},
connectivity_service::WalletConnectivityHandle,
connectivity_service::WalletConnectivityInterface,
error::WalletStorageError,
storage::database::{WalletBackend, WalletDatabase},
};
Expand All @@ -42,20 +42,24 @@ use tokio::{sync::RwLock, time};

const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";

pub struct BaseNodeMonitor<T> {
pub struct BaseNodeMonitor<TBackend, TWalletConnectivity> {
interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
wallet_connectivity: WalletConnectivityHandle,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
}

impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
impl<TBackend, TWalletConnectivity> BaseNodeMonitor<TBackend, TWalletConnectivity>
where
TBackend: WalletBackend + 'static,
TWalletConnectivity: WalletConnectivityInterface,
{
pub fn new(
interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
wallet_connectivity: WalletConnectivityHandle,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
) -> Self {
Self {
Expand Down
27 changes: 1 addition & 26 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use super::{
config::BaseNodeServiceConfig,
error::BaseNodeServiceError,
handle::{BaseNodeEvent, BaseNodeEventSender, BaseNodeServiceRequest, BaseNodeServiceResponse},
handle::{BaseNodeEventSender, BaseNodeServiceRequest, BaseNodeServiceResponse},
};
use crate::{
base_node_service::monitor::BaseNodeMonitor,
Expand All @@ -35,7 +35,6 @@ use futures::{future, StreamExt};
use log::*;
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::Receiver;
use tari_shutdown::ShutdownSignal;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -153,12 +152,6 @@ where T: WalletBackend + 'static
Ok(())
}

async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
self.wallet_connectivity.set_base_node(peer.clone()).await?;
self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer)));
Ok(())
}

/// This handler is called when requests arrive from the various streams
async fn handle_request(
&mut self,
Expand All @@ -169,14 +162,6 @@ where T: WalletBackend + 'static
"Handling Wallet Base Node Service Request: {:?}", request
);
match request {
BaseNodeServiceRequest::SetBaseNodePeer(peer) => {
self.set_base_node_peer(*peer).await?;
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new);
Ok(BaseNodeServiceResponse::BaseNodePeer(peer))
},
BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() {
Some(metadata) => Ok(BaseNodeServiceResponse::ChainMetadata(Some(metadata))),
None => {
Expand All @@ -190,14 +175,4 @@ where T: WalletBackend + 'static
},
}
}

fn publish_event(&self, event: BaseNodeEvent) {
trace!(target: LOG_TARGET, "Publishing event: {:?}", event);
let _ = self.event_publisher.send(Arc::new(event)).map_err(|_| {
trace!(
target: LOG_TARGET,
"Could not publish BaseNodeEvent as there are no subscribers"
)
});
}
}
35 changes: 25 additions & 10 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::service::OnlineStatus;
use crate::connectivity_service::{error::WalletConnectivityError, watch::Watch};
use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch};
use tari_comms::{
peer_manager::{NodeId, Peer},
protocol::rpc::RpcClientLease,
types::CommsPublicKey,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::{mpsc, oneshot, watch};
Expand Down Expand Up @@ -53,10 +54,16 @@ impl WalletConnectivityHandle {
online_status_rx,
}
}
}

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
self.base_node_watch.send(Some(base_node_peer));
}

pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.base_node_watch.broadcast(Some(base_node_peer));
Ok(())
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>> {
self.base_node_watch.get_receiver()
}

/// Obtain a BaseNodeWalletRpcClient.
Expand All @@ -65,7 +72,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeWalletRpcClient RPC session.
pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
// Under what conditions do the (1) mpsc channel and (2) oneshot channel error?
// (1) when the receiver has been dropped
Expand All @@ -88,7 +95,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeSyncRpcClient RPC session.
pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(WalletConnectivityRequest::ObtainBaseNodeSyncRpcClient(reply_tx))
Expand All @@ -98,19 +105,27 @@ impl WalletConnectivityHandle {
reply_rx.await.ok()
}

pub fn get_connectivity_status(&mut self) -> OnlineStatus {
fn get_connectivity_status(&mut self) -> OnlineStatus {
*self.online_status_rx.borrow()
}

pub fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
self.online_status_rx.clone()
}

pub fn get_current_base_node_peer(&self) -> Option<Peer> {
fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().clone()
}

pub fn get_current_base_node_id(&self) -> Option<NodeId> {
fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}

fn is_base_node_set(&self) -> bool {
self.base_node_watch.borrow().is_some()
}
}
8 changes: 6 additions & 2 deletions base_layer/wallet/src/connectivity_service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::{handle::WalletConnectivityHandle, service::WalletConnectivityService, watch::Watch};
use crate::{base_node_service::config::BaseNodeServiceConfig, connectivity_service::service::OnlineStatus};
use super::{handle::WalletConnectivityHandle, service::WalletConnectivityService};
use crate::{
base_node_service::config::BaseNodeServiceConfig,
connectivity_service::service::OnlineStatus,
util::watch::Watch,
};
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::mpsc;

Expand Down
Loading