Skip to content

Commit

Permalink
fix: add timeout to protocol notifications + log improvements
Browse files Browse the repository at this point in the history
- protocol notifications now have a set "safety" timeout.
- add log for inbound comms pipeline concurrency usage
  • Loading branch information
sdbondi committed Aug 2, 2021
1 parent 0e0bfe0 commit a98a698
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 51 deletions.
2 changes: 1 addition & 1 deletion comms/dht/src/store_forward/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError> + Se
}

if message.dht_header.message_type.is_saf_message() {
log_not_eligible("it is a SAF message");
log_not_eligible("it is a SAF protocol message");
return Ok(None);
}

Expand Down
10 changes: 9 additions & 1 deletion comms/src/bounded_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ pub struct TrySpawnError;
pub struct BoundedExecutor {
inner: runtime::Handle,
semaphore: Arc<Semaphore>,
max_available: usize,
}

impl BoundedExecutor {
pub fn new(executor: runtime::Handle, num_permits: usize) -> Self {
Self {
inner: executor,
semaphore: Arc::new(Semaphore::new(num_permits)),
max_available: num_permits,
}
}

Expand All @@ -70,12 +72,18 @@ impl BoundedExecutor {
self.num_available() > 0
}

/// Returns the number tasks that can be spawned on this executor without blocking.
/// Returns the remaining number of tasks that can be spawned on this executor without waiting.
#[inline]
pub fn num_available(&self) -> usize {
self.semaphore.available_permits()
}

/// Returns the maximum number of concurrent tasks that can be spawned on this executor without waiting.
#[inline]
pub fn max_available(&self) -> usize {
self.max_available
}

pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, TrySpawnError>
where
F: Future + Send + 'static,
Expand Down
26 changes: 18 additions & 8 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,25 @@ where
node_id.short_str(),
proto_str
);
if let Err(err) = self
let notify_fut = self
.protocols
.notify(&protocol, ProtocolEvent::NewInboundSubstream(*node_id, stream))
.await
{
error!(
target: LOG_TARGET,
"Error sending NewSubstream notification for protocol '{}' because '{:?}'", proto_str, err
);
.notify(&protocol, ProtocolEvent::NewInboundSubstream(*node_id, stream));
match time::timeout(Duration::from_secs(10), notify_fut).await {
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
"Error sending NewSubstream notification for protocol '{}' because '{:?}'", proto_str, err
);
},
Err(err) => {
error!(
target: LOG_TARGET,
"Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err
);
},
_ => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
}
},

Expand Down
13 changes: 13 additions & 0 deletions comms/src/pipeline/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ where
return;
}
let service = self.service.clone();

let num_available = self.executor.num_available();
let max_available = self.executor.max_available();
// Only emit this message if there is any concurrent usage
if num_available < max_available {
debug!(
target: LOG_TARGET,
"Inbound pipeline usage: {}/{}",
max_available - num_available,
max_available
);
}
// Call the service in it's own spawned task
self.executor
.spawn(async move {
Expand All @@ -80,6 +92,7 @@ where
})
.await;
}
info!(target: LOG_TARGET, "Inbound pipeline terminated: the stream completed");
}
}

Expand Down
33 changes: 0 additions & 33 deletions comms/src/protocol/messaging/consts.rs

This file was deleted.

20 changes: 16 additions & 4 deletions comms/src/protocol/messaging/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
message::InboundMessage,
pipeline,
protocol::{
messaging::{consts, protocol::MESSAGING_PROTOCOL, MessagingEventSender},
messaging::{protocol::MESSAGING_PROTOCOL, MessagingEventSender},
ProtocolExtension,
ProtocolExtensionContext,
ProtocolExtensionError,
Expand All @@ -38,6 +38,18 @@ use futures::channel::mpsc;
use std::fmt;
use tower::Service;

/// Buffer size for inbound messages from _all_ peers. This should be large enough to buffer quite a few incoming
/// messages before creating backpressure on peers speaking the messaging protocol.
pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 100;
/// Buffer size notifications that a peer wants to speak /tari/messaging. This buffer is used for all peers, but a low
/// value is ok because this events happen once (or less) per connecting peer. For e.g. a value of 10 would allow 10
/// peers to concurrently request to speak /tari/messaging.
pub const MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE: usize = 30;

/// Buffer size for requests to the messaging protocol. All outbound messages will be sent along this channel. Some
/// buffering may be required if the node needs to send many messages out at the same time.
pub const MESSAGING_REQUEST_BUFFER_SIZE: usize = 50;

pub struct MessagingProtocolExtension<TInPipe, TOutPipe, TOutReq> {
event_tx: MessagingEventSender,
pipeline: pipeline::Config<TInPipe, TOutPipe, TOutReq>,
Expand All @@ -60,11 +72,11 @@ where
TOutReq: Send + 'static,
{
fn install(self: Box<Self>, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> {
let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
let (proto_tx, proto_rx) = mpsc::channel(MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
context.add_protocol(&[MESSAGING_PROTOCOL.clone()], proto_tx);

let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
let (messaging_request_tx, messaging_request_rx) = mpsc::channel(MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(INBOUND_MESSAGE_BUFFER_SIZE);

let messaging = MessagingProtocol::new(
Default::default(),
Expand Down
2 changes: 0 additions & 2 deletions comms/src/protocol/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ pub use config::MessagingConfig;
mod extension;
pub use extension::MessagingProtocolExtension;

mod consts;

mod error;
mod inbound;
mod outbound;
Expand Down
2 changes: 1 addition & 1 deletion comms/src/protocol/rpc/server/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ where
<B::Service as Service<Request<Bytes>>>::Future: Send + 'static,
{
fn install(self: Box<Self>, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> {
let (proto_notif_tx, proto_notif_rx) = mpsc::channel(10);
let (proto_notif_tx, proto_notif_rx) = mpsc::channel(20);
context.add_protocol(&self.protocol_names, proto_notif_tx);
let rpc_context = RpcCommsBackend::new(context.peer_manager(), context.connectivity());
task::spawn(self.serve(proto_notif_rx, rpc_context));
Expand Down
1 change: 0 additions & 1 deletion integration_tests/features/WalletBaseNodeSwitch.feature
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
Feature: Wallet Base Node Switch

@doit
Scenario: As a user I want to change base node for a wallet
Given I have a base node Node1 connected to all seed nodes
And I have a base node Node2 connected to all seed nodes
Expand Down

0 comments on commit a98a698

Please sign in to comment.