Skip to content

Commit

Permalink
Add a select combinator for streams
Browse files Browse the repository at this point in the history
Similar to merge but requires the items/errors to be the same and then only
yields one at a time.

Closes rust-lang#239
  • Loading branch information
alexcrichton committed Nov 10, 2016
1 parent 5137d3f commit d30d6a1
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 1 deletion.
22 changes: 21 additions & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod merge;
mod once;
mod or_else;
mod peek;
mod select;
mod skip;
mod skip_while;
mod take;
Expand All @@ -50,12 +51,13 @@ pub use self::map_err::MapErr;
pub use self::merge::{Merge, MergedItem};
pub use self::once::{Once, once};
pub use self::or_else::OrElse;
pub use self::peek::Peekable;
pub use self::select::Select;
pub use self::skip::Skip;
pub use self::skip_while::SkipWhile;
pub use self::take::Take;
pub use self::then::Then;
pub use self::zip::Zip;
pub use self::peek::Peekable;

if_std! {
use std;
Expand Down Expand Up @@ -801,6 +803,24 @@ pub trait Stream {
{
chunks::new(self, capacity)
}

/// Creates a stream that selects the next element from either this stream
/// or the provided one, whichever is ready first.
///
/// This combinator will attempt to pull items from both streams. Each
/// stream will be polled in a round-robin fashion, and whenever a stream is
/// ready to yield an item that item is yielded.
///
/// The `select` function is similar to `merge` except that it requires both
/// streams to have the same item and error types.
///
/// Error are passed through from either stream.
fn select<S>(self, other: S) -> Select<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>,
Self: Sized,
{
select::new(self, other)
}
}

impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
Expand Down
64 changes: 64 additions & 0 deletions src/stream/select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use {Poll, Async};
use stream::{Stream, Fuse};

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
#[must_use = "streams do nothing unless polled"]
pub struct Select<S1, S2> {
stream1: Fuse<S1>,
stream2: Fuse<S2>,
flag: bool,
}

pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
Select {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
}
}

impl<S1, S2> Stream for Select<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;

fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let (a, b) = if self.flag {
(&mut self.stream2 as &mut Stream<Item=_, Error=_>,
&mut self.stream1 as &mut Stream<Item=_, Error=_>)
} else {
(&mut self.stream1 as &mut Stream<Item=_, Error=_>,
&mut self.stream2 as &mut Stream<Item=_, Error=_>)
};
self.flag = !self.flag;

let a_done = match try!(a.poll()) {
Async::Ready(Some(item)) => return Ok(Some(item).into()),
Async::Ready(None) => true,
Async::NotReady => false,
};

match try!(b.poll()) {
Async::Ready(Some(item)) => {
// If the other stream isn't finished yet, give them a chance to
// go first next time as we pulled something off `b`.
if !a_done {
self.flag = !self.flag;
}
return Ok(Some(item).into())
}
Async::Ready(None) if a_done => Ok(None.into()),
Async::Ready(None) => Ok(Async::NotReady),
Async::NotReady => Ok(Async::NotReady),
}
}
}
15 changes: 15 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,18 @@ fn chunks() {
fn chunks_panic_on_cap_zero() {
let _ = list().chunks(0);
}

#[test]
fn select() {
let a = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]);
let b = iter(vec![Ok(4), Ok(5), Ok(6)]);
assert_done(|| a.select(b).collect(), Ok(vec![1, 4, 2, 5, 3, 6]));

let a = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]);
let b = iter(vec![Ok(1), Ok(2)]);
assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));

let a = iter(vec![Ok(1), Ok(2)]);
let b = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]);
assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
}

0 comments on commit d30d6a1

Please sign in to comment.