[mplex] Tweak default config and yield before exceeding buffer limits. #1825

Merged · 3 commits · Nov 9, 2020
8 changes: 8 additions & 0 deletions muxers/mplex/CHANGELOG.md
@@ -1,5 +1,13 @@
# 0.24.0 [unreleased]

- Change the default configuration to use `MaxBufferBehaviour::Block`
and yield from waiting for the next substream or reading from a
particular substream whenever the current read loop may have
already filled a substream buffer, to give the current task a
chance to read from the buffer(s) before the `MaxBufferBehaviour`
takes effect. This is primarily relevant for
`MaxBufferBehaviour::ResetStream`.
Member:
Suggested change:
-  `MaxBufferBehaviour::ResetStream`.
+  `MaxBufferBehaviour::ResetStream`.
+  [PR 1825](https://github.com/libp2p/rust-libp2p/pull/1825/).

Member: Oh, I was too late. I will include this in #1829.

Contributor (author): Oops, sorry, thanks for doing that.


- Tweak the naming in the `MplexConfig` API for better
consistency with `libp2p-yamux`.
[PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822).
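The configuration surface touched by these two entries can be sketched with a self-contained model (these are stand-in types, not the real `libp2p-mplex` ones; the setter name follows the renaming mentioned for PR 1822 and is an assumption here):

```rust
/// Stand-in for libp2p-mplex's MaxBufferBehaviour (model, not the real type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum MaxBufferBehaviour {
    ResetStream,
    Block,
}

/// Stand-in for MplexConfig with the fields shown in the diff below.
#[derive(Debug)]
struct MplexConfig {
    max_substreams: usize,
    max_buffer_len: usize,
    max_buffer_behaviour: MaxBufferBehaviour,
    split_send_size: usize,
}

impl Default for MplexConfig {
    fn default() -> Self {
        MplexConfig {
            max_substreams: 128,
            max_buffer_len: 32,
            // Changed by this PR from `ResetStream` to `Block`.
            max_buffer_behaviour: MaxBufferBehaviour::Block,
            split_send_size: 1024,
        }
    }
}

impl MplexConfig {
    /// Opt back into the pre-0.24 default (setter name assumed per PR 1822).
    fn set_max_buffer_behaviour(&mut self, b: MaxBufferBehaviour) -> &mut Self {
        self.max_buffer_behaviour = b;
        self
    }
}

fn main() {
    let mut cfg = MplexConfig::default();
    assert_eq!(cfg.max_buffer_behaviour, MaxBufferBehaviour::Block);
    cfg.set_max_buffer_behaviour(MaxBufferBehaviour::ResetStream);
    assert_eq!(cfg.max_buffer_behaviour, MaxBufferBehaviour::ResetStream);
    println!("{:?}", cfg);
}
```

Applications that relied on the old `ResetStream` default can restore it explicitly, as the setter call above illustrates.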
27 changes: 19 additions & 8 deletions muxers/mplex/src/config.rs
@@ -90,14 +90,26 @@ impl MplexConfig {
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Reset the substream whose frame buffer overflowed.
///
/// > **Note**: If more than [`MplexConfig::set_max_buffer_size()`] frames
/// > are received in succession for a substream in the context of
/// > trying to read data from a different substream, the former substream
/// > may be reset before application code had a chance to read from the
/// > buffer. The max. buffer size needs to be sized appropriately when
/// > using this option to balance maximum resource usage and the
/// > probability of premature termination of a substream.
ResetStream,
- /// No new message can be read from any substream as long as the buffer
- /// for a single substream is full.
+ /// No new message can be read from the underlying connection from any
+ /// substream as long as the buffer for a single substream is full,
+ /// i.e. application code is expected to read from the full buffer.
///
- /// This can potentially introduce a deadlock if you are waiting for a
- /// message from a substream before processing the messages received
- /// on another substream, i.e. if there are data dependencies across
- /// substreams.
+ /// > **Note**: To avoid blocking without making progress, application
+ /// > tasks should ensure that, when woken, they always try to read (i.e.
+ /// > make progress) from every substream on which data is expected.
+ /// > This is imperative in general, as a woken task never knows for
+ /// > which substream it has been woken, but failure to do so with
+ /// > [`MaxBufferBehaviour::Block`] in particular may lead to stalled
+ /// > execution or spinning of a task without progress.
Block,
}
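The semantic difference documented above can be modeled in isolation. This is a toy decision function under stated assumptions (the real implementation in `io.rs` tracks frames per substream and, as the test below shows, lets the buffer reach the limit plus the one frame that exceeds it before acting):

```rust
/// Stand-in for libp2p-mplex's MaxBufferBehaviour (model, not the real type).
#[derive(Debug, Clone, Copy)]
enum MaxBufferBehaviour {
    ResetStream,
    Block,
}

/// What the connection does with an inbound data frame for a substream
/// that already holds `buffered` frames (toy model of the documented
/// semantics; `on_inbound_frame` is a hypothetical helper, not crate API).
#[derive(Debug, PartialEq)]
enum Action {
    /// Buffer the frame for later reading by application code.
    Buffer,
    /// Reset the overflowing substream; other substreams keep flowing.
    Reset,
    /// Stop reading from the connection entirely until the buffer drains,
    /// which stalls every substream multiplexed on this connection.
    DontRead,
}

fn on_inbound_frame(
    behaviour: MaxBufferBehaviour,
    buffered: usize,
    max_buffer_len: usize,
) -> Action {
    if buffered < max_buffer_len {
        return Action::Buffer;
    }
    match behaviour {
        MaxBufferBehaviour::ResetStream => Action::Reset,
        MaxBufferBehaviour::Block => Action::DontRead,
    }
}

fn main() {
    // Below the limit, both behaviours simply buffer the frame.
    assert_eq!(on_inbound_frame(MaxBufferBehaviour::Block, 10, 32), Action::Buffer);
    // At the limit, the behaviours diverge.
    assert_eq!(on_inbound_frame(MaxBufferBehaviour::ResetStream, 32, 32), Action::Reset);
    assert_eq!(on_inbound_frame(MaxBufferBehaviour::Block, 32, 32), Action::DontRead);
}
```

The trade-off the doc comments describe falls out of the model: `Reset` loses one substream but keeps the connection moving, while `DontRead` preserves data at the cost of stalling all substreams until the application drains the full buffer.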

@@ -106,9 +118,8 @@ impl Default for MplexConfig {
MplexConfig {
max_substreams: 128,
max_buffer_len: 32,
- max_buffer_behaviour: MaxBufferBehaviour::ResetStream,
+ max_buffer_behaviour: MaxBufferBehaviour::Block,
split_send_size: 1024,
}
}
}

52 changes: 45 additions & 7 deletions muxers/mplex/src/io.rs
@@ -214,8 +214,18 @@ where
}

debug_assert!(self.open_buffer.is_empty());
let mut num_buffered = 0;

loop {
// Whenever we may have completely filled a substream
// buffer while waiting for the next inbound stream,
// yield to give the current task a chance to read
// from the respective substreams.
if num_buffered == self.config.max_buffer_len {
cx.waker().clone().wake();
return Poll::Pending
}

// Wait for the next inbound `Open` frame.
match ready!(self.poll_read_frame(cx, None))? {
Frame::Open { stream_id } => {
Expand All @@ -225,6 +235,7 @@ where
}
Frame::Data { stream_id, data } => {
self.buffer(stream_id.into_local(), data)?;
num_buffered += 1;
}
Frame::Close { stream_id } => {
self.on_close(stream_id.into_local())?;
@@ -406,7 +417,18 @@ where
buf.shrink_to_fit();
}

let mut num_buffered = 0;

loop {
// Whenever we may have completely filled a substream
// buffer of another substream while waiting for the
// next frame for `id`, yield to give the current task
// a chance to read from the other substream(s).
if num_buffered == self.config.max_buffer_len {
cx.waker().clone().wake();
return Poll::Pending
}

// Check if the targeted substream (if any) reached EOF.
if !self.can_read(&id) {
// Note: Contrary to what is recommended by the spec, we must
@@ -427,6 +449,7 @@
// currently being polled, so it needs to be buffered and
// the interested tasks notified.
self.buffer(stream_id.into_local(), data)?;
num_buffered += 1;
}
frame @ Frame::Open { .. } => {
if let Some(id) = self.on_open(frame.remote_id())? {
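The yield-before-exceeding pattern added in both hunks above can be modeled in isolation. This is a sketch with a hand-rolled waker, independent of libp2p (`ReadLoop`, `CountingWaker`, and `run` are illustrative names, not crate API; `wake_by_ref` is used where the diff clones the waker, with the same effect):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy read loop: buffers inbound frames, but after `max_buffer_len`
/// frames in a single poll it wakes itself and returns `Pending`,
/// giving the task a chance to drain buffers before any
/// `MaxBufferBehaviour` could take effect.
struct ReadLoop {
    frames_available: usize,
    max_buffer_len: usize,
    total_buffered: usize,
}

impl Future for ReadLoop {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        let mut num_buffered = 0;
        loop {
            // Yield *before* exceeding the per-poll budget, as in the PR.
            if num_buffered == self.max_buffer_len {
                cx.waker().wake_by_ref(); // same effect as waker().clone().wake()
                return Poll::Pending;
            }
            if self.frames_available == 0 {
                return Poll::Ready(self.total_buffered);
            }
            self.frames_available -= 1;
            num_buffered += 1;
            self.total_buffered += 1;
        }
    }
}

/// Counts self-wakeups, standing in for a real executor.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls to completion, returning (frames buffered, yields observed, wakeups).
fn run(frames: usize, max_buffer_len: usize) -> (usize, usize, usize) {
    let state = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = ReadLoop {
        frames_available: frames,
        max_buffer_len,
        total_buffered: 0,
    };
    let mut yields = 0;
    loop {
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Pending => yields += 1,
            Poll::Ready(total) => return (total, yields, state.0.load(Ordering::SeqCst)),
        }
    }
}

fn main() {
    // 70 frames with a budget of 32: two voluntary yields, then done.
    assert_eq!(run(70, 32), (70, 2, 2));
}
```

Because every `Pending` is paired with a self-wakeup, the task is rescheduled rather than parked, which is what makes this a cooperative yield instead of a stall.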
@@ -1106,15 +1129,30 @@ mod tests {
let id = LocalStreamId::listener(0);
match m.poll_next_stream(cx) {
Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
- Poll::Pending => {}
+ Poll::Pending => {
+ // We expect the implementation to yield when the buffer
+ // is full but before it is exceeded and the max buffer
+ // behaviour takes effect, giving the current task a
+ // chance to read from the buffer. Here we just read
+ // again to provoke the max buffer behaviour.
+ assert_eq!(
+ m.substreams.get_mut(&id).unwrap().recv_buf().len(),
+ cfg.max_buffer_len
+ );
+ match m.poll_next_stream(cx) {
+ Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
+ Poll::Pending => {
+ // Expect the buffer for stream 0 to be exceeded, triggering
+ // the max. buffer behaviour.
+ assert_eq!(
+ m.substreams.get_mut(&id).unwrap().recv_buf().len(),
+ cfg.max_buffer_len + 1
+ );
+ }
+ }
+ }

- // Expect the buffer for stream 0 to be just 1 over the limit.
- assert_eq!(
- m.substreams.get_mut(&id).unwrap().recv_buf().len(),
- cfg.max_buffer_len + 1
- );

// Expect either a `Reset` to be sent or all reads to be
// blocked `Pending`, depending on the `MaxBufferBehaviour`.
match cfg.max_buffer_behaviour {