Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network/strategy: Backoff and ban overloaded peers to avoid submitting the same request multiple times #5029

Merged
merged 25 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
04c613a
strategy: Introduce LRU persistent disconnection state
lexnv Jul 15, 2024
edf3490
strategy/warp: Use persistent disconnection state
lexnv Jul 15, 2024
7991025
strategy/state: Use persistent disconnection state
lexnv Jul 15, 2024
6663731
strategy/chain_sync: Use persistent disconnection state
lexnv Jul 15, 2024
a836a3e
strategy/tests: Check behavior of persistent peers state
lexnv Jul 15, 2024
3466701
strategy: Forget peer state after backing off for more than 15 minutes
lexnv Jul 15, 2024
e6d2ee3
strategy/tests: Check backed off peers are not scheduled for requests
lexnv Jul 15, 2024
79d1850
strategy: Align backoff time with protocol timeout time
lexnv Jul 15, 2024
3e51525
strategy/persistent: Adjust constants, report bans and add logs
lexnv Jul 16, 2024
62f4054
strategy: Ban peers reported by the persistent store
lexnv Jul 16, 2024
02c26e0
sync/tests: Fix clippy
lexnv Jul 16, 2024
a6e76aa
sync/tests: Remove clone
lexnv Jul 16, 2024
2ac8fcd
sync/tests: Cargo fmt
lexnv Jul 16, 2024
2b7faba
sync/tests: Fix clippy
lexnv Jul 17, 2024
177dec9
sync/tests: Adjust state management expectations
lexnv Jul 17, 2024
97bfaab
Merge remote-tracking branch 'origin/master' into lexnv/fix-multiple-…
lexnv Jul 30, 2024
6efa4d1
peer-state: Rename disconnected peer state and make it private
lexnv Jul 30, 2024
48534c2
peer-state: Rename to disconnected peers for clarity
lexnv Jul 30, 2024
0a47d68
peer-state: Rename remove to on_disconnect
lexnv Jul 30, 2024
d993f59
engine: Disconnect logs
lexnv Jul 30, 2024
bfc32b9
disconnected-peers: Address feedback
lexnv Jul 30, 2024
7f90cee
Add PR doc for better documentation
lexnv Jul 30, 2024
c0d0f3c
disconnected-peers: Fix test
lexnv Jul 30, 2024
16e55cf
disconnected-peers: Raname to on_disconnect_during_request
lexnv Jul 31, 2024
00a5cb3
strategy: Adjust testing
lexnv Jul 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ where
},
BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
if disconnect {
log::debug!(
target: LOG_TARGET,
"Disconnecting peer {peer_id} due to block announce validation failure",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! and specific syncing algorithms.

pub mod chain_sync;
mod disconnected_peers;
mod state;
pub mod state_sync;
pub mod warp;
Expand Down
24 changes: 21 additions & 3 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::{
justification_requests::ExtraRequests,
schema::v1::StateResponse,
strategy::{
disconnected_peers::DisconnectedPeers,
state_sync::{ImportResult, StateSync, StateSyncProvider},
warp::{WarpSyncPhase, WarpSyncProgress},
},
Expand Down Expand Up @@ -250,6 +251,7 @@ pub struct ChainSync<B: BlockT, Client> {
client: Arc<Client>,
/// The active peers that we are using to sync and their PeerSync status
peers: HashMap<PeerId, PeerSync<B>>,
disconnected_peers: DisconnectedPeers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: may be call it slow_peers to highlight we track only disconnected peers during active requests, not all disconnected peers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense, the primary focus of the backoff is to handle slow peers 🤔 Although, I think there might be a chance that the remote peer disconnects immediately because it cannot handle the request for some reason (ie the request resulted in a possible recoverable error). This gives other peers priority to handle the request, while the one that disconnected is backoff for a while

/// A `BlockCollection` of blocks that are being downloaded from peers
blocks: BlockCollection<B>,
/// The best block number in our queue of blocks to import
Expand Down Expand Up @@ -378,6 +380,7 @@ where
let mut sync = Self {
client,
peers: HashMap::new(),
disconnected_peers: DisconnectedPeers::new(),
blocks: BlockCollection::new(),
best_queued_hash: Default::default(),
best_queued_number: Zero::zero(),
Expand Down Expand Up @@ -1141,7 +1144,15 @@ where
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_peer_download(peer_id)
}
self.peers.remove(peer_id);

if let Some(state) = self.peers.remove(peer_id) {
if !state.state.is_available() {
if let Some(bad_peer) = self.disconnected_peers.on_disconnect(*peer_id) {
self.actions.push(ChainSyncAction::DropPeer(bad_peer));
}
}
}

self.extra_justifications.peer_disconnected(peer_id);
self.allowed_requests.set_all();
self.fork_targets.retain(|_, target| {
Expand Down Expand Up @@ -1541,10 +1552,14 @@ where
let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
let max_blocks_per_request = self.max_blocks_per_request;
let gap_sync = &mut self.gap_sync;
let disconnected_peers = &mut self.disconnected_peers;
self.peers
.iter_mut()
.filter_map(move |(&id, peer)| {
if !peer.state.is_available() || !allowed_requests.contains(&id) {
if !peer.state.is_available() ||
!allowed_requests.contains(&id) ||
!disconnected_peers.is_peer_available(&id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this can then be !slow_peers.is_peer_available(&id), corresponding to the PR description.

{
return None
}

Expand Down Expand Up @@ -1656,7 +1671,10 @@ where
}

for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_number() {
if peer.state.is_available() &&
peer.common_number >= sync.target_number() &&
self.disconnected_peers.is_peer_available(&id)
{
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
Expand Down
188 changes: 188 additions & 0 deletions substrate/client/network/sync/src/strategy/disconnected_peers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::types::BadPeer;
use sc_network::ReputationChange as Rep;
use sc_network_types::PeerId;
use schnellru::{ByLength, LruMap};

const LOG_TARGET: &str = "sync::persistent_peer_state";

/// The maximum number of disconnected peers to keep track of.
///
/// When a peer disconnects, we must keep track if it was in the middle of a request.
/// The peer may disconnect because it cannot keep up with the number of requests
/// (ie not having enough resources available to handle the requests); or because it is malicious.
const MAX_DISCONNECTED_PEERS_STATE: u32 = 512;

/// The time we are going to backoff a peer that has disconnected with an inflight request.
///
/// The backoff time is calculated as `num_disconnects * DISCONNECTED_PEER_BACKOFF_SECONDS`.
/// This is to prevent submitting a request to a peer that has disconnected because it could not
/// keep up with the number of requests.
///
/// The peer may disconnect due to the keep-alive timeout, however disconnections without
/// an inflight request are not tracked.
const DISCONNECTED_PEER_BACKOFF_SECONDS: u64 = 60;

/// Maximum number of disconnects with a request in flight before a peer is banned.
const MAX_NUM_DISCONNECTS: u64 = 3;

/// Peer disconnected with a request in flight after backoffs.
///
/// The peer may be slow to respond to the request after backoffs, or it refuses to respond.
/// Report the peer and let the reputation system handle disconnecting the peer.
pub const REPUTATION_REPORT: Rep = Rep::new_fatal("Peer disconnected with inflight after backoffs");

/// The state of a disconnected peer with a request in flight.
#[derive(Debug)]
struct DisconnectedState {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: may be call the struct SlowPeers to highlight the fact we only track disconnections / connection time outs during requests?

/// The total number of disconnects.
num_disconnects: u64,
/// The time at the last disconnect.
last_disconnect: std::time::Instant,
}

impl DisconnectedState {
/// Create a new `DisconnectedState`.
pub fn new() -> Self {
Self { num_disconnects: 1, last_disconnect: std::time::Instant::now() }
}

/// Increment the number of disconnects.
pub fn increment(&mut self) {
self.num_disconnects = self.num_disconnects.saturating_add(1);
self.last_disconnect = std::time::Instant::now();
}

/// Get the number of disconnects.
pub fn num_disconnects(&self) -> u64 {
self.num_disconnects
}

/// Get the time of the last disconnect.
pub fn last_disconnect(&self) -> std::time::Instant {
self.last_disconnect
}
}

/// Tracks the state of disconnected peers with a request in flight.
///
/// This helps to prevent submitting requests to peers that have disconnected
/// before responding to the request to offload the peer.
pub struct DisconnectedPeers {
/// The state of disconnected peers.
disconnected_peers: LruMap<PeerId, DisconnectedState>,
}

impl DisconnectedPeers {
/// Create a new `DisconnectedPeers`.
pub fn new() -> Self {
Self { disconnected_peers: LruMap::new(ByLength::new(MAX_DISCONNECTED_PEERS_STATE)) }
}

/// Insert a new peer to the persistent state if not seen before, or update the state if seen.
///
/// Returns true if the peer should be disconnected.
pub fn on_disconnect(&mut self, peer: PeerId) -> Option<BadPeer> {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
if let Some(state) = self.disconnected_peers.get(&peer) {
state.increment();

let should_ban = state.num_disconnects() >= MAX_NUM_DISCONNECTS;
log::debug!(
target: LOG_TARGET,
"Remove known peer {peer} state: {state:?}, should ban: {should_ban}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state is only removed below if should_ban is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I see the confusion, the function was called remove before the last refactor :D will adjust the log here.
We are losing track of the peer when:

  • reaching LRU capacity (configured 512)
  • the peer is reported as disconnected (tracking then shifts to the peerstore)
    Thanks for catching this!

);

return should_ban.then(|| {
// We can lose track of the peer state and let the banning mechanism handle
// the peer backoff.
//
// After the peer banning expires, if the peer continues to misbehave, it will be
// backed off again.
self.disconnected_peers.remove(&peer);
BadPeer(peer, REPUTATION_REPORT)
})
}

log::debug!(
target: LOG_TARGET,
"Added peer {peer} for the first time"
);
// First time we see this peer.
self.disconnected_peers.insert(peer, DisconnectedState::new());
None
lexnv marked this conversation as resolved.
Show resolved Hide resolved
}

/// Check if a peer is available for queries.
pub fn is_peer_available(&mut self, peer_id: &PeerId) -> bool {
let Some(state) = self.disconnected_peers.get(peer_id) else {
return true;
};

let elapsed = state.last_disconnect().elapsed();
if elapsed.as_secs() >= DISCONNECTED_PEER_BACKOFF_SECONDS * state.num_disconnects {
log::debug!(target: LOG_TARGET, "Peer {peer_id} is available for queries");
self.disconnected_peers.remove(peer_id);
true
} else {
log::debug!(target: LOG_TARGET,"Peer {peer_id} is backedoff");
false
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

#[test]
fn test_persistent_peer_state() {
let mut state = DisconnectedPeers::new();
let peer = PeerId::random();

// Is not part of the disconnected peers yet.
assert_eq!(state.is_peer_available(&peer), true);

assert!(state.on_disconnect(peer).is_none());
assert_eq!(state.is_peer_available(&peer), false);

assert!(state.on_disconnect(peer).is_none());
assert_eq!(state.is_peer_available(&peer), false);

lexnv marked this conversation as resolved.
Show resolved Hide resolved
assert!(state.on_disconnect(peer).is_some());
// Peer is supposed to get banned and disconnected.
// The state ownership moves to the PeerStore.
assert!(state.disconnected_peers.get(&peer).is_none());
}

#[test]
fn ensure_backoff_time() {
let mut state = DisconnectedPeers::new();
let peer = PeerId::random();

assert!(state.on_disconnect(peer).is_none());
assert_eq!(state.is_peer_available(&peer), false);

// Wait until the backoff time has passed
std::thread::sleep(Duration::from_secs(DISCONNECTED_PEER_BACKOFF_SECONDS + 1));
lexnv marked this conversation as resolved.
Show resolved Hide resolved

assert_eq!(state.is_peer_available(&peer), true);
}
}
Loading
Loading