Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

register_blocking_method #523

Merged
merged 10 commits into from
Oct 15, 2021
5 changes: 4 additions & 1 deletion proc-macros/src/render_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ impl RpcDescription {
#resources
})
} else {
let register_kind =
if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) };

handle_register_result(quote! {
rpc.register_method(#rpc_method_name, |params, context| {
rpc.#register_kind(#rpc_method_name, |params, context| {
#parsing
context.#rust_method_name(#params_seq)
})
Expand Down
12 changes: 10 additions & 2 deletions proc-macros/src/rpc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ use crate::{

use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
use syn::spanned::Spanned;
use syn::{punctuated::Punctuated, Attribute, Token};

#[derive(Debug, Clone)]
pub struct RpcMethod {
pub name: String,
pub blocking: bool,
pub docs: TokenStream2,
pub params: Vec<(syn::PatIdent, syn::Type)>,
pub returns: Option<syn::Type>,
Expand All @@ -48,15 +50,21 @@ pub struct RpcMethod {

impl RpcMethod {
pub fn from_item(attr: Attribute, mut method: syn::TraitItemMethod) -> syn::Result<Self> {
let [aliases, name, resources] = AttributeMeta::parse(attr)?.retain(["aliases", "name", "resources"])?;
let [aliases, blocking, name, resources] =
AttributeMeta::parse(attr)?.retain(["aliases", "blocking", "name", "resources"])?;

let aliases = parse_aliases(aliases)?;
let blocking = optional(blocking, Argument::flag)?.is_some();
let name = name?.string()?;
let resources = optional(resources, Argument::group)?.unwrap_or_default();

let sig = method.sig.clone();
let docs = extract_doc_comments(&method.attrs);

if blocking && sig.asyncness.is_some() {
return Err(syn::Error::new(sig.span(), "Blocking method must be synchronous"));
}

let params: Vec<_> = sig
.inputs
.into_iter()
Expand All @@ -77,7 +85,7 @@ impl RpcMethod {
// We've analyzed attributes and don't need them anymore.
method.attrs.clear();

Ok(Self { aliases, name, params, returns, signature: method, docs, resources })
Ok(Self { aliases, blocking, name, params, returns, signature: method, docs, resources })
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
error: Unknown argument `magic`, expected one of: `aliases`, `name`, `resources`
error: Unknown argument `magic`, expected one of: `aliases`, `blocking`, `name`, `resources`
--> $DIR/method_unexpected_field.rs:6:25
|
6 | #[method(name = "foo", magic = false)]
Expand Down
27 changes: 27 additions & 0 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! Example of using proc macro to generate working client and server.

use std::net::SocketAddr;
use std::time::{Duration, Instant};

use jsonrpsee::{ws_client::*, ws_server::WsServerBuilder};
use serde_json::value::RawValue;
Expand Down Expand Up @@ -77,6 +78,12 @@ mod rpc_impl {
fn zero_copy_cow(&self, a: std::borrow::Cow<'_, str>, b: beef::Cow<'_, str>) -> RpcResult<String> {
Ok(format!("Zero copy params: {}, {}", matches!(a, std::borrow::Cow::Borrowed(_)), b.is_borrowed()))
}

#[method(name = "blocking_call", blocking)]
fn blocking_call(&self) -> RpcResult<u32> {
std::thread::sleep(std::time::Duration::from_millis(50));
Ok(42)
}
}

#[rpc(client, server, namespace = "chain")]
Expand Down Expand Up @@ -277,3 +284,23 @@ async fn macro_zero_copy_cow() {

assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#);
}

#[tokio::test]
async fn multiple_blocking_calls_overlap() {
let module = RpcServerImpl.into_rpc();

let params = RawValue::from_string("[]".into()).ok();
// MacOS CI has very limited number of cores, so running on 2 to be safe
let futures = std::iter::repeat_with(|| module.call("foo_blocking_call", params.clone())).take(2);
let now = Instant::now();
let results = futures::future::join_all(futures).await;
let elapsed = now.elapsed();

for result in results {
let result = serde_json::from_str::<serde_json::Value>(&result.unwrap()).unwrap();
assert_eq!(result["result"], 42);
}

// Each request takes 50ms, added 10ms margin for scheduling
assert!(elapsed < Duration::from_millis(60), "Expected less than 60ms, got {:?}", elapsed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hard pressed to come up with something better than this. It'll have to make do.

Copy link
Contributor Author

@maciejhirsz maciejhirsz Oct 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now on MacOS CI 50ms + 50ms in parallel = 160ms though :D

}
1 change: 1 addition & 0 deletions utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rand = { version = "0.8", optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1", features = ["raw_value"], optional = true }
parking_lot = { version = "0.11", optional = true }
tokio = { version = "1", features = ["rt"] }
maciejhirsz marked this conversation as resolved.
Show resolved Hide resolved

[features]
default = []
Expand Down
37 changes: 37 additions & 0 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,43 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}

/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform expensive computations.
pub fn register_blocking_method<R, F>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soon we'll be at a point where we should have a macro for all these register_* methods... :)

&mut self,
method_name: &'static str,
callback: F,
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
R: Serialize,
F: Fn(Params, Arc<Context>) -> Result<R, Error> + Copy + Send + Sync + 'static,
{
let ctx = self.ctx.clone();
let callback = self.methods.verify_and_insert(
method_name,
MethodCallback::new_async(Arc::new(move |id, params, tx, _conn_id, claimed| {
let ctx = ctx.clone();

tokio::task::spawn_blocking(move || {
match callback(params, ctx) {
Ok(res) => send_response(id, &tx, res),
Err(err) => send_call_error(id, &tx, err),
};

// Release claimed resources
drop(claimed);
})
.map(|err| {
log::error!("Join error for blocking RPC method: {:?}", err);
})
.boxed()
})),
)?;

Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}

/// Register a new RPC subscription that invokes callback on every subscription request.
/// The callback itself takes three parameters:
/// - [`Params`]: JSONRPC parameters in the subscription request.
Expand Down