Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

find_node: Optimize parallelism factor for slow to respond peers #220

Merged
merged 5 commits into from
Aug 27, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 100 additions & 8 deletions src/protocol/libp2p/kademlia/query/find_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_node";

/// Default timeout for a peer to respond to a query.
const DEFAULT_PEER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// The configuration needed to instantiate a new [`FindNodeContext`].
#[derive(Debug, Clone)]
pub struct FindNodeConfig<T: Clone + Into<Vec<u8>>> {
Expand Down Expand Up @@ -63,7 +66,7 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {
kad_message: Bytes,

/// Peers from whom the `QueryEngine` is waiting to hear a response.
pub pending: HashMap<PeerId, KademliaPeer>,
pub pending: HashMap<PeerId, (KademliaPeer, std::time::Instant)>,

/// Queried candidates.
///
Expand All @@ -76,6 +79,18 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {

/// Responses.
pub responses: BTreeMap<Distance, KademliaPeer>,

/// The timeout after which the pending request is no longer
/// counting towards the parallelism factor.
///
/// This is used to prevent the query from getting stuck when a peer
/// is slow or fails to respond in due time.
peer_timeout: std::time::Duration,
/// The number of pending responses that count towards the parallelism factor.
///
/// These represent the number of peers added to the `Self::pending` minus the number of peers
/// that have failed to respond within the `Self::peer_timeout`
pending_responses: usize,
}

impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
Expand All @@ -98,28 +113,33 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
pending: HashMap::new(),
queried: HashSet::new(),
responses: BTreeMap::new(),

peer_timeout: DEFAULT_PEER_TIMEOUT,
pending_responses: 0,
}
}

/// Register response failure for `peer`.
pub fn register_response_failure(&mut self, peer: PeerId) {
let Some(peer) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
let Some((peer, instant)) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist during response failure");
return;
};

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "peer failed to respond");

self.queried.insert(peer.peer);
}

/// Register `FIND_NODE` response from `peer`.
pub fn register_response(&mut self, peer: PeerId, peers: Vec<KademliaPeer>) {
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");

let Some(peer) = self.pending.remove(&peer) else {
let Some((peer, instant)) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
return;
};

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "received response from peer");

// calculate distance for `peer` from target and insert it if
// a) the map doesn't have 20 responses
// b) it can replace some other peer that has a higher distance
Expand Down Expand Up @@ -189,7 +209,8 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
let peer = candidate.peer;

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
self.pending.insert(candidate.peer, candidate.clone());
self.pending.insert(candidate.peer, (candidate, std::time::Instant::now()));
self.pending_responses = self.pending_responses.saturating_add(1);

Some(QueryAction::SendMessage {
query: self.config.query,
Expand Down Expand Up @@ -221,9 +242,22 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
};
}

for (peer, instant) in self.pending.values() {
if instant.elapsed() > self.peer_timeout {
tracing::trace!(
target: LOG_TARGET,
query = ?self.config.query,
?peer,
elapsed = ?instant.elapsed(),
"peer no longer counting towards parallelism factor"
);
self.pending_responses = self.pending_responses.saturating_sub(1);
}
}

// At this point, we either have pending responses or candidates to query; and we need more
// results. Ensure we do not exceed the parallelism factor.
if self.pending.len() == self.config.parallelism_factor {
if self.pending_responses == self.config.parallelism_factor {
return None;
}

Expand Down Expand Up @@ -343,6 +377,64 @@ mod tests {
assert!(context.next_action().is_none());
}

#[test]
fn fulfill_parallelism_with_timeout_optimization() {
let config = FindNodeConfig {
parallelism_factor: 3,
..default_config()
};

let in_peers_set = (0..4).map(|_| PeerId::random()).collect::<HashSet<_>>();
let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = FindNodeContext::new(config, in_peers);
// Test overwrite.
context.peer_timeout = std::time::Duration::from_secs(1);

for num in 0..3 {
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), num + 1);
assert!(context.pending.contains_key(&peer));

// Check the peer is the one provided.
assert!(in_peers_set.contains(&peer));
}
_ => panic!("Unexpected event"),
}
}

// Fulfilled parallelism.
assert!(context.next_action().is_none());

// Sleep more than 1 second.
std::thread::sleep(std::time::Duration::from_secs(2));

// The pending responses are reset only on the next query action.
assert_eq!(context.pending_responses, 3);
assert_eq!(context.pending.len(), 3);

// This allows other peers to be queried.
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), 4);
assert!(context.pending.contains_key(&peer));

// Check the peer is the one provided.
assert!(in_peers_set.contains(&peer));
}
_ => panic!("Unexpected event"),
}

assert_eq!(context.pending_responses, 1);
assert_eq!(context.pending.len(), 4);
}

#[test]
fn completes_when_responses() {
let config = FindNodeConfig {
Expand Down
Loading