Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1 / 5] Optimize logic for gossiping assignments #4848

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions polkadot/node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,21 @@ impl State {
let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
t.local_grid_neighbors().required_routing_by_index(validator_index, local)
});
// Peers that we will send the assignment to.
let mut peers = HashSet::new();

let peers_to_route_to = topology
.as_ref()
.map(|t| t.peers_to_route(required_routing))
.unwrap_or_default();

for peer in peers_to_route_to {
if !entry.known_by.contains_key(&peer) {
continue
}

peers.insert(peer);
}

// All the peers that know the relay chain block.
let peers_to_filter = entry.known_by();
Expand All @@ -1456,20 +1471,13 @@ impl State {
let n_peers_total = self.peer_views.len();
let source_peer = source.peer_id();

// Peers that we will send the assignment to.
let mut peers = Vec::new();

// Filter destination peers
for peer in peers_to_filter.into_iter() {
if Some(peer) == source_peer {
continue
}

if let Some(true) = topology
.as_ref()
.map(|t| t.local_grid_neighbors().route_to_peer(required_routing, &peer))
{
peers.push(peer);
if peers.contains(&peer) {
continue
}

Expand All @@ -1485,7 +1493,11 @@ impl State {

if route_random {
approval_entry.routing_info_mut().mark_randomly_sent(peer);
peers.push(peer);
peers.insert(peer);
}

if approval_entry.routing_info().random_routing.is_complete() {
break
}
}

Expand Down
12 changes: 8 additions & 4 deletions polkadot/node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,7 @@ fn propagates_locally_generated_assignment_to_both_dimensions() {
let assignments = vec![(cert.clone(), candidate_index)];
let approvals = vec![approval.clone()];

let assignment_sent_peers = assert_matches!(
let mut assignment_sent_peers = assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
Expand All @@ -2428,12 +2428,14 @@ fn propagates_locally_generated_assignment_to_both_dimensions() {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
mut sent_peers,
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
))
)) => {
// Random sampling is reused from the assignment.
sent_peers.sort();
assignment_sent_peers.sort();
assert_eq!(sent_peers, assignment_sent_peers);
assert_eq!(sent_approvals, approvals);
}
Expand Down Expand Up @@ -2678,7 +2680,7 @@ fn propagates_to_required_after_connect() {
let assignments = vec![(cert.clone(), candidate_index)];
let approvals = vec![approval.clone()];

let assignment_sent_peers = assert_matches!(
let mut assignment_sent_peers = assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
Expand All @@ -2702,12 +2704,14 @@ fn propagates_to_required_after_connect() {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
sent_peers,
mut sent_peers,
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
))
)) => {
// Random sampling is reused from the assignment.
sent_peers.sort();
assignment_sent_peers.sort();
assert_eq!(sent_peers, assignment_sent_peers);
assert_eq!(sent_approvals, approvals);
}
Expand Down
22 changes: 22 additions & 0 deletions polkadot/node/network/protocol/src/grid_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,23 @@ impl SessionGridTopologyEntry {
self.topology.is_validator(peer)
}

/// Returns the list of peers to route based on the required routing.
pub fn peers_to_route(&self, required_routing: RequiredRouting) -> Vec<PeerId> {
match required_routing {
RequiredRouting::All => self.topology.peer_ids.iter().copied().collect(),
RequiredRouting::GridX => self.local_neighbors.peers_x.iter().copied().collect(),
RequiredRouting::GridY => self.local_neighbors.peers_y.iter().copied().collect(),
RequiredRouting::GridXY => self
.local_neighbors
.peers_x
.iter()
.chain(self.local_neighbors.peers_y.iter())
.copied()
.collect(),
RequiredRouting::None | RequiredRouting::PendingTopology => Vec::new(),
}
}

/// Updates the known peer ids for the passed authorities ids.
pub fn update_authority_ids(
&mut self,
Expand Down Expand Up @@ -524,6 +541,11 @@ impl RandomRouting {
pub fn inc_sent(&mut self) {
self.sent += 1
}

/// Returns `true` if we already took all the necessary samples.
pub fn is_complete(&self) -> bool {
alexggh marked this conversation as resolved.
Show resolved Hide resolved
self.sent >= self.target
}
}

/// Routing mode
Expand Down
Loading