Skip to content

Commit

Permalink
fix: remove delay from last request latency call
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 16, 2021
1 parent ccf1da0 commit eb8b815
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 46 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
remote_num_kernels - local_num_kernels,
);

let latency = client.get_last_request_latency().await?;
let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating kernel sync with peer `{}` (latency = {}ms)",
Expand Down Expand Up @@ -287,7 +287,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let end = remote_num_outputs;
let end_hash = to_header.hash();

let latency = client.get_last_request_latency().await?;
let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating output sync with peer `{}` (latency = {}ms)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
mut conn: PeerConnection,
) -> Result<(), BlockHeaderSyncError> {
let mut client = conn.connect_rpc::<rpc::BaseNodeSyncRpcClient>().await?;
let latency = client.get_last_request_latency().await?;
let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating header sync with peer `{}` (sync latency = {}ms)",
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where
timer.elapsed().as_millis()
);

let latency = match client.get_last_request_latency().await? {
let latency = match client.get_last_request_latency() {
Some(latency) => latency,
None => continue,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ where TBackend: WalletBackend + 'static
.connect_rpc_using_builder(BaseNodeSyncRpcClient::builder().with_deadline(Duration::from_secs(60)))
.await?;

let latency = client.get_last_request_latency().await?;
let latency = client.get_last_request_latency();
self.publish_event(UtxoScannerEvent::ConnectedToBaseNode(
peer.clone(),
latency.unwrap_or_default(),
Expand Down
8 changes: 3 additions & 5 deletions base_layer/wallet/tests/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ fn test_htlc_send_and_claim() {
let bob_connection = run_migration_and_create_sqlite_connection(&bob_db_path, 16).unwrap();

let shutdown = Shutdown::new();
let (alice_ts, alice_oms, _alice_comms, mut alice_connectivity) = setup_transaction_service(
let (mut alice_ts, mut alice_oms, _alice_comms, mut alice_connectivity) = setup_transaction_service(
&mut runtime,
alice_node_identity,
vec![],
Expand Down Expand Up @@ -998,18 +998,16 @@ fn test_htlc_send_and_claim() {
.expect("Alice sending HTLC transaction")
});

let mut alice_ts_clone2 = alice_ts.clone();
let mut alice_oms_clone = alice_oms.clone();
runtime.block_on(async move {
let completed_tx = alice_ts_clone2
let completed_tx = alice_ts
.get_completed_transaction(tx_id)
.await
.expect("Could not find completed HTLC tx");

let fees = completed_tx.fee;

assert_eq!(
alice_oms_clone.get_balance().await.unwrap().pending_incoming_balance,
alice_oms.get_balance().await.unwrap().pending_incoming_balance,
initial_wallet_value - value - fees
);
});
Expand Down
6 changes: 0 additions & 6 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ sha2 = "0.9.5"
path-clean = "0.1.0"
tari_storage = { version = "^0.21", path = "../infrastructure/storage"}
tracing = "0.1.26"
tracing-opentelemetry = "0.15.0"
tracing-subscriber = "0.2.20"

# network tracing, rt-tokio for async batch export
opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] }
opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]}

anyhow = { version = "1.0", optional = true }
git2 = { version = "0.8", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions comms/rpc_macros/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ impl RpcCodeGenerator {

#client_methods

pub async fn get_last_request_latency(&mut self) -> Result<Option<std::time::Duration>, #dep_mod::RpcError> {
self.inner.get_last_request_latency().await
pub fn get_last_request_latency(&mut self) -> Option<std::time::Duration> {
self.inner.get_last_request_latency()
}

pub async fn ping(&mut self) -> Result<std::time::Duration, #dep_mod::RpcError> {
Expand Down
40 changes: 20 additions & 20 deletions comms/src/protocol/rpc/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use std::{
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{mpsc, oneshot, Mutex},
sync::{mpsc, oneshot, watch, Mutex},
time,
};
use tower::{Service, ServiceExt};
Expand Down Expand Up @@ -105,7 +105,8 @@ impl RpcClient {
let (request_tx, request_rx) = mpsc::channel(1);
let shutdown = Shutdown::new();
let shutdown_signal = shutdown.to_signal();
let connector = ClientConnector::new(request_tx, shutdown);
let (last_request_latency_tx, last_request_latency_rx) = watch::channel(None);
let connector = ClientConnector::new(request_tx, last_request_latency_rx, shutdown);
let (ready_tx, ready_rx) = oneshot::channel();
let tracing_id = tracing::Span::current().id();
task::spawn({
Expand All @@ -116,6 +117,7 @@ impl RpcClient {
config,
node_id,
request_rx,
last_request_latency_tx,
framed,
ready_tx,
protocol_name,
Expand Down Expand Up @@ -172,7 +174,7 @@ impl RpcClient {
}

/// Return the latency of the last request
pub fn get_last_request_latency(&mut self) -> impl Future<Output = Result<Option<Duration>, RpcError>> + '_ {
pub fn get_last_request_latency(&mut self) -> Option<Duration> {
self.connector.get_last_request_latency()
}

Expand Down Expand Up @@ -315,13 +317,19 @@ impl Default for RpcClientConfig {
#[derive(Clone)]
pub struct ClientConnector {
inner: mpsc::Sender<ClientRequest>,
last_request_latency_rx: watch::Receiver<Option<Duration>>,
shutdown: Arc<Mutex<Shutdown>>,
}

impl ClientConnector {
pub(self) fn new(sender: mpsc::Sender<ClientRequest>, shutdown: Shutdown) -> Self {
pub(self) fn new(
sender: mpsc::Sender<ClientRequest>,
last_request_latency_rx: watch::Receiver<Option<Duration>>,
shutdown: Shutdown,
) -> Self {
Self {
inner: sender,
last_request_latency_rx,
shutdown: Arc::new(Mutex::new(shutdown)),
}
}
Expand All @@ -331,14 +339,8 @@ impl ClientConnector {
lock.trigger();
}

pub async fn get_last_request_latency(&mut self) -> Result<Option<Duration>, RpcError> {
let (reply, reply_rx) = oneshot::channel();
self.inner
.send(ClientRequest::GetLastRequestLatency(reply))
.await
.map_err(|_| RpcError::ClientClosed)?;

reply_rx.await.map_err(|_| RpcError::RequestCancelled)
pub fn get_last_request_latency(&mut self) -> Option<Duration> {
*self.last_request_latency_rx.borrow()
}

pub async fn send_ping(&mut self) -> Result<Duration, RpcError> {
Expand Down Expand Up @@ -391,23 +393,25 @@ struct RpcClientWorker<TSubstream> {
config: RpcClientConfig,
node_id: NodeId,
request_rx: mpsc::Receiver<ClientRequest>,
last_request_latency_tx: watch::Sender<Option<Duration>>,
framed: CanonicalFraming<TSubstream>,
// Request ids are limited to u16::MAX because varint encoding is used over the wire and the magnitude of the value
// sent determines the byte size. A u16 will be more than enough for the purpose
next_request_id: u16,
ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>,
last_request_latency: Option<Duration>,
protocol_id: ProtocolId,
shutdown_signal: ShutdownSignal,
}

impl<TSubstream> RpcClientWorker<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
{
#[allow(clippy::too_many_arguments)]
pub(self) fn new(
config: RpcClientConfig,
node_id: NodeId,
request_rx: mpsc::Receiver<ClientRequest>,
last_request_latency_tx: watch::Sender<Option<Duration>>,
framed: CanonicalFraming<TSubstream>,
ready_tx: oneshot::Sender<Result<(), RpcError>>,
protocol_id: ProtocolId,
Expand All @@ -420,7 +424,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
framed,
next_request_id: 0,
ready_tx: Some(ready_tx),
last_request_latency: None,
last_request_latency_tx,
protocol_id,
shutdown_signal,
}
Expand Down Expand Up @@ -454,7 +458,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
self.protocol_name(),
latency
);
self.last_request_latency = Some(latency);
let _ = self.last_request_latency_tx.send(Some(latency));
if let Some(r) = self.ready_tx.take() {
let _ = r.send(Ok(()));
}
Expand Down Expand Up @@ -514,9 +518,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
SendRequest { request, reply } => {
self.do_request_response(request, reply).await?;
},
GetLastRequestLatency(reply) => {
let _ = reply.send(self.last_request_latency);
},
SendPing(reply) => {
self.do_ping_pong(reply).await?;
},
Expand Down Expand Up @@ -647,7 +648,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
let resp = match self.read_response(request_id).await {
Ok(resp) => {
if let Some(t) = timer.take() {
self.last_request_latency = Some(t.elapsed());
let _ = self.last_request_latency_tx.send(Some(t.elapsed()));
}
event!(Level::TRACE, "Message received");
trace!(
Expand Down Expand Up @@ -821,7 +822,6 @@ pub enum ClientRequest {
request: BaseRequest<Bytes>,
reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
},
GetLastRequestLatency(oneshot::Sender<Option<Duration>>),
SendPing(oneshot::Sender<Result<Duration, RpcStatus>>),
}

Expand Down
34 changes: 33 additions & 1 deletion comms/src/protocol/rpc/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
protocol::{
rpc::{
test::{
greeting_service::{GreetingClient, GreetingServer, GreetingService},
greeting_service::{GreetingClient, GreetingServer, GreetingService, SlowStreamRequest},
mock::create_mocked_rpc_context,
},
NamedProtocolService,
Expand All @@ -39,9 +39,11 @@ use crate::{
runtime::task,
test_utils::mocks::{new_peer_connection_mock_pair, PeerConnectionMockState},
};
use std::{env, time::Duration};
use tari_shutdown::Shutdown;
use tari_test_utils::{async_assert_eventually, unpack_enum};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

async fn setup(num_concurrent_sessions: usize) -> (PeerConnection, PeerConnectionMockState, Shutdown) {
let (conn1, conn1_state, conn2, conn2_state) = new_peer_connection_mock_pair().await;
Expand Down Expand Up @@ -171,3 +173,33 @@ mod lazy_pool {
unpack_enum!(RpcClientPoolError::PeerConnectionDropped { .. } = err);
}
}

mod last_request_latency {
use super::*;

#[runtime::test]
async fn it_returns_the_latency_until_the_first_response() {
let (mut conn, _, _shutdown) = setup(1).await;

let mut client = conn.connect_rpc::<GreetingClient>().await.unwrap();

let resp = client
.slow_stream(SlowStreamRequest {
num_items: 100,
item_size: 10,
delay_ms: 10,
})
.await
.unwrap();

resp.collect::<Vec<_>>().await.into_iter().for_each(|r| {
r.unwrap();
});

let latency = client.get_last_request_latency().unwrap();
// CI could be really slow, so to prevent flakiness exclude the assert
if env::var("CI").is_err() {
assert!(latency < Duration::from_millis(100));
}
}
}
4 changes: 2 additions & 2 deletions comms/src/protocol/rpc/test/greeting_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ impl GreetingClient {
self.inner.server_streaming(request, 8).await
}

pub async fn get_last_request_latency(&mut self) -> Result<Option<Duration>, RpcError> {
self.inner.get_last_request_latency().await
pub fn get_last_request_latency(&mut self) -> Option<Duration> {
self.inner.get_last_request_latency()
}

pub async fn ping(&mut self) -> Result<Duration, RpcError> {
Expand Down
2 changes: 1 addition & 1 deletion comms/src/protocol/rpc/test/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async fn request_response_errors_and_streaming() {
.unwrap();

// Latency is available "for free" as part of the connect protocol
assert!(client.get_last_request_latency().await.unwrap().is_some());
assert!(client.get_last_request_latency().is_some());

let resp = client
.say_hello(SayHelloRequest {
Expand Down

0 comments on commit eb8b815

Please sign in to comment.