Skip to content

Commit

Permalink
feat(gossipsub): introduce backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 22, 2023
1 parent bb2b798 commit a85f1f8
Show file tree
Hide file tree
Showing 7 changed files with 582 additions and 400 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sha2 = "0.10.8"
smallvec = "1.11.2"
tracing = "0.1.37"
void = "1.0.2"
async-channel = "1.9.0"

# Metrics dependencies
prometheus-client = { workspace = true }
Expand Down
79 changes: 56 additions & 23 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::backoff::BackoffStorage;
use crate::config::{Config, ValidationMode};
use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
Expand All @@ -62,6 +60,11 @@ use crate::types::{
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, RpcOut};
use crate::{backoff::BackoffStorage, types::RpcSender};
use crate::{
config::{Config, ValidationMode},
types::rpc_channel,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -332,6 +335,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Keep track of a set of internal metrics relating to gossipsub.
metrics: Option<Metrics>,

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,
}

impl<D, F> Behaviour<D, F>
Expand Down Expand Up @@ -471,6 +477,7 @@ where
config,
subscription_filter,
data_transform,
handler_send_queues: Default::default(),
})
}
}
Expand Down Expand Up @@ -537,7 +544,8 @@ where
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
let event = RpcOut::Subscribe(topic_hash.clone());
self.send_message(peer, event);
self.send_message(peer, event)
.expect("Subscribe messages should be always sent");
}

// call JOIN(topic)
Expand All @@ -564,7 +572,8 @@ where
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
let event = RpcOut::Unsubscribe(topic_hash.clone());
self.send_message(peer, event);
self.send_message(peer, event)
.expect("Subscribe messages should be always sent");
}

// call LEAVE(topic)
Expand Down Expand Up @@ -711,9 +720,18 @@ where
}

// Send to peers we know are subscribed to the topic.
let mut errors = 0;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
if self
.send_message(*peer_id, RpcOut::Publish(raw_message.clone()))
.is_err()
{
errors += 1;
}
}
if errors == recipient_peers.len() {
return Err(PublishError::InsufficientPeers);
}

tracing::debug!(message=%msg_id, "Published message");
Expand Down Expand Up @@ -1311,7 +1329,7 @@ where
);
} else {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
self.send_message(*peer_id, RpcOut::Forward(msg));
let _ = self.send_message(*peer_id, RpcOut::Forward(msg));
}
}
}
Expand Down Expand Up @@ -1469,7 +1487,8 @@ where
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect::<Vec<_>>()
{
self.send_message(*peer_id, RpcOut::Control(action));
self.send_message(*peer_id, RpcOut::Control(action))
.expect("PRUNE messages should always be sent");
}
// Send the prune messages to the peer
tracing::debug!(
Expand Down Expand Up @@ -1970,6 +1989,7 @@ where
.collect::<Vec<_>>()
{
self.send_message(*propagation_source, RpcOut::Control(action))
.expect("GRAFT messages should always be sent");
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2520,7 +2540,8 @@ where

// send the control messages
for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
self.send_message(peer, RpcOut::Control(msg));
self.send_message(peer, RpcOut::Control(msg))
.expect("PRUNE messages should always be sent");
}
}

Expand All @@ -2534,7 +2555,8 @@ where
self.config.do_px() && !no_px.contains(peer),
false,
);
self.send_message(*peer, RpcOut::Control(prune));
self.send_message(*peer, RpcOut::Control(prune))
.expect("PRUNE messages should always be sent");

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -2606,7 +2628,7 @@ where

for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
self.send_message(*peer, event.clone());
let _ = self.send_message(*peer, event.clone());
}
tracing::debug!("Completed forwarding message");
Ok(true)
Expand Down Expand Up @@ -2720,7 +2742,7 @@ where
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
self.send_message(peer, RpcOut::Control(msg));
let _ = self.send_message(peer, RpcOut::Control(msg));
}
}

Expand All @@ -2730,19 +2752,24 @@ where

/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) {
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> Result<(), ()> {
let sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist");

if sender.try_send(rpc.clone()).is_err() {
tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
return Err(());
}

if let Some(m) = self.metrics.as_mut() {
if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
// register bytes sent on the internal metrics.
m.msg_sent(&message.topic, message.raw_protobuf_len());
}
}

self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(rpc),
handler: NotifyHandler::Any,
});
Ok(())
}

fn on_connection_established(
Expand Down Expand Up @@ -2811,7 +2838,8 @@ where
tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
for topic_hash in self.mesh.clone().into_keys() {
self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
self.send_message(peer_id, RpcOut::Subscribe(topic_hash))
.expect("Subscribe messages should be always sent");
}
}

Expand Down Expand Up @@ -2939,6 +2967,7 @@ where
}

self.connected_peers.remove(&peer_id);
self.handler_send_queues.remove(&peer_id);

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(&peer_id);
Expand Down Expand Up @@ -2998,21 +3027,25 @@ where
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
peer_id: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
let (sender, receiver) = rpc_channel(self.config.connection_handler_queue_len());
self.handler_send_queues.insert(peer_id, sender);
Ok(Handler::new(self.config.protocol_config(), receiver))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
peer_id: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
let (sender, receiver) = rpc_channel(self.config.connection_handler_queue_len());
self.handler_send_queues.insert(peer_id, sender);
Ok(Handler::new(self.config.protocol_config(), receiver))
}

fn on_connection_handler_event(
Expand Down
Loading

0 comments on commit a85f1f8

Please sign in to comment.