diff --git a/transports/quic/src/muxer.rs b/transports/quic/src/muxer.rs index f3daaf0c657..4014a72a860 100644 --- a/transports/quic/src/muxer.rs +++ b/transports/quic/src/muxer.rs @@ -24,6 +24,7 @@ use crate::error::Error; use futures::{AsyncRead, AsyncWrite}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use parking_lot::Mutex; +use quinn_proto::FinishError; use std::{ collections::HashMap, io::{self, Write}, @@ -123,6 +124,9 @@ impl StreamMuxer for QuicMuxer { ConnectionEvent::StreamFinished(substream) | ConnectionEvent::StreamStopped(substream) => { if let Some(substream) = inner.substreams.get_mut(&substream) { + if let Some(waker) = substream.read_waker.take() { + waker.wake(); + } if let Some(waker) = substream.write_waker.take() { waker.wake(); } @@ -344,5 +348,19 @@ impl Drop for Substream { fn drop(&mut self) { let mut muxer = self.muxer.lock(); muxer.substreams.remove(&self.id); + let _ = muxer + .connection + .connection + .recv_stream(self.id) + .stop(0u32.into()); + let mut send_stream = muxer.connection.connection.send_stream(self.id); + match send_stream.finish() { + Ok(()) => {} + // Already finished or reset, which is fine. + Err(FinishError::UnknownStream) => {} + Err(FinishError::Stopped(reason)) => { + let _ = send_stream.reset(reason); + } + } } }