Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanups #7

Merged
merged 10 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 10 additions & 38 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,20 +383,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.discv5.table_entries_enr()
}

/// Returns the ENR of a known peer if it exists.
pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option<Enr> {
// first search the local cache
if let Some(enr) = self.cached_enrs.get(peer_id) {
return Some(enr.clone());
}
// not in the local cache, look in the routing table
if let Ok(node_id) = enr_ext::peer_id_to_node_id(peer_id) {
self.discv5.find_enr(&node_id)
} else {
None
}
}

/// Updates the local ENR TCP port.
/// There currently isn't a case to update the address here. We opt for discovery to
/// automatically update the external address.
Expand Down Expand Up @@ -733,23 +719,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
target_peers: usize,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
// Make sure there are subnet queries included
let contains_queries = match &query {
QueryType::Subnet(queries) => !queries.is_empty(),
QueryType::FindPeers => true,
};

if !contains_queries {
debug!(
self.log,
"No subnets included in this request. Skipping discovery request."
);
return;
}

// Generate a random target node id.
let random_node = NodeId::random();

let enr_fork_id = match self.local_enr().eth2() {
Ok(v) => v,
Err(e) => {
Expand All @@ -773,7 +742,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Build the future
let query_future = self
.discv5
.find_node_predicate(random_node, predicate, target_peers)
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,
Expand All @@ -797,12 +767,14 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<_, Option<Instant>> = HashMap::new();
r.into_iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr, None);
});
let results = r
.into_iter()
.map(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
(enr, None)
})
.collect();
return Some(results);
}
Err(e) => {
Expand Down
34 changes: 16 additions & 18 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// This function decides whether or not to dial these peers.
#[allow(clippy::mutable_key_type)]
pub fn peers_discovered(&mut self, results: HashMap<Enr, Option<Instant>>) {
let mut to_dial_peers = Vec::with_capacity(4);

let mut to_dial_peers = 0;
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for (enr, min_ttl) in results {
// There are two conditions in deciding whether to dial this peer.
Expand All @@ -327,14 +326,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// considered a priority. We have pre-allocated some extra priority slots for these
// peers as specified by PRIORITY_PEER_EXCESS. Therefore we dial these peers, even
// if we are already at our max_peer limit.
if (min_ttl.is_some()
&& connected_or_dialing + to_dial_peers.len() < self.max_priority_peers()
|| connected_or_dialing + to_dial_peers.len() < self.max_peers())
&& self
.network_globals
.peers
.read()
.should_dial(&enr.peer_id())
if min_ttl.is_some() && connected_or_dialing + to_dial_peers < self.max_priority_peers()
|| connected_or_dialing + to_dial_peers < self.max_peers()
{
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
Expand All @@ -345,15 +338,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.update_min_ttl(&enr.peer_id(), min_ttl);
}
debug!(self.log, "Dialing discovered peer"; "peer_id" => %enr.peer_id());
to_dial_peers.push(enr);
self.dial_peer(enr);
to_dial_peers += 1;
}
}

// Queue another discovery if we need to
self.maintain_peer_count(to_dial_peers.len());

// Dial the required peers
self.dial_peers(to_dial_peers);
self.maintain_peer_count(to_dial_peers);
}

/// A STATUS message has been received from a peer. This resets the status timer.
Expand Down Expand Up @@ -409,9 +400,16 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

/* Notifications from the Swarm */

// A peer is being dialed.
pub fn dial_peers(&mut self, mut peers: Vec<Enr>) {
self.peers_to_dial.append(&mut peers);
/// A peer is being dialed.
pub fn dial_peer(&mut self, peer: Enr) {
if self
.network_globals
.peers
.read()
.should_dial(&peer.peer_id())
{
self.peers_to_dial.push(peer);
}
}

/// Reports if a peer is banned or not.
Expand Down
35 changes: 12 additions & 23 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,9 +1066,8 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let peers_to_dial: Vec<Enr> = self
.discovery()
.cached_enrs()
.filter_map(|(peer_id, enr)| {
let peers = self.network_globals.peers.read();
if predicate(enr) && peers.should_dial(peer_id) {
.filter_map(|(_peer_id, enr)| {
if predicate(enr) {
Some(enr.clone())
} else {
None
Expand All @@ -1078,10 +1077,9 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {

// Remove the ENR from the cache to prevent continual re-dialing on disconnects
peers_to_dial.iter().for_each(|enr| {
self.peer_manager_mut().dial_peer(enr.clone());
self.discovery_mut().remove_cached_enr(&enr.peer_id());
});

self.peer_manager_mut().dial_peers(peers_to_dial);
}

/* Sub-behaviour event handling functions */
Expand Down Expand Up @@ -1366,19 +1364,6 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}
}

/// Handle a discovery event.
fn inject_discovery_event(
&mut self,
event: DiscoveredPeers,
) -> Option<NetworkEvent<AppReqId, TSpec>> {
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
self.peer_manager_mut().peers_discovered(event.peers);
None
}

/// Handle an identify event.
fn inject_identify_event(
&mut self,
Expand Down Expand Up @@ -1479,7 +1464,14 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
BehaviourEvent::BannedPeers(void) => void::unreachable(void),
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
BehaviourEvent::Discovery(de) => self.inject_discovery_event(de),
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
self.peer_manager_mut().peers_discovered(peers);
None
}
BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
BehaviourEvent::ConnectionLimits(le) => void::unreachable(le),
Expand Down Expand Up @@ -1561,10 +1553,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
None
}
}
SwarmEvent::Dialing {
peer_id: _,
connection_id: _,
} => None,
SwarmEvent::Dialing { .. } => None,
};

if let Some(ev) = maybe_event {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
.unwrap();

config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port, port + 1);
config.enr_disc4_port = Some(port);
config.enr_udp4_port = Some(port);
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod tests {
);

let mut config = NetworkConfig::default();
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212);
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21212);
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
config.upnp_enabled = false;
config.boot_nodes_enr = enrs.clone();
Expand Down
Loading