Skip to content

Commit

Permalink
Some improvements of peers_exchange.
Browse files Browse the repository at this point in the history
  • Loading branch information
artemii235 committed Dec 11, 2020
1 parent 2fb3b07 commit ad076e1
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 8 deletions.
7 changes: 5 additions & 2 deletions mm2src/mm2_libp2p/src/atomicdex_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,12 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) {
}
false
})
.take(1)
.cloned()
.collect();
swarm.announce_listeners(global_listeners);
if !global_listeners.is_empty() {
swarm.announce_listeners(global_listeners);
}
}

/// Creates and spawns new AdexBehaviour Swarm returning:
Expand Down Expand Up @@ -610,7 +613,7 @@ pub fn start_gossipsub(
gossipsub,
floodsub,
request_response,
peers_exchange: PeersExchange::new(),
peers_exchange: PeersExchange::new(port),
ping,
};
libp2p::swarm::SwarmBuilder::new(transport, adex_behavior, local_peer_id.clone())
Expand Down
120 changes: 114 additions & 6 deletions mm2src/mm2_libp2p/src/peers_exchange.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::request_response::Codec;
use futures::StreamExt;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{multiaddr::Multiaddr,
use libp2p::{multiaddr::{Multiaddr, Protocol},
request_response::{handler::RequestProtocol, ProtocolName, ProtocolSupport, RequestResponse,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
Expand Down Expand Up @@ -33,6 +33,7 @@ impl ProtocolName for PeersExchangeProtocol {

type PeersExchangeCodec = Codec<PeersExchangeProtocol, PeersExchangeRequest, PeersExchangeResponse>;

const DEFAULT_PEERS_NUM: usize = 20;
const REQUEST_PEERS_INITIAL_DELAY: u64 = 60;
const REQUEST_PEERS_INTERVAL: u64 = 300;
const MAX_PEERS: usize = 100;
Expand Down Expand Up @@ -79,11 +80,13 @@ pub struct PeersExchange {
events: VecDeque<NetworkBehaviourAction<RequestProtocol<PeersExchangeCodec>, ()>>,
#[behaviour(ignore)]
maintain_peers_interval: Interval,
#[behaviour(ignore)]
netid_port: u16,
}

#[allow(clippy::new_without_default)]
impl PeersExchange {
pub fn new() -> Self {
pub fn new(netid_port: u16) -> Self {
let codec = Codec::default();
let protocol = iter::once((PeersExchangeProtocol::Version1, ProtocolSupport::Full));
let config = RequestResponseConfig::default();
Expand All @@ -96,6 +99,7 @@ impl PeersExchange {
Instant::now() + Duration::from_secs(REQUEST_PEERS_INITIAL_DELAY),
Duration::from_secs(REQUEST_PEERS_INTERVAL),
),
netid_port,
}
}

Expand All @@ -110,7 +114,6 @@ impl PeersExchange {
result
}

#[allow(unused)]
fn forget_peer(&mut self, peer: &PeerId) {
self.known_peers.retain(|known_peer| known_peer != peer);
self.forget_peer_addresses(peer);
Expand All @@ -123,6 +126,15 @@ impl PeersExchange {
}

pub fn add_peer_addresses(&mut self, peer: &PeerId, addresses: PeerAddresses) {
if addresses.len() > 1 {
return;
}

for address in addresses.iter() {
if !self.validate_global_multiaddr(address) {
return;
}
}
if !self.known_peers.contains(&peer) && !addresses.is_empty() {
self.known_peers.push(peer.clone());
}
Expand All @@ -148,7 +160,6 @@ impl PeersExchange {
}

fn request_known_peers_from_random_peer(&mut self) {
const DEFAULT_PEERS_NUM: usize = 20;
let mut rng = thread_rng();
if let Some(from_peer) = self.known_peers.choose(&mut rng) {
info!("Try to request {} peers from peer {}", DEFAULT_PEERS_NUM, from_peer);
Expand Down Expand Up @@ -180,6 +191,61 @@ impl PeersExchange {
}
}

fn validate_global_multiaddr(&self, address: &Multiaddr) -> bool {
let mut components = address.iter();
match components.next() {
Some(maybe_ip_v4) => match maybe_ip_v4 {
Protocol::Ip4(addr) => {
if !addr.is_global() {
return false;
}
},
_ => return false,
},
None => return false,
}

match components.next() {
Some(maybe_ip_v4) => match maybe_ip_v4 {
Protocol::Tcp(port) => {
if port != self.netid_port {
return false;
}
},
_ => return false,
},
None => return false,
}

if let Some(_) = components.next() {
return false;
}
true
}

fn validate_get_known_peers_response(&self, response: &HashMap<PeerIdSerde, PeerAddresses>) -> bool {
if response.is_empty() {
return false;
}

if response.len() > DEFAULT_PEERS_NUM {
return false;
}

for addresses in response.values() {
if addresses.is_empty() {
return false;
}

for address in addresses {
if !self.validate_global_multiaddr(address) {
return false;
}
}
}
true
}

fn poll(
&mut self,
cx: &mut Context,
Expand All @@ -200,7 +266,7 @@ impl PeersExchange {
impl NetworkBehaviourEventProcess<RequestResponseEvent<PeersExchangeRequest, PeersExchangeResponse>> for PeersExchange {
fn inject_event(&mut self, event: RequestResponseEvent<PeersExchangeRequest, PeersExchangeResponse>) {
match event {
RequestResponseEvent::Message { message, .. } => match message {
RequestResponseEvent::Message { message, peer } => match message {
RequestResponseMessage::Request { request, channel, .. } => match request {
PeersExchangeRequest::GetKnownPeers { num } => {
let response = PeersExchangeResponse::KnownPeers {
Expand All @@ -211,6 +277,13 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<PeersExchangeRequest, Pee
},
RequestResponseMessage::Response { response, .. } => match response {
PeersExchangeResponse::KnownPeers { peers } => {
if !self.validate_get_known_peers_response(&peers) {
// if peer provides invalid response forget it and try to request from other peer
self.forget_peer(&peer);
self.request_known_peers_from_random_peer();
return;
}

info!("Got peers {:?}", peers);
peers.into_iter().for_each(|(peer, addresses)| {
self.add_peer_addresses(&peer.0, addresses);
Expand All @@ -227,6 +300,8 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<PeersExchangeRequest, Pee
"Outbound failure {:?} while requesting {:?} to peer {}",
error, request_id, peer
);
self.forget_peer(&peer);
self.request_known_peers_from_random_peer();
},
RequestResponseEvent::InboundFailure { peer, error, .. } => {
error!(
Expand All @@ -240,8 +315,11 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<PeersExchangeRequest, Pee

#[cfg(test)]
mod tests {
use crate::peers_exchange::PeerIdSerde;
use crate::peers_exchange::{PeerIdSerde, PeersExchange};
use crate::PeerId;
use libp2p::core::Multiaddr;
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;

#[test]
fn test_peer_id_serde() {
Expand All @@ -250,4 +328,34 @@ mod tests {
let deserialized: PeerIdSerde = rmp_serde::from_read_ref(&serialized).unwrap();
assert_eq!(peer_id.0, deserialized.0);
}

#[test]
fn test_validate_get_known_peers_response() {
let behaviour = PeersExchange::new(3000);
let response = HashMap::default();
assert!(!behaviour.validate_get_known_peers_response(&response));

let response = HashMap::from_iter(vec![(PeerIdSerde(PeerId::random()), HashSet::new())]);
assert!(!behaviour.validate_get_known_peers_response(&response));

let address: Multiaddr = "/ip4/127.0.0.1/tcp/3000".parse().unwrap();
let response = HashMap::from_iter(vec![(PeerIdSerde(PeerId::random()), HashSet::from_iter(vec![address]))]);
assert!(!behaviour.validate_get_known_peers_response(&response));

let address: Multiaddr = "/ip4/216.58.210.142/tcp/3000".parse().unwrap();
let response = HashMap::from_iter(vec![(PeerIdSerde(PeerId::random()), HashSet::from_iter(vec![address]))]);
assert!(behaviour.validate_get_known_peers_response(&response));

let address: Multiaddr = "/ip4/216.58.210.142/tcp/3000/tcp/2000".parse().unwrap();
let response = HashMap::from_iter(vec![(PeerIdSerde(PeerId::random()), HashSet::from_iter(vec![address]))]);
assert!(!behaviour.validate_get_known_peers_response(&response));

let address: Multiaddr = "/ip4/216.58.210.142/tcp/3001".parse().unwrap();
let response = HashMap::from_iter(vec![(PeerIdSerde(PeerId::random()), HashSet::from_iter(vec![address]))]);
assert!(!behaviour.validate_get_known_peers_response(&response));

let address: Multiaddr = "/ip4/216.58.210.142".parse().unwrap();
let response = HashMap::from_iter(vec![(PeerIdSerde(PeerId::random()), HashSet::from_iter(vec![address]))]);
assert!(!behaviour.validate_get_known_peers_response(&response));
}
}

0 comments on commit ad076e1

Please sign in to comment.