-
Notifications
You must be signed in to change notification settings - Fork 55
#[async_stream] implementation #32
Changes from 17 commits
6f69b6b
ee6e5dd
1de2fd7
c591be4
60206d1
38b0367
9f30805
12be4e6
d73e94a
5f2e9b5
d06ee39
775a6e6
7b8d3ce
9aa4e75
18e5262
902be86
5c95eda
24f693a
4fb8e8b
1c5f933
82fc83d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,12 +29,11 @@ fn main() { | |
let server = async_block! { | ||
#[async] | ||
for (client, _) in tcp.incoming() { | ||
handle.spawn(handle_client(client).then(|result| { | ||
match result { | ||
Ok(n) => println!("wrote {} bytes", n), | ||
Err(e) => println!("IO error {:?}", e), | ||
} | ||
handle.spawn(handle_client(client).for_each(|n| { | ||
println!("wrote {} bytes", n); | ||
Ok(()) | ||
}).map_err(|e| { | ||
println!("IO error {:?}", e); | ||
})); | ||
} | ||
|
||
|
@@ -43,19 +42,17 @@ fn main() { | |
core.run(server).unwrap(); | ||
} | ||
|
||
#[async] | ||
fn handle_client(socket: TcpStream) -> io::Result<u64> { | ||
#[async_stream(item = u64)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this stay as it was before? This was mostly intended to showcase the similarity to an otherwise synchronous echo server There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
fn handle_client(socket: TcpStream) -> io::Result<()> { | ||
let (reader, mut writer) = socket.split(); | ||
let input = BufReader::new(reader); | ||
|
||
let mut total = 0; | ||
|
||
#[async] | ||
for line in tokio_io::io::lines(input) { | ||
println!("got client line: {}", line); | ||
total += line.len() as u64; | ||
stream_yield!(Ok(line.len() as u64)); | ||
writer = await!(tokio_io::io::write_all(writer, line))?.0; | ||
} | ||
|
||
Ok(total) | ||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,25 +18,23 @@ extern crate proc_macro; | |
#[macro_use] | ||
extern crate quote; | ||
extern crate syn; | ||
#[macro_use] | ||
extern crate synom; | ||
|
||
use proc_macro2::Span; | ||
use proc_macro::{TokenStream, TokenTree, Delimiter, TokenNode}; | ||
use quote::{Tokens, ToTokens}; | ||
use syn::*; | ||
use syn::delimited::Delimited; | ||
use syn::fold::Folder; | ||
|
||
#[proc_macro_attribute] | ||
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream { | ||
// Handle arguments to the #[async] attribute, if any | ||
let attribute = attribute.to_string(); | ||
let boxed = if attribute == "( boxed )" { | ||
true | ||
} else if attribute == "" { | ||
false | ||
} else { | ||
panic!("the #[async] attribute currently only takes `boxed` as an arg"); | ||
}; | ||
|
||
fn async_inner<F>( | ||
boxed: bool, | ||
function: TokenStream, | ||
gen_function: Tokens, | ||
return_ty: F) | ||
-> TokenStream | ||
where F: FnOnce(&Ty) -> proc_macro2::TokenStream { | ||
// Parse our item, expecting a function. This function may be an actual | ||
// top-level function or it could be a method (typically dictated by the | ||
// arguments). We then extract everything we'd like to use. | ||
|
@@ -164,30 +162,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream { | |
// Basically just take all those expression and expand them. | ||
let block = ExpandAsyncFor.fold_block(*block); | ||
|
||
// TODO: can we lift the restriction that `futures` must be at the root of | ||
// the crate? | ||
|
||
let output_span = first_last(&output); | ||
let return_ty = if boxed { | ||
quote! { | ||
Box<::futures::Future< | ||
Item = <! as ::futures::__rt::IsResult>::Ok, | ||
Error = <! as ::futures::__rt::IsResult>::Err, | ||
>> | ||
} | ||
} else { | ||
// Dunno why this is buggy, hits weird typecheck errors in tests | ||
// | ||
// quote! { | ||
// impl ::futures::Future< | ||
// Item = <#output as ::futures::__rt::MyTry>::MyOk, | ||
// Error = <#output as ::futures::__rt::MyTry>::MyError, | ||
// > | ||
// } | ||
quote! { impl ::futures::__rt::MyFuture<!> + 'static } | ||
}; | ||
let return_ty = respan(return_ty.into(), &output_span); | ||
let return_ty = replace_bang(return_ty, &output); | ||
let return_ty = return_ty(&output); | ||
|
||
let block_inner = quote! { | ||
#( let #patterns = #temp_bindings; )* | ||
|
@@ -207,7 +182,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream { | |
#[allow(unreachable_code)] | ||
{ | ||
return __e; | ||
loop { yield } | ||
loop { yield ::futures::Async::NotReady } | ||
} | ||
}; | ||
let mut gen_body = Tokens::new(); | ||
|
@@ -218,7 +193,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream { | |
// Give the invocation of the `gen` function the same span as the output | ||
// as currently errors related to it being a result are targeted here. Not | ||
// sure if more errors will highlight this function call... | ||
let gen_function = quote! { ::futures::__rt::gen }; | ||
let output_span = first_last(&output); | ||
let gen_function = respan(gen_function.into(), &output_span); | ||
let body_inner = quote! { | ||
#gen_function (move || #gen_body) | ||
|
@@ -247,6 +222,93 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream { | |
output.into() | ||
} | ||
|
||
#[proc_macro_attribute] | ||
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream { | ||
// Handle arguments to the #[async] attribute, if any | ||
let attribute = attribute.to_string(); | ||
let boxed = if attribute == "( boxed )" { | ||
true | ||
} else if attribute == "" { | ||
false | ||
} else { | ||
panic!("the #[async] attribute currently only takes `boxed` as an arg"); | ||
}; | ||
|
||
async_inner(boxed, function, quote! { ::futures::__rt::gen }, |output| { | ||
// TODO: can we lift the restriction that `futures` must be at the root of | ||
// the crate? | ||
let output_span = first_last(&output); | ||
let return_ty = if boxed { | ||
quote! { | ||
Box<::futures::Future< | ||
Item = <! as ::futures::__rt::IsResult>::Ok, | ||
Error = <! as ::futures::__rt::IsResult>::Err, | ||
>> | ||
} | ||
} else { | ||
// Dunno why this is buggy, hits weird typecheck errors in tests | ||
// | ||
// quote! { | ||
// impl ::futures::Future< | ||
// Item = <#output as ::futures::__rt::MyTry>::MyOk, | ||
// Error = <#output as ::futures::__rt::MyTry>::MyError, | ||
// > | ||
// } | ||
quote! { impl ::futures::__rt::MyFuture<!> + 'static } | ||
}; | ||
let return_ty = respan(return_ty.into(), &output_span); | ||
replace_bang(return_ty, &output) | ||
}) | ||
} | ||
|
||
#[proc_macro_attribute] | ||
pub fn async_stream(attribute: TokenStream, function: TokenStream) -> TokenStream { | ||
// Handle arguments to the #[async_stream] attribute, if any | ||
let args = syn::parse::<AsyncStreamArgs>(attribute) | ||
.expect("failed to parse attribute arguments"); | ||
|
||
let mut boxed = false; | ||
let mut item_ty = None; | ||
|
||
for arg in args.0 { | ||
match arg { | ||
AsyncStreamArg(term, None) => { | ||
if term == "boxed" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to below, could this panic if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
boxed = true; | ||
} else { | ||
panic!("unexpected #[async_stream] argument '{}'", term); | ||
} | ||
} | ||
AsyncStreamArg(term, Some(ty)) => { | ||
if term == "item" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this also panic if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
item_ty = Some(ty); | ||
} else { | ||
panic!("unexpected #[async_stream] argument '{}'", quote!(#term = #ty)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
let boxed = boxed; | ||
let item_ty = item_ty.expect("#[async_stream] requires item type to be specified"); | ||
|
||
async_inner(boxed, function, quote! { ::futures::__rt::gen_stream }, |output| { | ||
let output_span = first_last(&output); | ||
let return_ty = if boxed { | ||
quote! { | ||
Box<::futures::Stream< | ||
Item = !, | ||
Error = <! as ::futures::__rt::IsResult>::Err, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like it may not work? Maybe a test could be added for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aha nevermind, missed the new |
||
>> | ||
} | ||
} else { | ||
quote! { impl ::futures::__rt::MyStream<!, !> + 'static } | ||
}; | ||
let return_ty = respan(return_ty.into(), &output_span); | ||
replace_bangs(return_ty, &[&item_ty, &output]) | ||
}) | ||
} | ||
|
||
#[proc_macro] | ||
pub fn async_block(input: TokenStream) -> TokenStream { | ||
let input = TokenStream::from(TokenTree { | ||
|
@@ -268,7 +330,40 @@ pub fn async_block(input: TokenStream) -> TokenStream { | |
syn::tokens::Move(span).to_tokens(tokens); | ||
syn::tokens::OrOr([span, span]).to_tokens(tokens); | ||
syn::tokens::Brace(span).surround(tokens, |tokens| { | ||
(quote! { if false { yield } }).to_tokens(tokens); | ||
(quote! { | ||
if false { yield ::futures::Async::NotReady } | ||
}).to_tokens(tokens); | ||
expr.to_tokens(tokens); | ||
}); | ||
}); | ||
|
||
tokens.into() | ||
} | ||
|
||
#[proc_macro] | ||
pub fn async_stream_block(input: TokenStream) -> TokenStream { | ||
let input = TokenStream::from(TokenTree { | ||
kind: TokenNode::Group(Delimiter::Brace, input), | ||
span: Default::default(), | ||
}); | ||
let expr = syn::parse(input) | ||
.expect("failed to parse tokens as an expression"); | ||
let expr = ExpandAsyncFor.fold_expr(expr); | ||
|
||
let mut tokens = quote! { | ||
::futures::__rt::gen_stream | ||
}; | ||
|
||
// Use some manual token construction here instead of `quote!` to ensure | ||
// that we get the `call_site` span instead of the default span. | ||
let span = syn::Span(Span::call_site()); | ||
syn::tokens::Paren(span).surround(&mut tokens, |tokens| { | ||
syn::tokens::Move(span).to_tokens(tokens); | ||
syn::tokens::OrOr([span, span]).to_tokens(tokens); | ||
syn::tokens::Brace(span).surround(tokens, |tokens| { | ||
(quote! { | ||
if false { yield ::futures::Async::NotReady } | ||
}).to_tokens(tokens); | ||
expr.to_tokens(tokens); | ||
}); | ||
}); | ||
|
@@ -311,7 +406,7 @@ impl Folder for ExpandAsyncFor { | |
} | ||
} | ||
futures_await::Async::NotReady => { | ||
yield; | ||
yield futures_await::Async::NotReady; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this may be best using Could a test be added for this as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's an |
||
continue | ||
} | ||
} | ||
|
@@ -362,3 +457,40 @@ fn replace_bang(input: proc_macro2::TokenStream, tokens: &ToTokens) | |
} | ||
new_tokens.into() | ||
} | ||
|
||
fn replace_bangs(input: proc_macro2::TokenStream, replacements: &[&ToTokens]) | ||
-> proc_macro2::TokenStream | ||
{ | ||
let mut replacements = replacements.iter().cycle(); | ||
let mut new_tokens = Tokens::new(); | ||
for token in input.into_iter() { | ||
match token.kind { | ||
proc_macro2::TokenNode::Op('!', _) => { | ||
replacements.next().unwrap().to_tokens(&mut new_tokens); | ||
} | ||
_ => token.to_tokens(&mut new_tokens), | ||
} | ||
} | ||
new_tokens.into() | ||
} | ||
|
||
struct AsyncStreamArg(syn::Ident, Option<syn::Ty>); | ||
|
||
impl synom::Synom for AsyncStreamArg { | ||
named!(parse -> Self, do_parse!( | ||
i: syn!(syn::Ident) >> | ||
p: option!(do_parse!( | ||
syn!(syn::tokens::Eq) >> | ||
p: syn!(syn::Ty) >> | ||
(p))) >> | ||
(AsyncStreamArg(i, p)))); | ||
} | ||
|
||
struct AsyncStreamArgs(Vec<AsyncStreamArg>); | ||
|
||
impl synom::Synom for AsyncStreamArgs { | ||
named!(parse -> Self, map!( | ||
option!(parens!(call!(Delimited::<AsyncStreamArg, syn::tokens::Comma>::parse_separated_nonempty))), | ||
|p| AsyncStreamArgs(p.map(|d| d.0.into_vec()).unwrap_or_default()) | ||
)); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,15 @@ macro_rules! await { | |
break ::futures::__rt::Err(e) | ||
} | ||
} | ||
yield | ||
yield ::futures::Async::NotReady | ||
} | ||
}) | ||
} | ||
|
||
#[macro_export] | ||
macro_rules! stream_yield { | ||
($e:expr) => ({ | ||
let e = $e; | ||
yield ::futures::Async::Ready(e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this perhaps just be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand what you mean by two different conversions here, could you explain a bit more? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah sure yeah, basically I just mean that yielding in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So instead of having the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh no I was thinking we'd keep the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, how would that fit with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see, that makes sense! In that case let's leave as-is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Depending on how much magic (tm) is acceptable, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nah I'm fine using the strategy here, seems fine to me! |
||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this intended to be
stream_yield!(s)
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah so it looks like this was changed during discussion?
I'd personally prefer to only yield the item type from
stream_yield!
. Our eventual desired semantics forStream
is that anErr
fuses (ends) the stream as well as returningNone
, so I'm hoping we can get a jump start on that here by only allowing yielding items and usingreturn Err(...)
for errors.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, that's a change I'm very much in favour of. For some reason I got the impression from the linked issue that that wouldn't be changing any time soon, if it is a concrete plan then I can definitely switch this back.