Skip to content
This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

WIP: Unbounded futures mpmc channel #38

Conversation

seunlanlege
Copy link

@seunlanlege seunlanlege commented Feb 26, 2018

  • naive implementation of an unbounded futures aware mpmc channel.

notes:
I've tested this here(yes I actually have a need for an mpmc futures channel) and it works for the most part.

https://github.com/SeunLanLege/arc-reactor/blob/futures-mpmc/src/ArcCore/reactor.rs#L64-L139

A lot of work still needs to be done, as regards to drop semantics and a more efficient approach to task notification. plus there's a lot i don't understand like:
1.why we need to track a task's notification state? or maybe i don't fully understand what tasks really are. 😢

but yeah, pointers are welcome.

danburkert and others added 3 commits February 24, 2018 21:07
This is a proof-of-concept futures-aware mpsc channel which builds on
the internal bounded and unbounded channel flavors.
@seunlanlege
Copy link
Author

@danburkert @stjepang

@seunlanlege seunlanlege mentioned this pull request Feb 26, 2018
@ghost
Copy link

ghost commented Mar 7, 2018

Sorry for the late reply! The code looks very good so far, and I especially like the abundance of comments. :)

why we need to track a task's notification state? or maybe i don't fully understand what tasks really are.

@danburkert I don't know the answer to this question either - is the current notification system in this PR (inherited from your MPSC implementation) going to work?

@seunlanlege Just in case you missed the discussion, the subtleties around notifying blocked tasks are explained in this comment. The most important thing to keep in mind is that methods send and recv will have to return a custom future which, when dropped, removes the current task from the notification queue (currently called sender_tasks/receiver_tasks).

@jonhoo
Copy link

jonhoo commented Mar 29, 2018

@seunlanlege if you want to get more familiar with the internals of futures so that you can implement this more efficiently, I highly recommend @aturon's (wip) book on asynchrony in Rust: https://aturon.github.io/apr/async-in-rust/chapter.html.

FWIW, I'm also really excited about the prospect of a futures-aware MPMC channel. Currently I have to do sad things like https://github.com/jonhoo/trawler/blob/17d4f1efcc0a340dfda8be4a8b8cf5ad07a3b441/src/execution/issuer.rs#L34-L40.

@Imxset21
Copy link

Imxset21 commented Aug 2, 2018

Out of curiosity, what's the current status of this PR? I see some references to how this could be implemented more efficiently, but not being familiar with how this is being benchmarked I'm not sure what the obvious issues are.

Would love to see this pushed over the line 👍

@ghost
Copy link

ghost commented Aug 2, 2018

@Imxset21

This PR is currently stalled and based off of an old version of the crate, which has undergone significant changes since then. Futures support is still on the table, but I think we should take a different approach:

  1. Start from the Context struct and change its thread: Thread field.

  2. We make Context generic over P: Park and define trait Park that knows how to register/park/unpark regular threads and tasks (futures).

  3. Then we make methods like send and recv in other files asynchronous. For example, send should be defined as fn send(&self, msg: T) -> Send<T>, where Send<T> is a custom future-like struct that can be polled in order to drive the operation towards completion. It'll probably be an enum of Ready(T) and NotReady(Option<Duration>) (the duration tells us when it will become ready).

  4. Structs Sender<T> and Receiver<T> should be generic over P: Park, too. We continue making things generic and asynchronous like this until everything compiles.

  5. What we end up with at this point are channels generic over the parking mechanism. Perhaps we should move everything into a new crate and call it crossbeam-channel-core.

  6. Finally, we use those generic channels and wrap them in two interfaces: one that uses normal threads for parking and the other that uses futures/tasks.

@jonhoo
Copy link

jonhoo commented Aug 2, 2018

@habnabit has been trying to make a similar change to the one @stjepang suggests above for the bus crate (jonhoo/bus#15), but ran into a lot of trouble doing so. I'll let them expand on it further though, as a bunch of the discussion is on direct e-mails between the two of us. @habnabit -- I'm sure some of your experiences trying to be generic over a Park-like trait would be helpful here!

@habnabit
Copy link

habnabit commented Aug 3, 2018

oof! Yeah I tried to do this but it ended up being a bit of a pain. Porting over a synchronous API took a lot of work to nail down, because you have to ensure that every path through the async code hits something that will schedule an unpark later, unless you can return Ready.

The PR linked above shows the initial work I did when I was attempting to factor this behind a trait. I ended up doing a pure-async refactor just to figure out what the semantics even needed to be. Originally I intended to fold that back into the above PR in the factored-out way, but at this point I think it might obfuscate more than help. Keeping track of control flow and which things will schedule an unpark was more difficult when abstracted.

Another issue was that synchronous code can do work and block a little in Drop, but you can't spin the event loop in Drop if your finalization work ends up hitting a NotReady. The async APIs have some finalization APIs but there's not really a fail-safe; you have to document "unless you use these, you might suffer deadlocks". I did add a warn! log call in there but I'm real iffy on that.

Honestly, the final straw that tipped me into abandoning the refactor was lacking HKT. I switched from using std::sync::mpsc::* because futures::sync::mpsc::* give you the right poll behavior for free, and you can't write type Sender<T> = std::sync::mpsc::Sender<T>; in a trait definition. The number of types needed for the queues is bounded and small, at least, but I didn't feel like dealing with that burden along with all the other issues that were cropping up.

I hope my experiences prove useful, at least. I'll keep subscribed to the issue too.

@ghost
Copy link

ghost commented Sep 16, 2018

So I've played with this a little bit and tried porting channels to futures, but ultimately gave up because the thread wakeup mechanism is so deeply tied to all parts of the crate.

It'd probably be easier to make a new crate with futures support from scratch. I'll give that a shot. :)

@ghost ghost closed this Sep 16, 2018
@lucab
Copy link

lucab commented Oct 15, 2018

@stjepang unfortunately I don't have enough knowledge to contribute to this, but I'm interested in crossing futures and crossbeam-channel. If you already started hacking on this, is it somewhere public?

This pull request was closed.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Development

Successfully merging this pull request may close these issues.

6 participants