Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

sp-utils: tracing bounded channels #6609

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions primitives/utils/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use prometheus::{
#[cfg(feature = "metered")]
use prometheus::{core::GenericCounterVec, Opts};


lazy_static! {
pub static ref TOKIO_THREADS_TOTAL: GenericCounter<AtomicU64> = GenericCounter::new(
"tokio_threads_total", "Total number of threads created"
Expand All @@ -44,9 +43,12 @@ lazy_static! {
&["entity", "action"] // 'name of channel, send|received|dropped
).expect("Creating of statics doesn't fail. qed");

pub static ref CHANNELS_COUNTER : GenericCounterVec<AtomicU64> = GenericCounterVec::new(
Opts::new("channel_len", "Items in each mpsc::channel instance"),
&["entity", "action"] // 'name of channel, send|received|dropped
).expect("Creating of statics doesn't fail. qed");
}


/// Register the statics to report to registry
pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
Expand Down
232 changes: 212 additions & 20 deletions primitives/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,93 @@
//! Features to meter unbounded channels

#[cfg(not(feature = "metered"))]
/// Just aliased, non performance implications for mpsc.
mod inner {
// just aliased, non performance implications
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender, Sender, Receiver};

/// Alias to `mpsc::UnboundedSender`.
pub type TracingUnboundedSender<T> = UnboundedSender<T>;
/// Alias to `mpsc::UnboundedReceiver`.
pub type TracingUnboundedReceiver<T> = UnboundedReceiver<T>;

/// Alias `mpsc::unbounded`
pub fn tracing_unbounded<T>(_key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
/// Alias to `mpsc::Sender`.
pub type TracingSender<T> = Sender<T>;
/// Alias to `mpsc::Receiver`.
pub type TracingReceiver<T> = Receiver<T>;

/// Alias to `mpsc::unbounded`.
pub fn tracing_unbounded<T>(
_key: &'static str
) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
mpsc::unbounded()
}
}

/// Alias to `mpsc::channel`.
pub fn tracing_channel<T>(
_key: &'static str,
buffer: usize
) -> (TracingSender<T>, TracingReceiver<T>) {
mpsc::channel(buffer)
}
}

#[cfg(feature = "metered")]
/// Tracing implementation for mpsc.
mod inner {
//tracing implementation
use futures::channel::mpsc::{self,
UnboundedReceiver, UnboundedSender,
UnboundedReceiver, UnboundedSender, Sender, Receiver,
TryRecvError, TrySendError, SendError
};
use futures::{sink::Sink, task::{Poll, Context}, stream::{Stream, FusedStream}};
use std::pin::Pin;
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use crate::metrics::{UNBOUNDED_CHANNELS_COUNTER, CHANNELS_COUNTER};

/// Wrapper Type around `UnboundedSender` that increases the global
/// measure when a message is added
/// Wrapper type around `UnboundedSender` that increases the global measure when a message is
/// added.
#[derive(Debug)]
pub struct TracingUnboundedSender<T>(&'static str, UnboundedSender<T>);

// Strangely, deriving `Clone` requires that `T` is also `Clone`.
impl<T> Clone for TracingUnboundedSender<T> {
fn clone(&self) -> Self {
Self(self.0, self.1.clone())
}
}

/// Wrapper Type around `UnboundedReceiver` that decreases the global
/// measure when a message is polled
/// Wrapper type around `Sender` that increases the global measure when a message is added.
#[derive(Debug)]
pub struct TracingSender<T>(&'static str, Sender<T>);

impl<T> Clone for TracingSender<T> {
fn clone(&self) -> Self {
Self(self.0, self.1.clone())
}
}

/// Wrapper type around `UnboundedReceiver` that decreases the global measure when a message is
/// polled.
#[derive(Debug)]
pub struct TracingUnboundedReceiver<T>(&'static str, UnboundedReceiver<T>);

/// Wrapper type around `Receiver` that decreases the global measure when a message is
/// polled.
#[derive(Debug)]
pub struct TracingReceiver<T>(&'static str, Receiver<T>);

/// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via
/// `UNBOUNDED_CHANNELS_COUNTER`
pub fn tracing_unbounded<T>(key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
/// `UNBOUNDED_CHANNELS_COUNTER`.
pub fn tracing_unbounded<T>(
key: &'static str
) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
let (s, r) = mpsc::unbounded();
(TracingUnboundedSender(key.clone(), s), TracingUnboundedReceiver(key,r))
(TracingUnboundedSender(key, s), TracingUnboundedReceiver(key, r))
}

/// Wrapper around `mpsc::channel` that tracks the in- and outflow via `CHANNELS_COUNTER`.
pub fn tracing_channel<T>(
key: &'static str,
buffer: usize
) -> (TracingSender<T>, TracingReceiver<T>) {
let (s, r) = mpsc::channel(buffer);
(TracingSender(key, s), TracingReceiver(key, r))
}

impl<T> TracingUnboundedSender<T> {
Expand Down Expand Up @@ -106,8 +147,47 @@ mod inner {
}
}

impl<T> TracingUnboundedReceiver<T> {
impl<T> TracingSender<T> {
/// Proxy function to mpsc::Sender
pub fn poll_ready(&mut self, ctx: &mut Context) -> Poll<Result<(), SendError>> {
self.1.poll_ready(ctx)
}

/// Proxy function to mpsc::Sender
pub fn is_closed(&self) -> bool {
self.1.is_closed()
}

/// Proxy function to mpsc::Sender
pub fn close_channel(&mut self) {
self.1.close_channel()
}

/// Proxy function to mpsc::Sender
pub fn disconnect(&mut self) {
self.1.disconnect()
}

/// Proxy function to mpsc::Sender
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
self.1.start_send(msg)
}

/// Proxy function to mpsc::Sender
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
self.1.try_send(msg).map(|s| {
CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).inc();
s
})
}

/// Proxy function to mpsc::Sender
pub fn same_receiver(&self, other: &Sender<T>) -> bool {
self.1.same_receiver(other)
}
}

impl<T> TracingUnboundedReceiver<T> {
fn consume(&mut self) {
// consume all items, make sure to reflect the updated count
let mut count = 0;
Expand Down Expand Up @@ -147,13 +227,61 @@ mod inner {
}
}

impl<T> TracingReceiver<T> {
fn consume(&mut self) {
// consume all items, make sure to reflect the updated count
let mut count = 0;
loop {
if self.1.is_terminated() {
break;
}

match self.try_next() {
Ok(Some(..)) => count += 1,
_ => break
}
}
// and discount the messages
if count > 0 {
CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).inc_by(count);
}

}

/// Proxy function to mpsc::Receiver
/// that consumes all messages first and updates the counter
pub fn close(&mut self) {
self.consume();
self.1.close()
}

/// Proxy function to mpsc::Receiver
/// that discounts the messages taken out
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
self.1.try_next().map(|s| {
if s.is_some() {
CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).inc();
}
s
})
}
}

impl<T> Drop for TracingUnboundedReceiver<T> {
fn drop(&mut self) {
self.consume();
}
}

impl<T> Unpin for TracingUnboundedReceiver<T> {}
impl<T> Drop for TracingReceiver<T> {
fn drop(&mut self) {
self.consume();
}
}

impl<T> Unpin for TracingUnboundedReceiver<T> { }

impl<T> Unpin for TracingReceiver<T> { }

impl<T> Stream for TracingUnboundedReceiver<T> {
type Item = T;
Expand All @@ -177,12 +305,40 @@ mod inner {
}
}

impl<T> Stream for TracingReceiver<T> {
type Item = T;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
let s = self.get_mut();
match Pin::new(&mut s.1).poll_next(cx) {
Poll::Ready(msg) => {
if msg.is_some() {
CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc();
}
Poll::Ready(msg)
}
Poll::Pending => {
Poll::Pending
}
}
}
}

impl<T> FusedStream for TracingUnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
self.1.is_terminated()
}
}

impl<T> FusedStream for TracingReceiver<T> {
fn is_terminated(&self) -> bool {
self.1.is_terminated()
}
}

impl<T> Sink<T> for TracingUnboundedSender<T> {
type Error = SendError;

Expand Down Expand Up @@ -216,6 +372,39 @@ mod inner {
}
}

impl<T> Sink<T> for TracingSender<T> {
type Error = SendError;

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
TracingSender::poll_ready(&mut *self, cx)
}

fn start_send(
mut self: Pin<&mut Self>,
msg: T,
) -> Result<(), Self::Error> {
TracingSender::start_send(&mut *self, msg)
}

fn poll_flush(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
}

impl<T> Sink<T> for &TracingUnboundedSender<T> {
type Error = SendError;

Expand Down Expand Up @@ -248,4 +437,7 @@ mod inner {
}
}

pub use inner::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
pub use inner::{
tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver,
tracing_channel, TracingSender, TracingReceiver
};