Possible regression with mpsc channels closing early #1440

Closed
ebkalderon opened this issue Jan 28, 2019 · 9 comments · Fixed by #1443

@ebkalderon
Contributor

I have a simple project that spawns a directed acyclic graph (DAG) of tasks, each of which feeds data into a futures::channel::mpsc::Sender, while the main thread awaits the futures::channel::mpsc::Receiver for progress reported by each task. The DAG is constructed by calling future::join_all() on a set of dependencies and then chaining the task itself with .then(). A minimal verifiable example of the code in question is at ebkalderon/sink-futures-example.

Given the following task graph:

+-------+   +--------+
| first |   | second |
+-------+   +--------+
       \     /
      +-------+
      | final |
      +-------+

Each future forwards a stream::once(future::ok(...)) to the Sender. When the code was written against futures 0.1.25, the Receiver stream produced the following output:

Ok("first")
Ok("second")
Ok("final")

However, when the code is ported to futures 0.3.0-alpha.12, with very minimal changes, the Receiver stream unexpectedly closes early after the first element:

Ok("first")

I would hazard a guess that the stream::once() in futures 0.3.0-alpha.12 is causing the sink to close early, whereas the stream::once() in 0.1.25 does not, but I'm not familiar enough with the implementation of the mpsc channels to know if this is the case.

@Nemo157
Member

Nemo157 commented Jan 28, 2019

I think the issue here is that closing a sender should just flush any outgoing messages and decrement the sender count, similar to its Drop::drop impl, instead of shutting everything down.
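
As a rough illustration of the sender-count behaviour described above, here is a minimal sketch that uses std::sync::mpsc instead of futures::channel::mpsc. That substitution is an assumption made so the example needs no external crates; the futures channel is a separate implementation and may differ in detail. In the standard library channel, dropping one cloned Sender only reduces the number of live senders, and the Receiver sees the end of the stream only once the last sender is gone. The function name is hypothetical.

```rust
use std::sync::mpsc;

// Hypothetical helper: shows that a channel with two senders stays open
// after one of them is dropped (std::sync::mpsc, not futures::channel::mpsc).
fn collect_after_one_sender_drops() -> Vec<String> {
    let (tx_first, rx) = mpsc::channel::<String>();
    let tx_second = tx_first.clone();

    tx_first.send("first".to_string()).unwrap();
    // Dropping one sender only decrements the live-sender count.
    drop(tx_first);

    // The remaining sender can still deliver messages.
    tx_second.send("second".to_string()).unwrap();
    // Only when the last sender is dropped does the receiver see the end.
    drop(tx_second);

    // `iter()` yields buffered messages, then stops once all senders are gone.
    rx.iter().collect()
}

fn main() {
    // prints ["first", "second"]
    println!("{:?}", collect_after_one_sender_drops());
}
```

If closing a sender instead shut down the whole channel, the second send here would fail, which matches the symptom reported in this issue.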

@ebkalderon
Contributor Author

ebkalderon commented Jan 28, 2019

@Nemo157 Interesting. According to the documentation for StreamExt::forward, the sink is not supposed to close. It should flush, but not close.

@Nemo157
Member

Nemo157 commented Jan 28, 2019

Oh, yes, that's probably the main issue here then. (I still think closing one of the senders (at least via Sink::poll_close) should not shut down the channel, but that wouldn't matter if forward was fixed).

@ebkalderon
Contributor Author

I updated StreamExt::forward in my own clone to call poll_flush() instead of poll_close(), but that didn't affect the output. I'll investigate poll_close() as well, since I suspect it's still being called somewhere.

@ebkalderon
Contributor Author

Looks like StreamExt::forward in futures 0.1.25 also calls close() on the sink once it's finished forwarding. My guess is that the correct solution here is to update poll_close() with the correct behavior you described.

@Nemo157
Member

Nemo157 commented Jan 28, 2019

To rule out forward I tried changing your example code with:

diff --git src/bin/new.rs src/bin/new.rs
index e998019..e64ad4a 100644
--- src/bin/new.rs
+++ src/bin/new.rs
@@ -18,13 +18,10 @@ type MySender = Sender<Result<String, ()>>;
 pub struct JobFuture(Pin<Box<dyn Future<Output = ()> + Send>>);

 impl JobFuture {
-    fn new<S: Stream<Item = Result<String, ()>> + Send + 'static>(inner: S, tx: MySender) -> Self {
-        let future = inner
-            .map(|result| Ok(result))
-            .forward(tx.sink_map_err(|_| ()))
-            .map(|_| ());
-
-        JobFuture(Box::pin(future))
+    fn new<S: Stream<Item = Result<String, ()>> + Send + Unpin + 'static>(mut inner: S, mut tx: MySender) -> Self {
+        JobFuture(Box::pin(async move {
+            await!(tx.send(await!(inner.next()).unwrap())).unwrap();
+        }))
     }
 }

that gives the same result (and is definitely not closing the senders), so it looks like there's something more going on here...

@ebkalderon
Contributor Author

ebkalderon commented Jan 28, 2019

In futures 0.1.25, Sender::close() only returns Ok(Async::Ready(())) (source) and does not close the channel. But in futures 0.3.0-alpha.12, Sender::poll_close() calls Sender::close_channel() before returning Poll::Ready(Ok(())) (source).

I think I just need to remove the self.close_channel() call to match the old implementation. What do you think, @Nemo157?

@ebkalderon
Contributor Author

I tried removing the self.close_channel() from Sender::poll_close() to see what would happen, and it still doesn't seem to fix the problem.

@ebkalderon
Contributor Author

After discussing the matter on Discord with @Nemo157, we found a solution. The way the stream was being iterated in my example code was incorrect, and we also found that changing StreamExt::forward to call poll_flush() instead of poll_close() does fix the issue.
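
To make the flush-versus-close distinction concrete, here is a toy, single-threaded model written with the standard library only. It is a sketch under stated assumptions, not the futures implementation: ToySender, Shared, Finish, forward and run are all hypothetical names. The only behaviour it borrows from this thread is that closing via one sender shuts the channel for every clone. Under that assumption, a forward that finishes with a close lets only the first task's item through, while a forward that finishes with a flush delivers all three.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// State shared by every clone of the toy sender.
#[derive(Default)]
struct Shared {
    queue: VecDeque<String>,
    closed: bool,
}

/// Hypothetical stand-in for a channel sender; NOT the futures type.
#[derive(Clone, Default)]
struct ToySender(Rc<RefCell<Shared>>);

impl ToySender {
    fn send(&self, item: &str) -> Result<(), String> {
        let mut shared = self.0.borrow_mut();
        if shared.closed {
            return Err(format!("channel closed, dropped {:?}", item));
        }
        shared.queue.push_back(item.to_string());
        Ok(())
    }

    /// Nothing is buffered in this toy model, so flushing is a no-op.
    fn flush(&self) {}

    /// Assumed behaviour: closing through one sender closes it for all clones.
    fn close_channel(&self) {
        self.0.borrow_mut().closed = true;
    }
}

/// How the toy `forward` finishes once its input is exhausted.
#[derive(Clone, Copy)]
enum Finish {
    Flush,
    Close,
}

/// Sends every item, then either flushes or closes the sender.
fn forward(items: &[&str], tx: &ToySender, finish: Finish) -> Result<(), String> {
    for item in items {
        tx.send(item)?;
    }
    match finish {
        Finish::Flush => tx.flush(),
        Finish::Close => tx.close_channel(),
    }
    Ok(())
}

/// Runs three one-item tasks in sequence and returns what was received.
fn run(finish: Finish) -> Vec<String> {
    let tx = ToySender::default();
    for name in ["first", "second", "final"] {
        // Each task gets its own clone; send errors are ignored, as a
        // task that discards the result of forwarding would.
        let _ = forward(&[name], &tx.clone(), finish);
    }
    let received: Vec<String> = tx.0.borrow_mut().queue.drain(..).collect();
    received
}

fn main() {
    println!("close: {:?}", run(Finish::Close)); // close: ["first"]
    println!("flush: {:?}", run(Finish::Flush)); // flush: ["first", "second", "final"]
}
```

The model runs the tasks sequentially and leaves out wakers, backpressure and the DAG ordering, so it only shows why a closing forward can starve later senders that share the same channel.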
