Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(comms)!: optimise connection establishment #3658

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 0 additions & 61 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ tracing-subscriber = "0.2.20"

# Metrics
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
warp = { version = "0.3.1", optional = true }
warp = { version = "0.3.1", optional = true, default-features = false }
reqwest = { version = "0.11.4", default-features = false, optional = true }

[features]
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use tari_common::configuration::Network;
pub const DEFAULT_DNS_NAME_SERVER: &str = "1.1.1.1:853/cloudflare-dns.com";

/// Major network version. Peers will refuse connections if this value differs
pub const MAJOR_NETWORK_VERSION: u32 = 0;
pub const MAJOR_NETWORK_VERSION: u8 = 0;
/// Minor network version. This should change with each time the network protocol has changed in a backward-compatible
/// way.
pub const MINOR_NETWORK_VERSION: u32 = 0;
pub const MINOR_NETWORK_VERSION: u8 = 0;
14 changes: 13 additions & 1 deletion comms/dht/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,11 @@ async fn dht_propagate_dedup() {

let mut node_A_messaging = node_A.messaging_events.subscribe();
let mut node_B_messaging = node_B.messaging_events.subscribe();
let mut node_B_messaging2 = node_B.messaging_events.subscribe();
let mut node_C_messaging = node_C.messaging_events.subscribe();
let mut node_C_messaging2 = node_C.messaging_events.subscribe();
let mut node_D_messaging = node_D.messaging_events.subscribe();
let mut node_D_messaging2 = node_D.messaging_events.subscribe();

#[derive(Clone, PartialEq, ::prost::Message)]
struct Person {
Expand Down Expand Up @@ -596,6 +599,11 @@ async fn dht_propagate_dedup() {
let node_C_id = node_C.node_identity().node_id().clone();
let node_D_id = node_D.node_identity().node_id().clone();

// Ensure that the message has propagated before disconnecting everyone
let _ = node_B_messaging2.recv().await.unwrap();
let _ = node_C_messaging2.recv().await.unwrap();
let _ = node_D_messaging2.recv().await.unwrap();

node_A.shutdown().await;
node_B.shutdown().await;
node_C.shutdown().await;
Expand All @@ -611,7 +619,11 @@ async fn dht_propagate_dedup() {
let received = filter_received(collect_try_recv!(node_B_messaging, timeout = Duration::from_secs(20)));
let recv_count = count_messages_received(&received, &[&node_A_id, &node_C_id]);
// Expected race condition: If A->B->C before A->C then C->B does not happen
assert!((1..=2).contains(&recv_count));
assert!(
(1..=2).contains(&recv_count),
"expected recv_count to be in [1-2] but was {}",
recv_count
);

let received = filter_received(collect_try_recv!(node_C_messaging, timeout = Duration::from_secs(20)));
let recv_count = count_messages_received(&received, &[&node_A_id, &node_B_id]);
Expand Down
2 changes: 1 addition & 1 deletion comms/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl CommsBuilder {
}

/// Set a network major and minor version as per [RFC-173 Versioning](https://rfc.tari.com/RFC-0173_Versioning.html)
pub fn with_node_version(mut self, major_version: u32, minor_version: u32) -> Self {
pub fn with_node_version(mut self, major_version: u8, minor_version: u8) -> Self {
self.connection_manager_config.network_info.major_version = major_version;
self.connection_manager_config.network_info.minor_version = minor_version;
self
Expand Down
25 changes: 9 additions & 16 deletions comms/src/connection_manager/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@

use std::{convert::TryFrom, net::Ipv6Addr};

use futures::StreamExt;
use log::*;
use tokio::io::{AsyncRead, AsyncWrite};

use super::types::ConnectionDirection;
use crate::{
connection_manager::error::ConnectionManagerError,
multiaddr::{Multiaddr, Protocol},
multiplexing::Yamux,
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
proto::identity::PeerIdentityMsg,
protocol,
Expand All @@ -43,30 +42,24 @@ const LOG_TARGET: &str = "comms::connection_manager::common";
/// The maximum size of the peer's user agent string. If the peer sends a longer string it is truncated.
const MAX_USER_AGENT_LEN: usize = 100;

pub async fn perform_identity_exchange<'p, P: IntoIterator<Item = &'p ProtocolId>>(
muxer: &mut Yamux,
pub async fn perform_identity_exchange<
'p,
P: IntoIterator<Item = &'p ProtocolId>,
TSocket: AsyncRead + AsyncWrite + Unpin,
>(
socket: &mut TSocket,
node_identity: &NodeIdentity,
direction: ConnectionDirection,
our_supported_protocols: P,
network_info: NodeNetworkInfo,
) -> Result<PeerIdentityMsg, ConnectionManagerError> {
let mut control = muxer.get_yamux_control();
let stream = match direction {
ConnectionDirection::Inbound => muxer
.incoming_mut()
.next()
.await
.ok_or(ConnectionManagerError::IncomingListenerStreamClosed)?,
ConnectionDirection::Outbound => control.open_stream().await?,
};

debug!(
target: LOG_TARGET,
"{} substream opened to peer. Performing identity exchange.", direction
"{} socket opened to peer. Performing identity exchange.", direction
);

let peer_identity =
protocol::identity_exchange(node_identity, direction, our_supported_protocols, network_info, stream).await?;
protocol::identity_exchange(node_identity, direction, our_supported_protocols, network_info, socket).await?;

Ok(peer_identity)
}
Expand Down
30 changes: 15 additions & 15 deletions comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ where
async fn perform_socket_upgrade_procedure(
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
socket: NoiseSocket<TTransport::Output>,
mut socket: NoiseSocket<TTransport::Output>,
dialed_addr: Multiaddr,
authenticated_public_key: CommsPublicKey,
conn_man_notifier: mpsc::Sender<ConnectionManagerEvent>,
Expand All @@ -361,43 +361,36 @@ where
cancel_signal: ShutdownSignal,
) -> Result<PeerConnection, ConnectionManagerError> {
static CONNECTION_DIRECTION: ConnectionDirection = ConnectionDirection::Outbound;
let mut muxer = Yamux::upgrade_connection(socket, CONNECTION_DIRECTION)
.await
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

debug!(
target: LOG_TARGET,
"Starting peer identity exchange for peer with public key '{}'", authenticated_public_key
);
if cancel_signal.is_terminated() {
return Err(ConnectionManagerError::DialCancelled);
}

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let peer_identity = common::perform_identity_exchange(
&mut muxer,
&mut socket,
&node_identity,
CONNECTION_DIRECTION,
&our_supported_protocols,
config.network_info.clone(),
)
.await?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

let features = PeerFeatures::from_bits_truncate(peer_identity.features);
trace!(
debug!(
target: LOG_TARGET,
"Peer identity exchange succeeded on Outbound connection for peer '{}' (Features = {:?})",
authenticated_public_key,
features
);
trace!(target: LOG_TARGET, "{:?}", peer_identity);

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let (peer_node_id, their_supported_protocols) = common::validate_and_add_peer_from_peer_identity(
&peer_manager,
known_peer,
Expand All @@ -409,7 +402,6 @@ where
.await?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

Expand All @@ -420,6 +412,14 @@ where
peer_node_id.short_str()
);

let muxer = Yamux::upgrade_connection(socket, CONNECTION_DIRECTION)
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

peer_connection::create(
muxer,
dialed_addr,
Expand Down
26 changes: 16 additions & 10 deletions comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
time::{Duration, Instant},
};

use futures::{future, FutureExt};
Expand Down Expand Up @@ -383,7 +383,8 @@ where
"Starting noise protocol upgrade for peer at address '{}'", peer_addr
);

let noise_socket = time::timeout(
let timer = Instant::now();
let mut noise_socket = time::timeout(
Duration::from_secs(30),
noise_config.upgrade_socket(socket, CONNECTION_DIRECTION),
)
Expand All @@ -394,21 +395,23 @@ where
.get_remote_public_key()
.ok_or(ConnectionManagerError::InvalidStaticPublicKey)?;

debug!(
target: LOG_TARGET,
"Noise socket upgrade completed in {:.2?} with public key '{}'",
timer.elapsed(),
authenticated_public_key
);

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let mut muxer = Yamux::upgrade_connection(noise_socket, CONNECTION_DIRECTION)
.await
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

trace!(
debug!(
target: LOG_TARGET,
"Starting peer identity exchange for peer with public key '{}'",
authenticated_public_key
"Starting peer identity exchange for peer with public key '{}'", authenticated_public_key
);

let peer_identity = common::perform_identity_exchange(
&mut muxer,
&mut noise_socket,
&node_identity,
CONNECTION_DIRECTION,
&our_supported_protocols,
Expand Down Expand Up @@ -442,6 +445,9 @@ where
peer_node_id.short_str()
);

let muxer = Yamux::upgrade_connection(noise_socket, CONNECTION_DIRECTION)
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

peer_connection::create(
muxer,
peer_addr,
Expand Down
6 changes: 3 additions & 3 deletions comms/src/connection_manager/tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
},
noise::NoiseConfig,
peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags, PeerManagerError},
protocol::{ProtocolEvent, ProtocolId, Protocols, IDENTITY_PROTOCOL},
protocol::{ProtocolEvent, ProtocolId, Protocols},
runtime,
runtime::task,
test_utils::{
Expand Down Expand Up @@ -156,15 +156,15 @@ async fn dial_success() {
let mut conn_out = conn_man1.dial_peer(node_identity2.node_id().clone()).await.unwrap();
assert_eq!(conn_out.peer_node_id(), node_identity2.node_id());
let peer2 = peer_manager1.find_by_node_id(conn_out.peer_node_id()).await.unwrap();
assert_eq!(peer2.supported_protocols, [&IDENTITY_PROTOCOL, &TEST_PROTO]);
assert_eq!(peer2.supported_protocols, [&TEST_PROTO]);
assert_eq!(peer2.user_agent, "node2");

let event = subscription2.recv().await.unwrap();
unpack_enum!(ConnectionManagerEvent::PeerConnected(conn_in) = &*event);
assert_eq!(conn_in.peer_node_id(), node_identity1.node_id());

let peer1 = peer_manager2.find_by_node_id(node_identity1.node_id()).await.unwrap();
assert_eq!(peer1.supported_protocols(), [&IDENTITY_PROTOCOL, &TEST_PROTO]);
assert_eq!(peer1.supported_protocols(), [&TEST_PROTO]);
assert_eq!(peer1.user_agent, "node1");

let err = conn_out
Expand Down
Loading