feat(gossipsub): introduce backpressure #4914

Open
wants to merge 13 commits into
base: master
from

Conversation

Member

@jxs jxs commented Nov 22, 2023

Follows @mxinden's implementation suggestion in #4667 (comment).

Notes & open questions

  • Publish, GRAFT, PRUNE and Subscription messages are prioritized. Publish messages are capped; using sender.clone().try_send doesn't work for that because the queue is shared by the Receiver, so I used an AtomicUsize instead.
  • Used async-channel to be able to check is_empty() in the ConnectionHandler.
  • Reused the InsufficientPeers error so as not to introduce another variant and make the changes breaking, but this is probably a breaking change anyway in the sense that the internal behaviour is changing, no?
  • If this design makes sense, I can then add a test that checks that the queues fill up.
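
For readers following along, here is a minimal sketch of the capping idea from the notes above, assuming a shared AtomicUsize that counts queued messages. The names `CappedSender`, `CappedReceiver` and `capped_channel` are hypothetical and not taken from the PR, and `std::sync::mpsc` stands in for the async channel the PR uses.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Sending half: an unbounded queue plus a shared counter that enforces a cap.
pub struct CappedSender<T> {
    inner: Sender<T>,
    len: Arc<AtomicUsize>,
    cap: usize,
}

/// Receiving half: decrements the shared counter for every message taken out.
pub struct CappedReceiver<T> {
    inner: Receiver<T>,
    len: Arc<AtomicUsize>,
}

/// Hypothetical constructor; `cap` is the maximum number of queued messages.
pub fn capped_channel<T>(cap: usize) -> (CappedSender<T>, CappedReceiver<T>) {
    let (tx, rx) = channel();
    let len = Arc::new(AtomicUsize::new(0));
    (
        CappedSender { inner: tx, len: len.clone(), cap },
        CappedReceiver { inner: rx, len },
    )
}

impl<T> CappedSender<T> {
    /// Hands the message back if the cap is reached or the receiver is gone.
    pub fn try_send(&self, msg: T) -> Result<(), T> {
        // Reserve a slot first; undo the reservation if we went over the cap.
        if self.len.fetch_add(1, Ordering::SeqCst) >= self.cap {
            self.len.fetch_sub(1, Ordering::SeqCst);
            return Err(msg);
        }
        self.inner.send(msg).map_err(|e| {
            // Receiver dropped: release the slot and return the message.
            self.len.fetch_sub(1, Ordering::SeqCst);
            e.0
        })
    }
}

impl<T> CappedReceiver<T> {
    /// Non-blocking receive that frees one slot per message taken.
    pub fn try_recv(&self) -> Option<T> {
        let msg = self.inner.try_recv().ok()?;
        self.len.fetch_sub(1, Ordering::SeqCst);
        Some(msg)
    }
}
```

The counter is shared between both halves, so the cap holds no matter how many times the sender is cloned, which is the property a per-clone slot cannot give.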

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

@jxs jxs force-pushed the gossipsub-backpressure-cont branch from a85f1f8 to a15e72b Compare November 22, 2023 23:39
Contributor

@thomaseizinger thomaseizinger left a comment

Great start! Left some comments! :)

@@ -332,6 +335,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Keep track of a set of internal metrics relating to gossipsub.
metrics: Option<Metrics>,

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,
Contributor

I think we might have to make this by ConnectionId. Or make the value of the hashmap a Vec of RpcSender.

Member Author

I see, because a ConnectionHandler may send to different PeerIds during its existence, right? But then how do we relate the mesh peers, which are identified by PeerIds, to ConnectionIds?

Member

@mxinden mxinden Nov 23, 2023

because ConnectionHandlers may send to different PeerIds during its existence right?

No. A ConnectionHandler only ever connects the local node to a single remote node (i.e. remote PeerId) during its existence. Though note that the local node might have multiple ConnectionHandlers to a single remote node, each with a different ConnectionId.

I think the problem that Thomas is hinting at, is that we might have multiple connections to a single PeerId. In other words, there might be multiple RpcSenders per PeerId.
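
To illustrate the point about multiple connections per peer, here is a minimal sketch of the bookkeeping, assuming plain integers as stand-ins for the libp2p `PeerId` and `ConnectionId` types. `ConnectedPeers` and its method names are hypothetical and only mirror the idea of a `Vec` of per-connection entries under each peer.

```rust
use std::collections::HashMap;

// Stand-ins for the libp2p types; plain integers keep the sketch self-contained.
type PeerId = u64;
type ConnectionId = u64;

/// Tracks which connections are currently open to each remote peer.
#[derive(Default)]
pub struct ConnectedPeers {
    peers: HashMap<PeerId, Vec<ConnectionId>>,
}

impl ConnectedPeers {
    /// Records a newly established connection to `peer`.
    pub fn on_established(&mut self, peer: PeerId, conn: ConnectionId) {
        self.peers.entry(peer).or_default().push(conn);
    }

    /// Removes the connection, and the peer entry once its last connection is gone.
    pub fn on_closed(&mut self, peer: PeerId, conn: ConnectionId) {
        if let Some(conns) = self.peers.get_mut(&peer) {
            conns.retain(|c| *c != conn);
            if conns.is_empty() {
                self.peers.remove(&peer);
            }
        }
    }

    /// All connections currently tracked for `peer` (empty if unknown).
    pub fn connections(&self, peer: PeerId) -> &[ConnectionId] {
        match self.peers.get(&peer) {
            Some(conns) => conns.as_slice(),
            None => &[],
        }
    }
}
```

In the real code each entry would carry a sender for that connection's queue rather than a bare id.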

Member Author

Thanks for explaining, Max. If I understand correctly, this is also why Thomas mentioned that we'd need to distinguish between Full and Closed when sending the message to the Receiver, so that with the idea of a Vec of RpcSender we try all the ConnectionHandlers, right?
If that is so, since async-channel is MPMC (where each message can be received by only one of all existing consumers), why don't we clone the RpcReceiver so that the first ConnectionHandler reading the message sends it and we ensure the channel never gets closed? I presented this idea in dd13fcd.

Member

With the latest refactorings, is this field still needed? The RpcSender is now part of PeerConnections, right?

Member Author

No, it isn't. Thanks Max, updated!

protocols/gossipsub/src/types.rs (outdated, resolved)
protocols/gossipsub/src/config.rs (resolved)
Comment on lines 523 to 524
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
Contributor

async-channel is an MPMC channel but we don't ever .clone() the Receiver, right? So why not use the mpsc channel from futures?

Contributor

The futures::channel::mpsc implementation gives you one slot per Sender + whatever capacity you initialize the channel with. So by doing sender.clone().try_send, I think we are guaranteed to be able to send a message, even if the Sender is immediately dropped after. Technically, that makes the channel unbounded but if we only do it for Control messages, that shouldn't matter much because they are so small.

I think that is what @mxinden meant in #4667 (comment).

Member Author

async-channel is an MPMC channel but we don't ever .clone() the Receiver, right? So why not use the mpsc channel from futures?

Because we need to check whether the channel is empty in the connection handler when determining whether we need to create the outbound stream:

        if !self.send_queue.is_empty()
            && self.outbound_substream.is_none()
            && !self.outbound_substream_establishing
        {

handler.rs:231

Contributor

You can wrap the Receiver in a Peekable and check if it has an item: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.peekable

Member Author

Thanks Thomas, I wasn't aware of this, nor of the property Max mentioned regarding the Sender clone. See my question regarding MPMC in #4914 (comment).

Comment on lines 546 to 548
  let event = RpcOut::Subscribe(topic_hash.clone());
- self.send_message(peer, event);
+ self.send_message(peer, event)
+     .expect("Subscribe messages should be always sent");
Contributor

I think it would be useful if we'd split RpcOut to avoid this. Something like:

  • ControlRpcOut
  • Publish
  • Forward

And then we can have:

fn send_control_message(); 					// Never fails
fn send_publish() -> Result<(), ...>;		// Can fail
fn send_forward(); 							// Never fails but pushes into a different channel
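
As a rough illustration of the three signatures above, the following sketch gives each method a different failure mode. It is written against `std::sync::mpsc` rather than the channels used in the PR, the `RpcOut` enum is a simplified stand-in, and `rpc_channel` is a hypothetical helper. Routing both Publish and Forward through one bounded queue is a simplification made for this sketch only.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};

/// Simplified stand-in for the gossipsub `RpcOut` type.
#[derive(Debug, PartialEq)]
pub enum RpcOut {
    Control(&'static str),
    Publish(Vec<u8>),
    Forward(Vec<u8>),
}

pub struct RpcSender {
    priority: Sender<RpcOut>,         // unbounded: control messages are small
    non_priority: SyncSender<RpcOut>, // bounded: publish and forward payloads
}

/// Hypothetical constructor returning the sender and both receiving ends.
pub fn rpc_channel(cap: usize) -> (RpcSender, Receiver<RpcOut>, Receiver<RpcOut>) {
    let (priority, priority_rx) = channel();
    let (non_priority, non_priority_rx) = sync_channel(cap);
    (RpcSender { priority, non_priority }, priority_rx, non_priority_rx)
}

impl RpcSender {
    /// Never fails from the caller's point of view.
    pub fn send_control_message(&self, name: &'static str) {
        // A send error only means the receiver is gone; nothing is left to do then.
        let _ = self.priority.send(RpcOut::Control(name));
    }

    /// Can fail: the message is handed back when the queue is full or closed.
    pub fn send_publish(&self, data: Vec<u8>) -> Result<(), RpcOut> {
        self.non_priority
            .try_send(RpcOut::Publish(data))
            .map_err(|err| match err {
                TrySendError::Full(msg) | TrySendError::Disconnected(msg) => msg,
            })
    }

    /// Never fails: the message is silently dropped when the queue is full.
    pub fn send_forward(&self, data: Vec<u8>) {
        let _ = self.non_priority.try_send(RpcOut::Forward(data));
    }
}
```

With this shape the caller only has to handle an error where one can actually occur, so the `.expect(...)` on Subscribe goes away.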

Member Author

Yeah, I thought about it, but ControlAction has different priorities: IHAVE/IWANT are low and GRAFT/PRUNE are high priority, and all of them live under the ControlAction enum as struct variants. We could split them into separate structs and then use them in both PriorityRpcOut and RpcOut, so that we could do:

fn prune(&mut self, prune: Prune);
fn graft(&mut self, graft: Graft);

But ControlAction is pub, so that would be a breaking change.

Member

but ControlAction has different types of priorities, IHAVE/IWANT are low and GRAFT/PRUNE are high priority

Why is this distinction relevant?

The goal of this pull request is to prevent unbounded growth of send_queue. Neither IHAVE/IWANT nor GRAFT/PRUNE are large. Correct me if I am wrong. Thus differentiating among the two is not relevant for this pull request.

Member Author

I was following @AgeManning in #4667 (comment), but you are probably right, Max. I updated the code to implement the methods Thomas suggested on RpcSender, which lets us avoid splitting RpcOut and also removes send_message and the double iterations in Behaviour. PTAL, Thomas.

Contributor

@AgeManning AgeManning Nov 27, 2023

I think the distinction is important.

GRAFT/PRUNE are very important in maintaining the overlay mesh network. If we drop these messages we could break the whole network, i.e. nodes could start randomly having broken links without knowing that they need to find new mesh peers, and messages can stop being forwarded through the network.

IHAVE/IWANT are very low priority. In fact, if we are struggling with sending/receiving messages, we probably want to drop these along with forward messages. Although the messages themselves are fairly small (but we can send a large number of message-ids, like a few thousand (configurable)), they both induce a greater message load. IHAVE can induce IWANT requests from other peers, which ends up making us publish more messages (which we don't want if we are struggling). IWANT will request more messages to be sent to us and further consume bandwidth. An IWANT can request a fair number of additional messages (up to 500 in Lighthouse).

For these reasons, I think if we are in the unhappy case where we are struggling, we want to drop IHAVE/IWANT and forward messages, but we definitely do not want to drop GRAFT/PRUNE if we can help it.

Contributor

As I write this, we probably also want to make sure that the messages we send in response to an IWANT get sent as "forward" messages and not as "publish" messages.
If such a message is sent too late, we will have already broken our promise, as promises are time-bound anyway. We want to remove these messages from the queue if we can't send them in time.
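
The time-bound idea could look roughly like the sketch below, assuming each queued message carries a deadline and stale entries are discarded when the queue is drained. `ExpiringQueue` and `Timed` are hypothetical names, and passing `now` explicitly is a choice made here only to keep the example deterministic.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// A queued message together with the instant after which it is not worth sending.
struct Timed<T> {
    msg: T,
    deadline: Instant,
}

/// FIFO queue that discards entries whose deadline has passed.
pub struct ExpiringQueue<T> {
    items: VecDeque<Timed<T>>,
    ttl: Duration,
}

impl<T> ExpiringQueue<T> {
    pub fn new(ttl: Duration) -> Self {
        Self { items: VecDeque::new(), ttl }
    }

    /// Queues `msg` with a deadline of `now + ttl`.
    pub fn push(&mut self, msg: T, now: Instant) {
        self.items.push_back(Timed { msg, deadline: now + self.ttl });
    }

    /// Returns the oldest message still within its deadline, dropping stale ones.
    pub fn pop(&mut self, now: Instant) -> Option<T> {
        while let Some(item) = self.items.pop_front() {
            if now <= item.deadline {
                return Some(item.msg);
            }
            // Too late to be useful: drop it and look at the next entry.
        }
        None
    }
}
```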

Member

Distinguishing between GRAFT/PRUNE and IHAVE/IWANT makes sense to me. Thank you for expanding on it.

That said, I suggest not enlarging the scope of this pull request. I suggest fixing the unbounded send_queue problem in this pull request (#4667). Once merged, I suggest following up with one or more pull requests introducing various optimizations, e.g. time-based prioritization of forward messages, differentiation of GRAFT/PRUNE and IHAVE/IWANT, ...

I won't block this pull request in case you want to do all of this in one. Though I expect separate pull requests to be significantly easier to implement, review and debug.

Comment on lines 2761 to 2764
if sender.try_send(rpc.clone()).is_err() {
tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
return Err(());
}
Contributor

We should differentiate between Closed and Full here and perhaps return the message back to the caller in Err.

Member Author

@jxs jxs Nov 23, 2023

But Closed is unreachable, no?

Contributor

Not necessarily. A connection runs in a different task and on a multi-threaded executor, this connection could get dropped and thus the receiver freed before we receive the event that the connection is closed (where we'd clean up the state).

It might not matter though for this case so probably fine :)
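
For reference, distinguishing the two failure cases and returning the message to the caller might look like the sketch below. It uses `std::sync::mpsc::SyncSender` as a stand-in for the channel in the PR; `QueueError` and `try_queue` are hypothetical names.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Why a message could not be queued; both variants hand the message back.
#[derive(Debug, PartialEq)]
pub enum QueueError<T> {
    /// The queue has no free slot right now.
    Full(T),
    /// The receiving side (the connection) is gone.
    Closed(T),
}

/// Attempts a non-blocking send and reports which failure occurred.
pub fn try_queue<T>(sender: &SyncSender<T>, msg: T) -> Result<(), QueueError<T>> {
    sender.try_send(msg).map_err(|err| match err {
        TrySendError::Full(msg) => QueueError::Full(msg),
        TrySendError::Disconnected(msg) => QueueError::Closed(msg),
    })
}
```

Because the message comes back in the error, the caller can retry on another connection without having cloned it up front.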

tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
return Err(());
}

if let Some(m) = self.metrics.as_mut() {
if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
// register bytes sent on the internal metrics.
m.msg_sent(&message.topic, message.raw_protobuf_len());
Contributor

If you compute this before we use try_send, you can avoid the .clone() on the message.

Member Author

But if the send fails, then we are adding wrong metrics data, as the message was not actually sent, no?
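
One way to reconcile both concerns, sketched under assumptions: read the values the metrics need before the message is moved into the channel, and record them only after the send succeeded. `Message`, `Metrics` and `send_and_record` below are simplified, hypothetical stand-ins and not the gossipsub types.

```rust
use std::collections::HashMap;
use std::sync::mpsc::SyncSender;

/// Simplified stand-in for a gossipsub message.
pub struct Message {
    pub topic: String,
    pub payload: Vec<u8>,
}

/// Simplified stand-in for the metrics struct: bytes sent per topic.
#[derive(Default)]
pub struct Metrics {
    pub bytes_sent: HashMap<String, usize>,
}

/// Queues `msg` and records its size only if it was actually queued.
pub fn send_and_record(sender: &SyncSender<Message>, msg: Message, metrics: &mut Metrics) -> bool {
    // Copy out the small values before the message is moved into the channel.
    let topic = msg.topic.clone();
    let len = msg.payload.len();
    if sender.try_send(msg).is_err() {
        return false; // nothing was sent, so nothing is recorded
    }
    *metrics.bytes_sent.entry(topic).or_insert(0) += len;
    true
}
```

Only the topic is cloned here, not the payload, and a failed send leaves the metrics untouched.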

protocols/gossipsub/src/behaviour.rs (outdated, resolved)
Contributor

Ugh, this is so painful ...

We should really rewrite these tests to go against the Swarm API using libp2p-swarm-test. Changing tests AND implementation in one PR makes me a lot less confident in a green CI.

There are a lot of tests to change. Can we perhaps make this a group effort? (cc @mxinden @AgeManning)

It probably makes sense to first rewrite a few in order to create some utility functions for setting up networks. But once those are in place, perhaps we can chip away at them together? There are 85 tests to change ..

Contributor

I also don't want to blow the scope of this work too much. It is just that reviewing all the changes to these tests also takes a lot of time but it is an effort that we aren't going to benefit from in the future.

Member Author

Yeah agree Thomas, let me look into that

Contributor

Yeah. I'm happy to help out also. My opinion is to try and get these changes in first (we'll be doing fairly extensive tests on live networks as well as small scale simulations) to give us confidence in the changes.
Then in a future PR we re-write all the tests.

self.send_queue.shrink_to_fit();
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) {
Contributor

Can we now get rid of OutboundSubstreamState and just use an async move with a loop inside? I guess that would require us to be able to .clone() the Receiver so we can keep one around in case the stream fails and we want to re-establish it.

Probably not worth doing as part of this to keep the scope small but it would be good to debate, why we are retrying to establish the stream if it failed. Any form of loop that retries something is always a bit sus in my eyes. We could as well rely on an upper layer to re-establish the connection. I don't see why trying 5 times to re-establish the stream is fundamentally different to just disabling ourselves after the first one.

Member Author

Yeah, agreed, we should try it in a subsequent PR.

and clone it per ConnectionHandler, this will allow us to always have an open Receiver.
Member Author

@jxs jxs left a comment

I did not forget about Thomas's remark in https://github.com/libp2p/rust-libp2p/pull/4914/files#r1402807803. I am just trying to settle the API so that we know what we need from the updated tests.

to allow for better handling of each message send.
@jxs jxs force-pushed the gossipsub-backpressure-cont branch from 40e651c to 92a171c Compare November 27, 2023 15:54
@jxs jxs force-pushed the gossipsub-backpressure-cont branch from bf76c6f to 4422990 Compare December 9, 2023 16:44
@jxs jxs requested a review from mxinden December 9, 2023 16:44
@jxs jxs force-pushed the gossipsub-backpressure-cont branch from 4422990 to 09143b5 Compare December 9, 2023 16:46
@jxs jxs force-pushed the gossipsub-backpressure-cont branch from 09143b5 to e421174 Compare December 9, 2023 16:47
protocols/gossipsub/CHANGELOG.md (outdated, resolved)
@@ -37,12 +37,13 @@ sha2 = "0.10.8"
smallvec = "1.11.2"
tracing = "0.1.37"
void = "1.0.2"
async-channel = "1.9.0"
Member

Still needed?

Member Author

No, it isn't. Thanks Max, updated.

.connected_peers
.get_mut(&peer_id)
.expect("Peerid should exist")
.clone();
Member

Why is this clone needed?

Member Author

it's not, thanks Max!

/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
/// this is high priority. By cloning `futures::channel::mpsc::Sender`
/// we get one extra slot in the channel's capacity.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {
Member

Should subscribe return a Result for the case where all channels to the remote peer are closed?

Member Author

Shouldn't we treat that situation as unreachable, i.e. there will always be at least one ConnectionId per peer, since we add and remove them on new and closed connections?

If not, all PeerConnections send methods should account for that, right?

Member

See also #4914 (comment).

}
}
}
unreachable!("At least one peer should be available");
Member

Why is this unreachable? Say that all channels to the given peer are closed, the for loop above will not return and thus this unreachable is hit, no?

Member Author

Yeah, it's as I asked above: isn't the situation of having all channels to a given peer closed unreachable?
In handle_established_{inbound,outbound}_connection we add the PeerId and the ConnectionId to connected_peers. In on_connection_closed we then remove the ConnectionId, and also the PeerId if remaining_established is zero.

Member

Say that each connection has an error, e.g. here:

Then each connection is going to close its channel.

Say that at the same time (concurrently) the user calls Behaviour::publish.

In such a case, the Behaviour would still track the connections, even though each channel would have been dropped.

Does the above make sense?
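
To make the race concrete, here is a minimal sketch of a send loop that returns a distinct outcome when every channel to the peer turns out to be closed, instead of hitting `unreachable!`. It assumes one `std::sync::mpsc::SyncSender` per connection as a stand-in for the PR's senders; `SendOutcome` and `send_to_peer` are hypothetical names.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
pub enum SendOutcome {
    /// The message was queued on one of the connections.
    Queued,
    /// A connection was open but its queue was full, so the message was dropped.
    DroppedFull,
    /// Every tracked connection had already dropped its receiver.
    AllClosed,
}

/// Tries each connection's queue in turn until one accepts the message.
pub fn send_to_peer<T>(connections: &[SyncSender<T>], mut msg: T) -> SendOutcome {
    for sender in connections {
        match sender.try_send(msg) {
            Ok(()) => return SendOutcome::Queued,
            Err(TrySendError::Full(_)) => return SendOutcome::DroppedFull,
            // Channel is closed: take the message back and try the next sender.
            Err(TrySendError::Disconnected(returned)) => msg = returned,
        }
    }
    // Reachable when the handlers closed their channels before the behaviour
    // processed the corresponding connection-closed events.
    SendOutcome::AllClosed
}
```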

Comment on lines +178 to +185
Err(err) if err.is_full() => {
tracing::trace!("Queue is full, dropped Forward message");
return;
}
// Channel is closed, try another sender.
Err(err) => {
rpc = err.into_inner();
}
Member

I still have to give this more thought, though off the top of my head, trying a different connection in both cases is fine, no?

pub(crate) non_priority: Sender<RpcOut>,
}

/// `RpcOut` sender that is priority aware.
Member

Suggested change
- /// `RpcOut` sender that is priority aware.
+ /// `RpcOut` receiver that is priority aware.

Member Author

thanks Max, addressed!
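
For context, a priority-aware receiver in the sense of that doc comment might drain its two queues as sketched below. This assumes two `std::sync::mpsc` receivers as stand-ins for the PR's channels; `PriorityReceiver` and `try_next` are hypothetical names.

```rust
use std::sync::mpsc::Receiver;

/// Receiving half that always serves the priority queue first.
pub struct PriorityReceiver<T> {
    pub priority: Receiver<T>,
    pub non_priority: Receiver<T>,
}

impl<T> PriorityReceiver<T> {
    /// Returns the next message without blocking, preferring the priority queue.
    pub fn try_next(&self) -> Option<T> {
        self.priority
            .try_recv()
            // Fall back to the non-priority queue only when the first is empty or closed.
            .or_else(|_| self.non_priority.try_recv())
            .ok()
    }
}
```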

Contributor

mergify bot commented Jan 16, 2024

This pull request has merge conflicts. Could you please resolve them @jxs? 🙏

@jxs jxs force-pushed the gossipsub-backpressure-cont branch from ffe9e74 to a9d74c2 Compare August 22, 2024 17:09
@jxs jxs force-pushed the gossipsub-backpressure-cont branch from a9d74c2 to 1072cc7 Compare August 22, 2024 17:14
4 participants