diff --git a/Cargo.lock b/Cargo.lock index d674d81979..06ba280191 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4513,8 +4513,6 @@ dependencies = [ "log", "log4rs 1.0.0", "multiaddr", - "opentelemetry", - "opentelemetry-jaeger", "path-clean", "prost-build", "serde 1.0.130", @@ -4527,8 +4525,6 @@ dependencies = [ "thiserror", "toml 0.5.8", "tracing", - "tracing-opentelemetry", - "tracing-subscriber", ] [[package]] diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs index 52c1556bb6..75ff753d50 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs @@ -159,7 +159,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { remote_num_kernels - local_num_kernels, ); - let latency = client.get_last_request_latency().await?; + let latency = client.get_last_request_latency(); debug!( target: LOG_TARGET, "Initiating kernel sync with peer `{}` (latency = {}ms)", @@ -287,7 +287,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let end = remote_num_outputs; let end_hash = to_header.hash(); - let latency = client.get_last_request_latency().await?; + let latency = client.get_last_request_latency(); debug!( target: LOG_TARGET, "Initiating output sync with peer `{}` (latency = {}ms)", diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 4ff044218f..ca594087b6 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -218,7 +218,7 @@ impl<'a, B: BlockchainBackend + 'static> 
HeaderSynchronizer<'a, B> { mut conn: PeerConnection, ) -> Result<(), BlockHeaderSyncError> { let mut client = conn.connect_rpc::<rpc::BaseNodeSyncRpcClient>().await?; - let latency = client.get_last_request_latency().await?; + let latency = client.get_last_request_latency(); debug!( target: LOG_TARGET, "Initiating header sync with peer `{}` (sync latency = {}ms)", diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 11003eff8b..9a7df7630b 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -150,7 +150,7 @@ where timer.elapsed().as_millis() ); - let latency = match client.get_last_request_latency().await? { + let latency = match client.get_last_request_latency() { Some(latency) => latency, None => continue, }; diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs index d5afe96fd1..ed21d9d029 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs @@ -268,7 +268,7 @@ where TBackend: WalletBackend + 'static .connect_rpc_using_builder(BaseNodeSyncRpcClient::builder().with_deadline(Duration::from_secs(60))) .await?; - let latency = client.get_last_request_latency().await?; + let latency = client.get_last_request_latency(); self.publish_event(UtxoScannerEvent::ConnectedToBaseNode( peer.clone(), latency.unwrap_or_default(), diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index d9b058ad49..eb0593bb67 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -944,7 +944,7 @@ fn test_htlc_send_and_claim() { let bob_connection = run_migration_and_create_sqlite_connection(&bob_db_path, 16).unwrap(); let shutdown = Shutdown::new(); - let (alice_ts,
alice_oms, _alice_comms, mut alice_connectivity) = setup_transaction_service( + let (mut alice_ts, mut alice_oms, _alice_comms, mut alice_connectivity) = setup_transaction_service( &mut runtime, alice_node_identity, vec![], @@ -998,10 +998,8 @@ fn test_htlc_send_and_claim() { .expect("Alice sending HTLC transaction") }); - let mut alice_ts_clone2 = alice_ts.clone(); - let mut alice_oms_clone = alice_oms.clone(); runtime.block_on(async move { - let completed_tx = alice_ts_clone2 + let completed_tx = alice_ts .get_completed_transaction(tx_id) .await .expect("Could not find completed HTLC tx"); @@ -1009,7 +1007,7 @@ fn test_htlc_send_and_claim() { let fees = completed_tx.fee; assert_eq!( - alice_oms_clone.get_balance().await.unwrap().pending_incoming_balance, + alice_oms.get_balance().await.unwrap().pending_incoming_balance, initial_wallet_value - value - fees ); }); diff --git a/common/Cargo.toml b/common/Cargo.toml index 8dd805cedd..67cb608d9b 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -27,12 +27,6 @@ sha2 = "0.9.5" path-clean = "0.1.0" tari_storage = { version = "^0.21", path = "../infrastructure/storage"} tracing = "0.1.26" -tracing-opentelemetry = "0.15.0" -tracing-subscriber = "0.2.20" - -# network tracing, rt-tokio for async batch export -opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] } -opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]} anyhow = { version = "1.0", optional = true } git2 = { version = "0.8", optional = true } diff --git a/comms/rpc_macros/src/generator.rs b/comms/rpc_macros/src/generator.rs index 312e318284..d3fe17df88 100644 --- a/comms/rpc_macros/src/generator.rs +++ b/comms/rpc_macros/src/generator.rs @@ -208,8 +208,8 @@ impl RpcCodeGenerator { #client_methods - pub async fn get_last_request_latency(&mut self) -> Result<Option<std::time::Duration>, #dep_mod::RpcError> { - self.inner.get_last_request_latency().await + pub fn get_last_request_latency(&mut self) -> Option<std::time::Duration> { + 
self.inner.get_last_request_latency() } pub async fn ping(&mut self) -> Result<std::time::Duration, #dep_mod::RpcError> { diff --git a/comms/src/protocol/rpc/client/mod.rs b/comms/src/protocol/rpc/client/mod.rs index bdaa57c368..52abf1049f 100644 --- a/comms/src/protocol/rpc/client/mod.rs +++ b/comms/src/protocol/rpc/client/mod.rs @@ -73,7 +73,7 @@ use std::{ use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::{mpsc, oneshot, Mutex}, + sync::{mpsc, oneshot, watch, Mutex}, time, }; use tower::{Service, ServiceExt}; @@ -105,7 +105,8 @@ impl RpcClient { let (request_tx, request_rx) = mpsc::channel(1); let shutdown = Shutdown::new(); let shutdown_signal = shutdown.to_signal(); - let connector = ClientConnector::new(request_tx, shutdown); + let (last_request_latency_tx, last_request_latency_rx) = watch::channel(None); + let connector = ClientConnector::new(request_tx, last_request_latency_rx, shutdown); let (ready_tx, ready_rx) = oneshot::channel(); let tracing_id = tracing::Span::current().id(); task::spawn({ @@ -116,6 +117,7 @@ impl RpcClient { config, node_id, request_rx, + last_request_latency_tx, framed, ready_tx, protocol_name, @@ -172,7 +174,7 @@ impl RpcClient { } /// Return the latency of the last request - pub fn get_last_request_latency(&mut self) -> impl Future<Output = Result<Option<Duration>, RpcError>> + '_ { + pub fn get_last_request_latency(&mut self) -> Option<Duration> { self.connector.get_last_request_latency() } @@ -315,13 +317,19 @@ impl Default for RpcClientConfig { #[derive(Clone)] pub struct ClientConnector { inner: mpsc::Sender<ClientRequest>, + last_request_latency_rx: watch::Receiver<Option<Duration>>, shutdown: Arc<Mutex<Shutdown>>, } impl ClientConnector { - pub(self) fn new(sender: mpsc::Sender<ClientRequest>, shutdown: Shutdown) -> Self { + pub(self) fn new( + sender: mpsc::Sender<ClientRequest>, + last_request_latency_rx: watch::Receiver<Option<Duration>>, + shutdown: Shutdown, + ) -> Self { Self { inner: sender, + last_request_latency_rx, shutdown: Arc::new(Mutex::new(shutdown)), } } @@ -331,14 +339,8 @@ impl ClientConnector { lock.trigger(); } - pub async fn 
get_last_request_latency(&mut self) -> Result<Option<Duration>, RpcError> { - let (reply, reply_rx) = oneshot::channel(); - self.inner - .send(ClientRequest::GetLastRequestLatency(reply)) - .await - .map_err(|_| RpcError::ClientClosed)?; - - reply_rx.await.map_err(|_| RpcError::RequestCancelled) + pub fn get_last_request_latency(&mut self) -> Option<Duration> { + *self.last_request_latency_rx.borrow() } pub async fn send_ping(&mut self) -> Result<Duration, RpcError> { @@ -391,12 +393,12 @@ struct RpcClientWorker<TSubstream> { config: RpcClientConfig, node_id: NodeId, request_rx: mpsc::Receiver<ClientRequest>, + last_request_latency_tx: watch::Sender<Option<Duration>>, framed: CanonicalFraming<TSubstream>, // Request ids are limited to u16::MAX because varint encoding is used over the wire and the magnitude of the value // sent determines the byte size. A u16 will be more than enough for the purpose next_request_id: u16, ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>, - last_request_latency: Option<Duration>, protocol_id: ProtocolId, shutdown_signal: ShutdownSignal, } @@ -404,10 +406,12 @@ impl<TSubstream> RpcClientWorker<TSubstream> where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId { + #[allow(clippy::too_many_arguments)] pub(self) fn new( config: RpcClientConfig, node_id: NodeId, request_rx: mpsc::Receiver<ClientRequest>, + last_request_latency_tx: watch::Sender<Option<Duration>>, framed: CanonicalFraming<TSubstream>, ready_tx: oneshot::Sender<Result<(), RpcError>>, protocol_id: ProtocolId, @@ -420,7 +424,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId framed, next_request_id: 0, ready_tx: Some(ready_tx), - last_request_latency: None, + last_request_latency_tx, protocol_id, shutdown_signal, } @@ -454,7 +458,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId self.protocol_name(), latency ); - self.last_request_latency = Some(latency); + let _ = self.last_request_latency_tx.send(Some(latency)); if let Some(r) = self.ready_tx.take() { let _ = r.send(Ok(())); } @@ -514,9 +518,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId SendRequest { request, reply } => { 
self.do_request_response(request, reply).await?; }, - GetLastRequestLatency(reply) => { - let _ = reply.send(self.last_request_latency); - }, SendPing(reply) => { self.do_ping_pong(reply).await?; }, @@ -647,7 +648,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId let resp = match self.read_response(request_id).await { Ok(resp) => { if let Some(t) = timer.take() { - self.last_request_latency = Some(t.elapsed()); + let _ = self.last_request_latency_tx.send(Some(t.elapsed())); } event!(Level::TRACE, "Message received"); trace!( @@ -821,7 +822,6 @@ pub enum ClientRequest { SendRequest { request: BaseRequest<Bytes>, reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>, }, - GetLastRequestLatency(oneshot::Sender<Option<Duration>>), SendPing(oneshot::Sender<Result<Duration, RpcError>>), } diff --git a/comms/src/protocol/rpc/client/tests.rs b/comms/src/protocol/rpc/client/tests.rs index 0dc4a4c595..d12faeb7d4 100644 --- a/comms/src/protocol/rpc/client/tests.rs +++ b/comms/src/protocol/rpc/client/tests.rs @@ -25,7 +25,7 @@ use crate::{ protocol::{ rpc::{ test::{ - greeting_service::{GreetingClient, GreetingServer, GreetingService}, + greeting_service::{GreetingClient, GreetingServer, GreetingService, SlowStreamRequest}, mock::create_mocked_rpc_context, }, NamedProtocolService, @@ -39,9 +39,11 @@ use crate::{ runtime::task, test_utils::mocks::{new_peer_connection_mock_pair, PeerConnectionMockState}, }; +use std::{env, time::Duration}; use tari_shutdown::Shutdown; use tari_test_utils::{async_assert_eventually, unpack_enum}; use tokio::sync::mpsc; +use tokio_stream::StreamExt; async fn setup(num_concurrent_sessions: usize) -> (PeerConnection, PeerConnectionMockState, Shutdown) { let (conn1, conn1_state, conn2, conn2_state) = new_peer_connection_mock_pair().await; @@ -171,3 +173,33 @@ mod lazy_pool { unpack_enum!(RpcClientPoolError::PeerConnectionDropped { .. 
} = err); } } + +mod last_request_latency { + use super::*; + + #[runtime::test] + async fn it_returns_the_latency_until_the_first_response() { + let (mut conn, _, _shutdown) = setup(1).await; + + let mut client = conn.connect_rpc::<GreetingClient>().await.unwrap(); + + let resp = client + .slow_stream(SlowStreamRequest { + num_items: 100, + item_size: 10, + delay_ms: 10, + }) + .await + .unwrap(); + + resp.collect::<Vec<_>>().await.into_iter().for_each(|r| { + r.unwrap(); + }); + + let latency = client.get_last_request_latency().unwrap(); + // CI could be really slow, so to prevent flakiness exclude the assert + if env::var("CI").is_err() { + assert!(latency < Duration::from_millis(100)); + } + } +} diff --git a/comms/src/protocol/rpc/test/greeting_service.rs b/comms/src/protocol/rpc/test/greeting_service.rs index d974f433cc..00d3aad61e 100644 --- a/comms/src/protocol/rpc/test/greeting_service.rs +++ b/comms/src/protocol/rpc/test/greeting_service.rs @@ -447,8 +447,8 @@ impl GreetingClient { self.inner.server_streaming(request, 8).await } - pub async fn get_last_request_latency(&mut self) -> Result<Option<Duration>, RpcError> { - self.inner.get_last_request_latency().await + pub fn get_last_request_latency(&mut self) -> Option<Duration> { + self.inner.get_last_request_latency() } pub async fn ping(&mut self) -> Result<Duration, RpcError> { diff --git a/comms/src/protocol/rpc/test/smoke.rs b/comms/src/protocol/rpc/test/smoke.rs index 8ca1bfce52..0fc1f05256 100644 --- a/comms/src/protocol/rpc/test/smoke.rs +++ b/comms/src/protocol/rpc/test/smoke.rs @@ -135,7 +135,7 @@ async fn request_response_errors_and_streaming() { .unwrap(); // Latency is available "for free" as part of the connect protocol - assert!(client.get_last_request_latency().await.unwrap().is_some()); + assert!(client.get_last_request_latency().is_some()); let resp = client .say_hello(SayHelloRequest {