Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mplex: Check for error and shutdown. #1529

Merged
merged 3 commits into from
Apr 1, 2020
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 44 additions & 24 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub struct MplexConfig {

impl MplexConfig {
/// Builds the default configuration.
#[inline]
pub fn new() -> MplexConfig {
Default::default()
}
Expand All @@ -62,7 +61,6 @@ impl MplexConfig {
/// generated and the connection closes.
///
/// A limit is necessary in order to avoid DoS attacks.
#[inline]
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
Expand All @@ -71,7 +69,6 @@ impl MplexConfig {
/// Sets the maximum number of pending incoming messages.
///
/// A limit is necessary in order to avoid DoS attacks.
#[inline]
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
Expand All @@ -80,7 +77,6 @@ impl MplexConfig {
/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
#[inline]
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
Expand All @@ -94,7 +90,6 @@ impl MplexConfig {
self
}

#[inline]
fn upgrade<C>(self, i: C) -> Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin
Expand Down Expand Up @@ -122,7 +117,6 @@ impl MplexConfig {
}

impl Default for MplexConfig {
#[inline]
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
Expand All @@ -149,7 +143,6 @@ impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;

#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/mplex/6.7.0")
}
Expand Down Expand Up @@ -334,9 +327,7 @@ where C: AsyncRead + AsyncWrite + Unpin,
fn poll_send<C>(inner: &mut MultiplexInner<C>, cx: &mut Context, elem: codec::Elem) -> Poll<Result<(), IoError>>
where C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}
ensure_no_error_no_close(inner)?;

inner.notifier_write.insert(cx.waker());

Expand All @@ -348,10 +339,26 @@ where C: AsyncRead + AsyncWrite + Unpin
}
},
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err))
Poll::Ready(Err(err)) => {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
Poll::Ready(Err(err))
}
}
}

fn ensure_no_error_no_close<C>(inner: &mut MultiplexInner<C>) -> Result<(), IoError>
where
C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
}
if let Err(ref e) = inner.error {
return Err(IoError::new(e.kind(), e.to_string()))
}
Ok(())
}

impl<C> StreamMuxer for Multiplex<C>
where C: AsyncRead + AsyncWrite + Unpin
{
Expand Down Expand Up @@ -418,9 +425,7 @@ where C: AsyncRead + AsyncWrite + Unpin
poll_send(&mut inner, cx, elem.clone())
},
OutboundSubstreamState::Flush => {
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}
ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
Expand All @@ -438,6 +443,7 @@ where C: AsyncRead + AsyncWrite + Unpin
inner.buffer.retain(|elem| {
elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer)
});
inner.error = Err(IoError::new(err.kind(), err.to_string()));
return Poll::Ready(Err(err));
},
};
Expand Down Expand Up @@ -465,7 +471,6 @@ where C: AsyncRead + AsyncWrite + Unpin
}
}

#[inline]
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
// Nothing to do.
}
Expand Down Expand Up @@ -548,13 +553,14 @@ where C: AsyncRead + AsyncWrite + Unpin

fn flush_substream(&self, cx: &mut Context, _substream: &mut Self::Substream) -> Poll<Result<(), IoError>> {
let mut inner = self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}

ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
if let Poll::Ready(Err(err)) = &result {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
}
result
}

fn shutdown_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll<Result<(), IoError>> {
Expand Down Expand Up @@ -585,28 +591,42 @@ where C: AsyncRead + AsyncWrite + Unpin
self.inner.lock().is_acknowledged
}

#[inline]
fn close(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
Poll::Ready(Ok(())) => {
inner.is_shutdown = true;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Err(err)) => {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
Poll::Ready(Err(err))
}
Poll::Pending => Poll::Pending,
}
}

#[inline]
fn flush_all(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
if let Poll::Ready(Err(err)) = &result {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
}
result
}
}

Expand Down