Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Support multiple Kad protocol names
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Sep 15, 2022
1 parent 228557b commit 8770e13
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 31 deletions.
9 changes: 3 additions & 6 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
use std::{
collections::HashSet,
time::Duration,
};
use std::{collections::HashSet, time::Duration};

pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure};

Expand Down Expand Up @@ -326,9 +323,9 @@ where
/// Add a self-reported address of a remote peer to the k-buckets of the supported
/// DHTs (`supported_protocols`).
pub fn add_self_reported_address(
&self,
&mut self,
peer_id: &PeerId,
supported_protocols: impl Iterator<Item = impl AsRef<[u8]>>,
supported_protocols: &[impl AsRef<[u8]>],
addr: Multiaddr,
) {
self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
Expand Down
41 changes: 25 additions & 16 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
//! of a node's address, you must call `add_self_reported_address`.

use crate::utils::LruHashSet;
use bytes::Bytes;
use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
Expand Down Expand Up @@ -321,7 +322,7 @@ impl DiscoveryBehaviour {
pub fn add_self_reported_address(
&mut self,
peer_id: &PeerId,
supported_protocols: impl Iterator<Item = impl AsRef<[u8]>>,
supported_protocols: &[impl AsRef<[u8]>],
addr: Multiaddr,
) {
if !self.allow_non_globals_in_dht && !self.can_add_to_dht(&addr) {
Expand All @@ -330,16 +331,25 @@ impl DiscoveryBehaviour {
}

let mut added = false;
for protocol in supported_protocols {
for kademlia in self.kademlias.values_mut() {
if protocol.as_ref() == kademlia.protocol_name() {
trace!(
target: "sub-libp2p",
"Adding self-reported address {} from {} to Kademlia DHT {}.",
addr, peer_id, String::from_utf8_lossy(kademlia.protocol_name()),
);
kademlia.add_address(peer_id, addr.clone());
added = true;
for kademlia in self.kademlias.values_mut() {
let kad_protocols: Vec<_> = kademlia
.protocol_names()
.iter()
.map(AsRef::as_ref)
.map(Bytes::copy_from_slice)
.collect();
'kad: for kad_protocol in kad_protocols {
for supported_protocol in supported_protocols {
if supported_protocol.as_ref() == kad_protocol {
trace!(
target: "sub-libp2p",
"Adding self-reported address {} from {} to Kademlia DHT {}.",
addr, peer_id, String::from_utf8_lossy(&kad_protocol),
);
kademlia.add_address(peer_id, addr.clone());
added = true;
break 'kad
}
}
}
}
Expand Down Expand Up @@ -1101,8 +1111,7 @@ mod tests {
.behaviour_mut()
.add_self_reported_address(
&other,
[protocol_name_from_protocol_id(&protocol_id)]
.iter(),
&[protocol_name_from_protocol_id(&protocol_id)],
addr,
);

Expand Down Expand Up @@ -1157,7 +1166,7 @@ mod tests {
// Add remote peer with unsupported protocol.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&unsupported_protocol_id)].iter(),
&[protocol_name_from_protocol_id(&unsupported_protocol_id)],
remote_addr.clone(),
);

Expand All @@ -1174,7 +1183,7 @@ mod tests {
// Add remote peer with supported protocol.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&supported_protocol_id)].iter(),
&[protocol_name_from_protocol_id(&supported_protocol_id)],
remote_addr.clone(),
);

Expand Down Expand Up @@ -1213,7 +1222,7 @@ mod tests {
// Add remote peer with `protocol_a` only.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&protocol_a)].iter(),
&[protocol_name_from_protocol_id(&protocol_a)],
remote_addr.clone(),
);

Expand Down
21 changes: 12 additions & 9 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ use codec::Encode as _;
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{either::EitherError, upgrade, ConnectedPoint, Executor},
identify::IdentifyInfo,
kad::record::Key as KademliaKey,
multiaddr,
ping::Failure as PingFailure,
identify::IdentifyInfo,
swarm::{
AddressScore, ConnectionError, ConnectionLimits, DialError, NetworkBehaviour,
PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent,
Expand Down Expand Up @@ -283,6 +283,11 @@ where
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));

let block_request_protocol_name = params.block_request_protocol_config.name.clone();
let state_request_protocol_name = params.state_request_protocol_config.name.clone();
let warp_sync_protocol_name =
params.warp_sync_protocol_config.as_ref().map(|c| c.name.clone());

// Build the swarm.
let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = {
let user_agent = format!(
Expand Down Expand Up @@ -481,9 +486,9 @@ where
tx_handler_controller,
metrics,
boot_node_ids,
block_request_protocol_name: params.block_request_protocol_config.name,
state_request_protocol_name: params.state_request_protocol_config.name,
warp_sync_protocol_name: params.warp_sync_protocol_config.map(|c| c.name),
block_request_protocol_name,
state_request_protocol_name,
warp_sync_protocol_name,
})
}

Expand Down Expand Up @@ -1693,11 +1698,9 @@ where
listen_addrs.truncate(30);
}
for addr in listen_addrs {
self.network_service.behaviour().add_self_reported_address(
&peer_id,
protocols.iter(),
addr,
);
self.network_service
.behaviour()
.add_self_reported_address(&peer_id, &protocols, addr);
}
self.network_service
.behaviour()
Expand Down

0 comments on commit 8770e13

Please sign in to comment.