Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add metrics about block requests #5811

Merged
merged 2 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 79 additions & 6 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollPa
use log::debug;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification};
use std::{borrow::Cow, iter, task::Context, task::Poll};
use std::{borrow::Cow, iter, task::{Context, Poll}, time::Duration};
use void;

/// General behaviour of the network. Combines all protocols together.
Expand Down Expand Up @@ -67,8 +67,39 @@ pub enum BehaviourOut<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Started a random Kademlia discovery query.

/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted(ProtocolId),

/// We have received a request from a peer and answered it.
AnsweredRequest {
/// Peer which sent us a request.
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
/// Time it took to build the response.
build_time: Duration,
},
/// Started a new request with the given node.
RequestStarted {
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
},
/// Finished, successfully or not, a previously-started request.
RequestFinished {
/// Who we were requesting.
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
/// How long before the response came or the request got cancelled.
request_duration: Duration,
},

/// Any event represented by the [`Event`] enum.
///
/// > **Note**: The [`Event`] enum contains the events that are available through the public
/// > API of the library.
Event(Event),
}

Expand Down Expand Up @@ -220,7 +251,27 @@ Behaviour<B, H> {
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)),
CustomMessageOutcome::BlockRequest { target, request } => {
self.block_requests.send_request(&target, request);
match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => {
self.events.push(BehaviourOut::RequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_vec(),
});
},
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push(BehaviourOut::RequestFinished {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could deviate the finished histogram quite a bit as requests might take < 1ms, right? Would it make sense to differentiate between failure and success in the histogram?

Feel free to ignore.

peer: target.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
self.events.push(BehaviourOut::RequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_vec(),
});
}
block_requests::SendRequestOutcome::NotConnected |
block_requests::SendRequestOutcome::EncodeError(_) => {},
}
},
CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => {
self.finality_proof_requests.send_request(&target, block_hash, request);
Expand Down Expand Up @@ -257,18 +308,40 @@ Behaviour<B, H> {
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> {
fn inject_event(&mut self, event: block_requests::Event<B>) {
match event {
block_requests::Event::Response { peer, original_request, response } => {
block_requests::Event::AnsweredRequest { peer, response_build_time } => {
self.events.push(BehaviourOut::AnsweredRequest {
peer,
protocol: self.block_requests.protocol_name().to_vec(),
build_time: response_build_time,
});
},
block_requests::Event::Response { peer, original_request, response, request_duration } => {
self.events.push(BehaviourOut::RequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
let ev = self.substrate.on_block_response(peer, original_request, response);
self.inject_event(ev);
}
block_requests::Event::RequestCancelled { .. } => {
block_requests::Event::RequestCancelled { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report cancellations yet.
// We would normally disconnect the node, but this event happens as the result of
// a disconnect, so there's nothing more to do.
self.events.push(BehaviourOut::RequestFinished {
peer,
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
}
block_requests::Event::RequestTimeout { peer, .. } => {
block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report timeouts yet, so we process them by
// disconnecting the node.
self.events.push(BehaviourOut::RequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
self.substrate.disconnect_peer(&peer);
}
}
Expand Down
97 changes: 84 additions & 13 deletions client/network/src/protocol/block_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,52 @@ use std::{
task::{Context, Poll}
};
use void::{Void, unreachable};
use wasm_timer::Instant;

// Type alias for convenience.
pub type Error = Box<dyn std::error::Error + 'static>;

/// Event generated by the block requests behaviour.
#[derive(Debug)]
pub enum Event<B: Block> {
/// A request came and we answered it.
AnsweredRequest {
/// Peer which has emitted the request.
peer: PeerId,
/// Time it took to compute the response.
response_build_time: Duration,
},

/// A response to a block request has arrived.
Response {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
/// Time elapsed between the start of the request and the response.
request_duration: Duration,
},

/// A request has been cancelled because the peer has disconnected.
/// Disconnects can also happen as a result of violating the network protocol.
///
/// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`.
/// > For that, you must check the value returned by `send_request`.
RequestCancelled {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
/// Time elapsed between the start of the request and the cancellation.
request_duration: Duration,
},

/// A request has timed out.
RequestTimeout {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
/// Time elapsed between the start of the request and the timeout.
request_duration: Duration,
}
}

Expand Down Expand Up @@ -184,10 +204,32 @@ struct Connection<B: Block> {

#[derive(Debug)]
struct OngoingRequest<B: Block> {
/// `Instant` when the request has been emitted. Used for diagnostic purposes.
emitted: Instant,
request: message::BlockRequest<B>,
timeout: Delay,
}

/// Outcome of calling `send_request`.
#[derive(Debug)]
#[must_use]
pub enum SendRequestOutcome<B: Block> {
/// Request has been emitted.
Ok,
/// The request has been emitted and has replaced an existing request.
Replaced {
/// The previously-emitted request.
previous: message::BlockRequest<B>,
/// Time that had elapsed since `previous` has been emitted.
request_duration: Duration,
},
/// Didn't start a request because we have no connection to this node.
/// If `send_request` returns that, it is as if the function had never been called.
NotConnected,
/// Error while serializing the request.
EncodeError(prost::EncodeError),
}

impl<B> BlockRequests<B>
where
B: Block,
Expand All @@ -202,13 +244,18 @@ where
}
}

/// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`).
pub fn protocol_name(&self) -> &[u8] {
&self.config.protocol
}

/// Issue a new block request.
///
/// Cancels any existing request targeting the same `PeerId`.
///
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
/// will be disconnected.
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) {
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) -> SendRequestOutcome<B> {
// Determine which connection to send the request to.
let connection = if let Some(peer) = self.peers.get_mut(target) {
// We don't want to have multiple requests for any given node, so in priority try to
Expand All @@ -222,10 +269,10 @@ where
target: "sync",
"State inconsistency: empty list of peer connections"
);
return;
return SendRequestOutcome::NotConnected;
}
} else {
return;
return SendRequestOutcome::NotConnected;
};

let protobuf_rq = api::v1::BlockRequest {
Expand All @@ -252,17 +299,12 @@ where
protobuf_rq,
err
);
return;
return SendRequestOutcome::EncodeError(err);
}

if let Some(rq) = &connection.ongoing_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
}
let previous_request = connection.ongoing_request.take();
connection.ongoing_request = Some(OngoingRequest {
emitted: Instant::now(),
request: req.clone(),
timeout: Delay::new(self.config.request_timeout),
});
Expand All @@ -278,6 +320,21 @@ where
protocol: self.config.protocol.clone(),
},
});

if let Some(previous_request) = previous_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
SendRequestOutcome::Replaced {
previous: previous_request.request,
request_duration: previous_request.emitted.elapsed(),
}

gavofyork marked this conversation as resolved.
Show resolved Hide resolved
} else {
SendRequestOutcome::Ok
}
}

/// Callback, invoked when a new block request has been received from remote.
Expand Down Expand Up @@ -445,6 +502,7 @@ where
let ev = Event::RequestCancelled {
peer: peer_id.clone(),
original_request: ongoing_request.request.clone(),
request_duration: ongoing_request.emitted.elapsed(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Expand Down Expand Up @@ -476,6 +534,8 @@ where
) {
match node_event {
NodeEvent::Request(request, mut stream) => {
let before_answer_build = Instant::now();

match self.on_block_request(&peer, &request) {
Ok(res) => {
log::trace!(
Expand Down Expand Up @@ -508,18 +568,26 @@ where
"Error handling block request from peer {}: {}", peer, e
)
}

let ev = Event::AnsweredRequest {
peer: peer.clone(),
response_build_time: before_answer_build.elapsed(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
NodeEvent::Response(original_request, response) => {
log::trace!(
target: "sync",
"Received block response from peer {} with {} blocks",
peer, response.blocks.len()
);
if let Some(connections) = self.peers.get_mut(&peer) {
let request_duration = if let Some(connections) = self.peers.get_mut(&peer) {
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
if let Some(ongoing_request) = &mut connection.ongoing_request {
if ongoing_request.request == original_request {
let request_duration = ongoing_request.emitted.elapsed();
connection.ongoing_request = None;
request_duration
} else {
// We're no longer interested in that request.
log::debug!(
Expand Down Expand Up @@ -550,7 +618,7 @@ where
peer
);
return;
}
};

let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
Expand Down Expand Up @@ -594,6 +662,7 @@ where
peer,
original_request,
response: message::BlockResponse::<B> { id, blocks },
request_duration,
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Expand Down Expand Up @@ -625,6 +694,7 @@ where

if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
let original_request = ongoing_request.request.clone();
let request_duration = ongoing_request.emitted.elapsed();
connection.ongoing_request = None;
log::debug!(
target: "sync",
Expand All @@ -634,6 +704,7 @@ where
let ev = Event::RequestTimeout {
peer: peer.clone(),
original_request,
request_duration,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
Expand Down
Loading