Skip to content

Commit

Permalink
Add randomization in sync retry batch peer selection (sigp#5822)
Browse files Browse the repository at this point in the history
* Add randomization in sync retry batch peer selection

* Use min

* Apply suggestions from code review

Co-authored-by: João Oliveira <hello@jxs.pt>

* Merge branch 'unstable' into peer-prio
  • Loading branch information
dapplion authored Jul 9, 2024
1 parent 9942c18 commit 2e2ccec
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 27 deletions.
32 changes: 15 additions & 17 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -919,24 +919,22 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// Find a peer to request the batch
let failed_peers = batch.failed_peers();

let new_peer = {
let mut priorized_peers = self
.network_globals
.peers
.read()
.synced_peers()
.map(|peer| {
(
failed_peers.contains(peer),
self.active_requests.get(peer).map(|v| v.len()).unwrap_or(0),
*peer,
)
})
.collect::<Vec<_>>();
let new_peer = self
.network_globals
.peers
.read()
.synced_peers()
.map(|peer| {
(
failed_peers.contains(peer),
self.active_requests.get(peer).map(|v| v.len()).unwrap_or(0),
rand::random::<u32>(),
*peer,
)
})
// Sort peers prioritizing unrelated peers with less active requests.
priorized_peers.sort_unstable();
priorized_peers.first().map(|&(_, _, peer)| peer)
};
.min()
.map(|(_, _, _, peer)| peer);

if let Some(peer) = new_peer {
self.participating_peers.insert(peer);
Expand Down
24 changes: 14 additions & 10 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use rand::seq::SliceRandom;
use rand::{seq::SliceRandom, Rng};
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -873,16 +873,20 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Find a peer to request the batch
let failed_peers = batch.failed_peers();

let new_peer = {
let mut priorized_peers = self
.peers
.iter()
.map(|(peer, requests)| (failed_peers.contains(peer), requests.len(), *peer))
.collect::<Vec<_>>();
let new_peer = self
.peers
.iter()
.map(|(peer, requests)| {
(
failed_peers.contains(peer),
requests.len(),
rand::thread_rng().gen::<u32>(),
*peer,
)
})
// Sort peers prioritizing unrelated peers with less active requests.
priorized_peers.sort_unstable();
priorized_peers.first().map(|&(_, _, peer)| peer)
};
.min()
.map(|(_, _, _, peer)| peer);

if let Some(peer) = new_peer {
self.send_batch(network, batch_id, peer)
Expand Down

0 comments on commit 2e2ccec

Please sign in to comment.