-
Notifications
You must be signed in to change notification settings - Fork 620
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Possible regression with mpsc channels closing early #1440
Comments
I think the issue here is that closing a sender should just flush any outgoing messages and decrement the sender count, similar to its |
@Nemo157 Interesting. According to the documentation for |
Oh, yes, that's probably the main issue here then. (I still think closing one of the senders (at least via |
I updated |
Looks like |
To rule out diff --git src/bin/new.rs src/bin/new.rs
index e998019..e64ad4a 100644
--- src/bin/new.rs
+++ src/bin/new.rs
@@ -18,13 +18,10 @@ type MySender = Sender<Result<String, ()>>;
pub struct JobFuture(Pin<Box<dyn Future<Output = ()> + Send>>);
impl JobFuture {
- fn new<S: Stream<Item = Result<String, ()>> + Send + 'static>(inner: S, tx: MySender) -> Self {
- let future = inner
- .map(|result| Ok(result))
- .forward(tx.sink_map_err(|_| ()))
- .map(|_| ());
-
- JobFuture(Box::pin(future))
+ fn new<S: Stream<Item = Result<String, ()>> + Send + Unpin + 'static>(mut inner: S, mut tx: MySender) -> Self {
+ JobFuture(Box::pin(async move {
+ await!(tx.send(await!(inner.next()).unwrap())).unwrap();
+ }))
}
}
that gives the same result (and is definitely not closing the senders), so it looks like there's something more going on here... |
In I think I just need to remove the |
I tried removing the |
After discussing the matter on Discord with @Nemo157, we found a solution. The way the stream was being iterated in my example code was incorrect, but it was also found that changing |
I have a simple project which spawns a directed acyclic graph of tasks which each feed data into a
futures::channel::mpsc::Sender
while the main threadawait
s thefutures::channel::mpsc::Receiver
for progress reported from each task. The DAG is constructed by callingfuture::join_all()
on a set of dependencies and then chaining the task itself with.then()
. I have a minimal verifiable example for the code in question at ebkalderon/sink-futures-example.Given the following task graph:
Each future forwards a
stream::once(future::ok(...))
to theSender
. When the code was written withfutures
0.1.25, theReceiver
stream results in the output:However, when the code is ported to
futures
0.3.0-alpha.12, with very minimal changes, theReceiver
stream unexpectedly closes early after the first element:I would hazard a guess that the
stream::once()
infutures
0.3.0-alpha.12 is causing the sink to close early, whereas thestream::once()
in 0.1.25 does not, but I'm not familiar enough with the implementation of thempsc
channels to know if this is the case.The text was updated successfully, but these errors were encountered: