diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs index e192e9a66d..bd9a65eca2 100644 --- a/comms/src/connection_manager/listener.rs +++ b/comms/src/connection_manager/listener.rs @@ -30,7 +30,11 @@ use super::{ }; use crate::{ bounded_executor::BoundedExecutor, - connection_manager::{liveness::LivenessSession, types::OneshotTrigger, wire_mode::WireMode}, + connection_manager::{ + liveness::LivenessSession, + types::OneshotTrigger, + wire_mode::{WireMode, LIVENESS_WIRE_MODE}, + }, multiaddr::Multiaddr, multiplexing::Yamux, noise::NoiseConfig, @@ -56,6 +60,7 @@ use log::*; use std::{ convert::TryInto, future::Future, + io::{Error, ErrorKind}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -168,22 +173,34 @@ where } } - async fn read_wire_format(socket: &mut TTransport::Output, time_to_first_byte: Duration) -> Option { + async fn read_wire_format( + socket: &mut TTransport::Output, + time_to_first_byte: Duration, + ) -> Result { let mut buf = [0u8; 1]; - match time::timeout(time_to_first_byte, socket.read_exact(&mut buf)) - .await - .ok()? - { - Ok(_) => match buf[0].try_into().ok() { - Some(wf) => Some(wf), - None => { - warn!(target: LOG_TARGET, "Invalid wire format byte '{}'", buf[0]); - None + match time::timeout(time_to_first_byte, socket.read_exact(&mut buf)).await { + Ok(result) => match result { + Ok(_) => match buf[0].try_into().ok() { + Some(wf) => Ok(wf), + None => { + warn!(target: LOG_TARGET, "Invalid wire format byte '{}'", buf[0]); + Err(ErrorKind::InvalidData.into()) + }, + }, + Err(err) => { + warn!( + target: LOG_TARGET, + "Failed to read wire format byte due to error: {}", err + ); + Err(err) }, }, - Err(err) => { - warn!(target: LOG_TARGET, "Failed to read first byte: {}", err); - None + Err(elapsed) => { + warn!( + target: LOG_TARGET, + "Failed to read wire format byte within timeout of {:#?}. {}", time_to_first_byte, elapsed + ); + Err(elapsed.into()) }, } } @@ -227,7 +244,7 @@ where let inbound_fut = async move { match Self::read_wire_format(&mut socket, config.time_to_first_byte).await { - Some(WireMode::Comms(byte)) if byte == config.network_info.network_byte => { + Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => { let this_node_id_str = node_identity.node_id().short_str(); let result = Self::perform_socket_upgrade_procedure( node_identity, @@ -268,7 +285,7 @@ where }, } }, - Some(WireMode::Comms(byte)) => { + Ok(WireMode::Comms(byte)) => { warn!( target: LOG_TARGET, "Peer at address '{}' sent invalid wire format byte. Expected {:x?} got: {:x?} ", @@ -277,7 +294,7 @@ where byte, ); }, - Some(WireMode::Liveness) => { + Ok(WireMode::Liveness) => { if liveness_session_count.load(Ordering::SeqCst) > 0 && Self::is_address_in_liveness_cidr_range(&peer_addr, &config.liveness_cidr_allowlist) { @@ -295,10 +312,15 @@ where let _ = socket.close().await; } }, - None => { + Err(err) => { warn!( target: LOG_TARGET, - "Peer at address '{}' failed to send valid wire format", peer_addr + "Peer at address '{}' failed to send wire format. Expected network byte {:x?} or liveness \ + byte {:x?} not received. Error: {}", + peer_addr, + config.network_info.network_byte, + LIVENESS_WIRE_MODE, + err ); }, } diff --git a/comms/src/connection_manager/wire_mode.rs b/comms/src/connection_manager/wire_mode.rs index e2421c078b..d8b5cac5d0 100644 --- a/comms/src/connection_manager/wire_mode.rs +++ b/comms/src/connection_manager/wire_mode.rs @@ -22,7 +22,7 @@ use std::convert::TryFrom; -const LIVENESS_WIRE_MODE: u8 = 0x46; // E +pub(crate) const LIVENESS_WIRE_MODE: u8 = 0x46; // E pub enum WireMode { Comms(u8),