Skip to content

Commit

Permalink
Run prefix searching
Browse files Browse the repository at this point in the history
  • Loading branch information
ackintosh committed Dec 11, 2023
1 parent 8780276 commit 5f7788b
Showing 1 changed file with 80 additions and 26 deletions.
106 changes: 80 additions & 26 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ impl std::fmt::Debug for SubnetQuery {
enum QueryType {
/// We are searching for subnet peers.
Subnet(Vec<SubnetQuery>),
/// We are prefix searching for subnet peers.
PrefixSearch(Vec<SubnetQuery>),
/// We are searching for more peers without ENR or time constraints.
FindPeers,
}
Expand Down Expand Up @@ -791,16 +793,32 @@ impl<TSpec: EthSpec, TSlotClock: SlotClock> Discovery<TSpec, TSlotClock> {
// build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate = subnet_predicate::<TSpec>(filtered_subnets, &self.log);

debug!(
self.log,
"Starting grouped subnet query";
"subnets" => ?filtered_subnet_queries,
);
self.start_query(
QueryType::Subnet(filtered_subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
// TODO: Add CLI flag for this?
let prefix_search_for_subnet = true;

if prefix_search_for_subnet {
debug!(
self.log,
"Starting prefix search query";
"subnets" => ?filtered_subnet_queries,
);
self.start_query(
QueryType::PrefixSearch(filtered_subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
} else {
debug!(
self.log,
"Starting grouped subnet query";
"subnets" => ?filtered_subnet_queries,
);
self.start_query(
QueryType::Subnet(filtered_subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
}
}
}

Expand Down Expand Up @@ -831,22 +849,58 @@ impl<TSpec: EthSpec, TSlotClock: SlotClock> Discovery<TSpec, TSlotClock> {
&& (enr.tcp4().is_some() || enr.tcp6().is_some())
};

// General predicate
let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr));

// Build the future
let query_future = self
.discv5
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,
});
if let QueryType::PrefixSearch(subnet_queries) = query {
// Split the grouped subnet query into individual queries in order to prefix search.
for subnet_query in subnet_queries {
// Target node
let target_node = match &subnet_query.subnet {
Subnet::Attestation(subnet_id) => {
match self.prefix_mapping.get::<TSpec>(subnet_id) {
Ok(raw) => {
let raw_node_id: [u8; 32] = raw.into();
NodeId::from(raw_node_id)
}
Err(e) => {
warn!(self.log, "Failed to get target NodeId"; "error" => %e, "subnet_id" => ?subnet_id);
continue;
}
}
}
Subnet::SyncCommittee(_) => NodeId::random(),
};
// Build the future
let query_future = self
.discv5
.find_node_predicate(
target_node,
Box::new(eth2_fork_predicate.clone()),
target_peers,
)
.map(|v| QueryResult {
query_type: QueryType::PrefixSearch(vec![subnet_query]),
result: v,
});

// Add the future to active queries, to be executed.
self.active_queries.push(Box::pin(query_future));
// Add the future to active queries, to be executed.
self.active_queries.push(Box::pin(query_future));
}
} else {
// General predicate
let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr));
// Build the future
let query_future = self
.discv5
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,
});

// Add the future to active queries, to be executed.
self.active_queries.push(Box::pin(query_future));
}
}

/// Process the completed QueryResult returned from discv5.
Expand Down Expand Up @@ -878,7 +932,7 @@ impl<TSpec: EthSpec, TSlotClock: SlotClock> Discovery<TSpec, TSlotClock> {
}
}
}
QueryType::Subnet(queries) => {
QueryType::Subnet(queries) | QueryType::PrefixSearch(queries) => {
let subnets_searched_for: Vec<Subnet> =
queries.iter().map(|query| query.subnet).collect();
match query.result {
Expand Down

0 comments on commit 5f7788b

Please sign in to comment.