Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

#[async_stream] implementation #32

Merged
merged 21 commits into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ fn foo() -> io::Result<i32> {
}
```

And finally you can also have "async `for` loops" which operate over the
[`Stream`] trait:
You can also have "async `for` loops" which operate over the [`Stream`] trait:

```rust
#[async]
Expand All @@ -78,6 +77,32 @@ An async `for` loop will propagate errors out of the function so `message` has
the `Item` type of the `stream` passed in. Note that async `for` loops can only
be used inside of an `#[async]` function.

And finally, you can create a `Stream` instead of a `Future` via
`#[async_stream(item = _)]`:

```rust
#[async]
fn fetch(client: hyper::Client, url: &'static str) -> io::Result<String> {
// ...
}

/// Fetch all provided urls one at a time
#[async_stream(item = String)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
for url in urls {
let s = await!(fetch(client, url))?;
stream_yield!(Ok(s));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intended to be stream_yield!(s)?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so it looks like this was changed during discussion?

I'd personally prefer to only yield the item type from stream_yield!. Our eventual desired semantics for Stream is that an Err fuses (ends) the stream as well as returning None, so I'm hoping we can get a jump start on that here by only allowing yielding items and using return Err(...) for errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, that's a change I'm very much in favour of. For some reason I got the impression from the linked issue that that wouldn't be changing any time soon, if it is a concrete plan then I can definitely switch this back.

}
Ok(())
}
```

`#[async_stream]` must have an item type specified via `item = some::Path` and
the values output from the stream must be wrapped into a `Result` and yielded
via the `stream_yield!` macro. This macro also supports the same features as
`#[async]`, an additional `boxed` argument to return a `Box<Stream>`, async
`for` loops, etc.

[`Future`]: https://docs.rs/futures/0.1.13/futures/future/trait.Future.html
[`Stream`]: https://docs.rs/futures/0.1.13/futures/stream/trait.Stream.html

Expand Down
19 changes: 8 additions & 11 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ fn main() {
let server = async_block! {
#[async]
for (client, _) in tcp.incoming() {
handle.spawn(handle_client(client).then(|result| {
match result {
Ok(n) => println!("wrote {} bytes", n),
Err(e) => println!("IO error {:?}", e),
}
handle.spawn(handle_client(client).for_each(|n| {
println!("wrote {} bytes", n);
Ok(())
}).map_err(|e| {
println!("IO error {:?}", e);
}));
}

Expand All @@ -43,19 +42,17 @@ fn main() {
core.run(server).unwrap();
}

#[async]
fn handle_client(socket: TcpStream) -> io::Result<u64> {
#[async_stream(item = u64)]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this stay as it was before? This was mostly intended to showcase the similarity to an otherwise synchronous echo server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

fn handle_client(socket: TcpStream) -> io::Result<()> {
let (reader, mut writer) = socket.split();
let input = BufReader::new(reader);

let mut total = 0;

#[async]
for line in tokio_io::io::lines(input) {
println!("got client line: {}", line);
total += line.len() as u64;
stream_yield!(Ok(line.len() as u64));
writer = await!(tokio_io::io::write_all(writer, line))?.0;
}

Ok(total)
Ok(())
}
4 changes: 4 additions & 0 deletions futures-async-macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ proc-macro2 = { version = "0.1", features = ["unstable"] }
git = 'https://github.com/dtolnay/syn'
features = ["full", "fold", "parsing", "printing"]
default-features = false

[dependencies.synom]
git = 'https://github.com/dtolnay/syn'
default-features = false
212 changes: 172 additions & 40 deletions futures-async-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,23 @@ extern crate proc_macro;
#[macro_use]
extern crate quote;
extern crate syn;
#[macro_use]
extern crate synom;

use proc_macro2::Span;
use proc_macro::{TokenStream, TokenTree, Delimiter, TokenNode};
use quote::{Tokens, ToTokens};
use syn::*;
use syn::delimited::Delimited;
use syn::fold::Folder;

#[proc_macro_attribute]
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async] attribute, if any
let attribute = attribute.to_string();
let boxed = if attribute == "( boxed )" {
true
} else if attribute == "" {
false
} else {
panic!("the #[async] attribute currently only takes `boxed` as an arg");
};

fn async_inner<F>(
boxed: bool,
function: TokenStream,
gen_function: Tokens,
return_ty: F)
-> TokenStream
where F: FnOnce(&Ty) -> proc_macro2::TokenStream {
// Parse our item, expecting a function. This function may be an actual
// top-level function or it could be a method (typically dictated by the
// arguments). We then extract everything we'd like to use.
Expand Down Expand Up @@ -164,30 +162,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Basically just take all those expression and expand them.
let block = ExpandAsyncFor.fold_block(*block);

// TODO: can we lift the restriction that `futures` must be at the root of
// the crate?

let output_span = first_last(&output);
let return_ty = if boxed {
quote! {
Box<::futures::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
>>
}
} else {
// Dunno why this is buggy, hits weird typecheck errors in tests
//
// quote! {
// impl ::futures::Future<
// Item = <#output as ::futures::__rt::MyTry>::MyOk,
// Error = <#output as ::futures::__rt::MyTry>::MyError,
// >
// }
quote! { impl ::futures::__rt::MyFuture<!> + 'static }
};
let return_ty = respan(return_ty.into(), &output_span);
let return_ty = replace_bang(return_ty, &output);
let return_ty = return_ty(&output);

let block_inner = quote! {
#( let #patterns = #temp_bindings; )*
Expand All @@ -207,7 +182,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
#[allow(unreachable_code)]
{
return __e;
loop { yield }
loop { yield ::futures::Async::NotReady }
}
};
let mut gen_body = Tokens::new();
Expand All @@ -218,7 +193,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Give the invocation of the `gen` function the same span as the output
// as currently errors related to it being a result are targeted here. Not
// sure if more errors will highlight this function call...
let gen_function = quote! { ::futures::__rt::gen };
let output_span = first_last(&output);
let gen_function = respan(gen_function.into(), &output_span);
let body_inner = quote! {
#gen_function (move || #gen_body)
Expand Down Expand Up @@ -247,6 +222,93 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
output.into()
}

#[proc_macro_attribute]
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async] attribute, if any
let attribute = attribute.to_string();
let boxed = if attribute == "( boxed )" {
true
} else if attribute == "" {
false
} else {
panic!("the #[async] attribute currently only takes `boxed` as an arg");
};

async_inner(boxed, function, quote! { ::futures::__rt::gen }, |output| {
// TODO: can we lift the restriction that `futures` must be at the root of
// the crate?
let output_span = first_last(&output);
let return_ty = if boxed {
quote! {
Box<::futures::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
>>
}
} else {
// Dunno why this is buggy, hits weird typecheck errors in tests
//
// quote! {
// impl ::futures::Future<
// Item = <#output as ::futures::__rt::MyTry>::MyOk,
// Error = <#output as ::futures::__rt::MyTry>::MyError,
// >
// }
quote! { impl ::futures::__rt::MyFuture<!> + 'static }
};
let return_ty = respan(return_ty.into(), &output_span);
replace_bang(return_ty, &output)
})
}

#[proc_macro_attribute]
pub fn async_stream(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async_stream] attribute, if any
let args = syn::parse::<AsyncStreamArgs>(attribute)
.expect("failed to parse attribute arguments");

let mut boxed = false;
let mut item_ty = None;

for arg in args.0 {
match arg {
AsyncStreamArg(term, None) => {
if term == "boxed" {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to below, could this panic if boxed is already true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

boxed = true;
} else {
panic!("unexpected #[async_stream] argument '{}'", term);
}
}
AsyncStreamArg(term, Some(ty)) => {
if term == "item" {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this also panic if item_ty is Some already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

item_ty = Some(ty);
} else {
panic!("unexpected #[async_stream] argument '{}'", quote!(#term = #ty));
}
}
}
}

let boxed = boxed;
let item_ty = item_ty.expect("#[async_stream] requires item type to be specified");

async_inner(boxed, function, quote! { ::futures::__rt::gen_stream }, |output| {
let output_span = first_last(&output);
let return_ty = if boxed {
quote! {
Box<::futures::Stream<
Item = !,
Error = <! as ::futures::__rt::IsResult>::Err,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it may not work? Maybe a test could be added for #[async_stream(boxed, item = ...)]?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha nevermind, missed the new replace_bangs function

>>
}
} else {
quote! { impl ::futures::__rt::MyStream<!, !> + 'static }
};
let return_ty = respan(return_ty.into(), &output_span);
replace_bangs(return_ty, &[&item_ty, &output])
})
}

#[proc_macro]
pub fn async_block(input: TokenStream) -> TokenStream {
let input = TokenStream::from(TokenTree {
Expand All @@ -268,7 +330,40 @@ pub fn async_block(input: TokenStream) -> TokenStream {
syn::tokens::Move(span).to_tokens(tokens);
syn::tokens::OrOr([span, span]).to_tokens(tokens);
syn::tokens::Brace(span).surround(tokens, |tokens| {
(quote! { if false { yield } }).to_tokens(tokens);
(quote! {
if false { yield ::futures::Async::NotReady }
}).to_tokens(tokens);
expr.to_tokens(tokens);
});
});

tokens.into()
}

#[proc_macro]
pub fn async_stream_block(input: TokenStream) -> TokenStream {
let input = TokenStream::from(TokenTree {
kind: TokenNode::Group(Delimiter::Brace, input),
span: Default::default(),
});
let expr = syn::parse(input)
.expect("failed to parse tokens as an expression");
let expr = ExpandAsyncFor.fold_expr(expr);

let mut tokens = quote! {
::futures::__rt::gen_stream
};

// Use some manual token construction here instead of `quote!` to ensure
// that we get the `call_site` span instead of the default span.
let span = syn::Span(Span::call_site());
syn::tokens::Paren(span).surround(&mut tokens, |tokens| {
syn::tokens::Move(span).to_tokens(tokens);
syn::tokens::OrOr([span, span]).to_tokens(tokens);
syn::tokens::Brace(span).surround(tokens, |tokens| {
(quote! {
if false { yield ::futures::Async::NotReady }
}).to_tokens(tokens);
expr.to_tokens(tokens);
});
});
Expand Down Expand Up @@ -311,7 +406,7 @@ impl Folder for ExpandAsyncFor {
}
}
futures_await::Async::NotReady => {
yield;
yield futures_await::Async::NotReady;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be best using ::futures::....

Could a test be added for this as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extern crate futures_await; line just above. This whole #[async] for expander uses that instead of via ::futures.

continue
}
}
Expand Down Expand Up @@ -362,3 +457,40 @@ fn replace_bang(input: proc_macro2::TokenStream, tokens: &ToTokens)
}
new_tokens.into()
}

fn replace_bangs(input: proc_macro2::TokenStream, replacements: &[&ToTokens])
-> proc_macro2::TokenStream
{
let mut replacements = replacements.iter().cycle();
let mut new_tokens = Tokens::new();
for token in input.into_iter() {
match token.kind {
proc_macro2::TokenNode::Op('!', _) => {
replacements.next().unwrap().to_tokens(&mut new_tokens);
}
_ => token.to_tokens(&mut new_tokens),
}
}
new_tokens.into()
}

struct AsyncStreamArg(syn::Ident, Option<syn::Ty>);

impl synom::Synom for AsyncStreamArg {
named!(parse -> Self, do_parse!(
i: syn!(syn::Ident) >>
p: option!(do_parse!(
syn!(syn::tokens::Eq) >>
p: syn!(syn::Ty) >>
(p))) >>
(AsyncStreamArg(i, p))));
}

struct AsyncStreamArgs(Vec<AsyncStreamArg>);

impl synom::Synom for AsyncStreamArgs {
named!(parse -> Self, map!(
option!(parens!(call!(Delimited::<AsyncStreamArg, syn::tokens::Comma>::parse_separated_nonempty))),
|p| AsyncStreamArgs(p.map(|d| d.0.into_vec()).unwrap_or_default())
));
}
10 changes: 9 additions & 1 deletion futures-await-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ macro_rules! await {
break ::futures::__rt::Err(e)
}
}
yield
yield ::futures::Async::NotReady
}
})
}

#[macro_export]
macro_rules! stream_yield {
($e:expr) => ({
let e = $e;
yield ::futures::Async::Ready(e)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this perhaps just be yield e? I think it'd be fine to use two different conversions internally for the two attributes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean by two different conversions here, could you explain a bit more?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sure yeah, basically I just mean that yielding in #[async] should be just a literal yield, and yielding in #[async_stream] would be yield e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead of having the stream_yield! macro, just do a rewrite from yield e; to yield ::futures::Async::Ready(e); as part of the #[async_stream] macro? That does have the downside of meaning you can't write macros that yield values, but I assume those would be rare enough to not matter. I'll try and get this and the attribute parsing done tomorrow.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no I was thinking we'd keep the stream_yield! macro, but it would be implemented with yield e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, how would that fit with await! and #[async] for yielding Async::NotReady then, just have the user do something like stream_yield!(Async::Ready(e))? I thought part of the purpose of async/await syntax was to allow composing futures/streams together without having to work with some of the more verbose underlying types like Async.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, that makes sense! In that case let's leave as-is.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on how much magic (tm) is acceptable, #[async] and #[async_stream] could themselves expand await! in different ways that make sense for each of them?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah I'm fine using the strategy here, seems fine to me!

})
}
Loading