Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add metrics about block requests (#5811)
Browse files Browse the repository at this point in the history
* Add metrics about block requests

* Apply suggestions from code review

Co-Authored-By: Max Inden <mail@max-inden.de>

Co-authored-by: Gavin Wood <i@gavwood.com>
Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
3 people committed Apr 28, 2020
1 parent 8c54626 commit 0f46400
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 19 deletions.
85 changes: 79 additions & 6 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollPa
use log::debug;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification};
use std::{borrow::Cow, iter, task::Context, task::Poll};
use std::{borrow::Cow, iter, task::{Context, Poll}, time::Duration};
use void;

/// General behaviour of the network. Combines all protocols together.
Expand Down Expand Up @@ -67,8 +67,39 @@ pub enum BehaviourOut<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Started a random Kademlia discovery query.

/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted(ProtocolId),

/// We have received a request from a peer and answered it.
AnsweredRequest {
/// Peer which sent us a request.
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
/// Time it took to build the response.
build_time: Duration,
},
/// Started a new request with the given node.
RequestStarted {
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
},
/// Finished, successfully or not, a previously-started request.
RequestFinished {
/// Who we were requesting.
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
/// How long before the response came or the request got cancelled.
request_duration: Duration,
},

/// Any event represented by the [`Event`] enum.
///
/// > **Note**: The [`Event`] enum contains the events that are available through the public
/// > API of the library.
Event(Event),
}

Expand Down Expand Up @@ -220,7 +251,27 @@ Behaviour<B, H> {
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)),
CustomMessageOutcome::BlockRequest { target, request } => {
self.block_requests.send_request(&target, request);
match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => {
self.events.push(BehaviourOut::RequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_vec(),
});
},
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push(BehaviourOut::RequestFinished {
peer: target.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
self.events.push(BehaviourOut::RequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_vec(),
});
}
block_requests::SendRequestOutcome::NotConnected |
block_requests::SendRequestOutcome::EncodeError(_) => {},
}
},
CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => {
self.finality_proof_requests.send_request(&target, block_hash, request);
Expand Down Expand Up @@ -257,18 +308,40 @@ Behaviour<B, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> {
fn inject_event(&mut self, event: block_requests::Event<B>) {
match event {
block_requests::Event::Response { peer, original_request, response } => {
block_requests::Event::AnsweredRequest { peer, response_build_time } => {
self.events.push(BehaviourOut::AnsweredRequest {
peer,
protocol: self.block_requests.protocol_name().to_vec(),
build_time: response_build_time,
});
},
block_requests::Event::Response { peer, original_request, response, request_duration } => {
self.events.push(BehaviourOut::RequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
let ev = self.substrate.on_block_response(peer, original_request, response);
self.inject_event(ev);
}
block_requests::Event::RequestCancelled { .. } => {
block_requests::Event::RequestCancelled { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report cancellations yet.
// We would normally disconnect the node, but this event happens as the result of
// a disconnect, so there's nothing more to do.
self.events.push(BehaviourOut::RequestFinished {
peer,
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
}
block_requests::Event::RequestTimeout { peer, .. } => {
block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report timeouts yet, so we process them by
// disconnecting the node.
self.events.push(BehaviourOut::RequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
self.substrate.disconnect_peer(&peer);
}
}
Expand Down
96 changes: 83 additions & 13 deletions client/network/src/protocol/block_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,52 @@ use std::{
task::{Context, Poll}
};
use void::{Void, unreachable};
use wasm_timer::Instant;

// Type alias for convenience.
pub type Error = Box<dyn std::error::Error + 'static>;

/// Events that the block requests behaviour reports to the rest of the networking code.
#[derive(Debug)]
pub enum Event<B: Block> {
    /// A remote sent us a request and we answered it.
    AnsweredRequest {
        /// The peer the request came from.
        peer: PeerId,
        /// How long computing the response took.
        response_build_time: Duration,
    },

    /// The response to one of our block requests has been received.
    Response {
        /// The peer the response came from.
        peer: PeerId,
        /// The request exactly as it was handed to `send_request`.
        original_request: message::BlockRequest<B>,
        /// The response that was received for `original_request`.
        response: message::BlockResponse<B>,
        /// Time that passed between starting the request and receiving the response.
        request_duration: Duration,
    },

    /// The peer disconnected, which cancelled a request that was still in flight.
    /// A disconnect may also be the consequence of a network protocol violation.
    ///
    /// > **Note**: Overriding a request by calling `send_request` again does NOT produce this
    /// > event. That situation is only visible through the value returned by `send_request`.
    RequestCancelled {
        /// The peer the cancelled request was addressed to.
        peer: PeerId,
        /// The request exactly as it was handed to `send_request`.
        original_request: message::BlockRequest<B>,
        /// Time that passed between starting the request and its cancellation.
        request_duration: Duration,
    },

    /// No response arrived before the request's timeout expired.
    RequestTimeout {
        /// The peer that failed to answer in time.
        peer: PeerId,
        /// The request exactly as it was handed to `send_request`.
        original_request: message::BlockRequest<B>,
        /// Time that passed between starting the request and the timeout.
        request_duration: Duration,
    },
}

Expand Down Expand Up @@ -184,10 +204,32 @@ struct Connection<B: Block> {

/// Bookkeeping for a request that has been emitted and whose outcome is not known yet.
#[derive(Debug)]
struct OngoingRequest<B: Block> {
    /// The `Instant` at which the request was emitted. Only used for diagnostics, i.e. to
    /// report how long the request has been running.
    emitted: Instant,
    /// The request that was emitted.
    request: message::BlockRequest<B>,
    /// Timer for this request; once it fires the request is reported as timed out.
    timeout: Delay,
}

/// What happened when `send_request` was called.
///
/// Marked `#[must_use]` so that callers do not silently ignore a replaced or failed request.
#[must_use]
#[derive(Debug)]
pub enum SendRequestOutcome<B: Block> {
    /// The request has been emitted.
    Ok,
    /// The request has been emitted, and it took the place of a request that was already
    /// ongoing.
    Replaced {
        /// The request that was emitted earlier and has now been replaced.
        previous: message::BlockRequest<B>,
        /// How long `previous` had been running when it was replaced.
        request_duration: Duration,
    },
    /// No request was started because there is no connection to the target node.
    /// When `send_request` returns this, it is as if it had never been called.
    NotConnected,
    /// The request could not be serialized.
    EncodeError(prost::EncodeError),
}

impl<B> BlockRequests<B>
where
B: Block,
Expand All @@ -202,13 +244,18 @@ where
}
}

/// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`).
///
/// The returned slice borrows from this behaviour's configuration (`self.config.protocol`);
/// callers that need an owned copy have to copy the bytes themselves.
pub fn protocol_name(&self) -> &[u8] {
&self.config.protocol
}

/// Issue a new block request.
///
/// Cancels any existing request targeting the same `PeerId`.
///
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
/// will be disconnected.
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) {
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) -> SendRequestOutcome<B> {
// Determine which connection to send the request to.
let connection = if let Some(peer) = self.peers.get_mut(target) {
// We don't want to have multiple requests for any given node, so in priority try to
Expand All @@ -222,10 +269,10 @@ where
target: "sync",
"State inconsistency: empty list of peer connections"
);
return;
return SendRequestOutcome::NotConnected;
}
} else {
return;
return SendRequestOutcome::NotConnected;
};

let protobuf_rq = api::v1::BlockRequest {
Expand All @@ -252,17 +299,12 @@ where
protobuf_rq,
err
);
return;
return SendRequestOutcome::EncodeError(err);
}

if let Some(rq) = &connection.ongoing_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
}
let previous_request = connection.ongoing_request.take();
connection.ongoing_request = Some(OngoingRequest {
emitted: Instant::now(),
request: req.clone(),
timeout: Delay::new(self.config.request_timeout),
});
Expand All @@ -278,6 +320,20 @@ where
protocol: self.config.protocol.clone(),
},
});

if let Some(previous_request) = previous_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
SendRequestOutcome::Replaced {
previous: previous_request.request,
request_duration: previous_request.emitted.elapsed(),
}
} else {
SendRequestOutcome::Ok
}
}

/// Callback, invoked when a new block request has been received from remote.
Expand Down Expand Up @@ -445,6 +501,7 @@ where
let ev = Event::RequestCancelled {
peer: peer_id.clone(),
original_request: ongoing_request.request.clone(),
request_duration: ongoing_request.emitted.elapsed(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Expand Down Expand Up @@ -476,6 +533,8 @@ where
) {
match node_event {
NodeEvent::Request(request, mut stream) => {
let before_answer_build = Instant::now();

match self.on_block_request(&peer, &request) {
Ok(res) => {
log::trace!(
Expand Down Expand Up @@ -508,18 +567,26 @@ where
"Error handling block request from peer {}: {}", peer, e
)
}

let ev = Event::AnsweredRequest {
peer: peer.clone(),
response_build_time: before_answer_build.elapsed(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
NodeEvent::Response(original_request, response) => {
log::trace!(
target: "sync",
"Received block response from peer {} with {} blocks",
peer, response.blocks.len()
);
if let Some(connections) = self.peers.get_mut(&peer) {
let request_duration = if let Some(connections) = self.peers.get_mut(&peer) {
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
if let Some(ongoing_request) = &mut connection.ongoing_request {
if ongoing_request.request == original_request {
let request_duration = ongoing_request.emitted.elapsed();
connection.ongoing_request = None;
request_duration
} else {
// We're no longer interested in that request.
log::debug!(
Expand Down Expand Up @@ -550,7 +617,7 @@ where
peer
);
return;
}
};

let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
Expand Down Expand Up @@ -594,6 +661,7 @@ where
peer,
original_request,
response: message::BlockResponse::<B> { id, blocks },
request_duration,
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Expand Down Expand Up @@ -625,6 +693,7 @@ where

if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
let original_request = ongoing_request.request.clone();
let request_duration = ongoing_request.emitted.elapsed();
connection.ongoing_request = None;
log::debug!(
target: "sync",
Expand All @@ -634,6 +703,7 @@ where
let ev = Event::RequestTimeout {
peer: peer.clone(),
original_request,
request_duration,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
Expand Down
Loading

0 comments on commit 0f46400

Please sign in to comment.