Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chainHead RPC methods #766

Merged
merged 8 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 162 additions & 1 deletion subxt/src/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@

use super::{
rpc_params,
types,
types::{
self,
ChainHeadEvent,
FollowEvent,
},
RpcClient,
RpcClientT,
Subscription,
Expand Down Expand Up @@ -465,6 +469,163 @@ impl<T: Config> Rpc<T> {
self.client.request("system_dryRun", params).await?;
Ok(types::decode_dry_run_result(&mut &*result_bytes.0)?)
}

/// Subscribe to `chainHead_unstable_follow` to obtain all reported blocks by the chain.
///
/// The subscription ID can be used to make queries for the
/// block's body ([`chainhead_unstable_body`](Rpc::chainhead_unstable_follow)),
/// block's header ([`chainhead_unstable_header`](Rpc::chainhead_unstable_header)),
/// block's storage ([`chainhead_unstable_storage`](Rpc::chainhead_unstable_storage)) and submitting
/// runtime API calls at this block ([`chainhead_unstable_call`](Rpc::chainhead_unstable_call)).
///
/// # Note
///
/// When the user is no longer interested in a block, the user is responsible
/// for calling the [`chainhead_unstable_unpin`](Rpc::chainhead_unstable_unpin) method.
/// Failure to do so will result in the subscription being stopped by generating the `Stop` event.
pub async fn chainhead_unstable_follow(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be nice to have a separate module or something for the v2 impls but perhaps _unstable is sufficient

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the longer run, hopefully we will be able to remove all of the "old" methods and only have these new ones on this Rpc type :)

&self,
runtime_updates: bool,
) -> Result<Subscription<FollowEvent<T::Hash>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_follow",
rpc_params![runtime_updates],
"chainHead_unstable_unfollow",
)
.await?;

Ok(subscription)
}

/// Subscribe to `chainHead_unstable_body` to obtain events regarding the block's body.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_body(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_body",
rpc_params![subscription_id, hash],
"chainHead_unstable_stopBody",
)
.await?;

Ok(subscription)
}

/// Get the block's body using the `chainHead_unstable_header` method.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_header(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Option<String>, Error> {
let header = self
.client
.request(
"chainHead_unstable_header",
rpc_params![subscription_id, hash],
)
.await?;

Ok(header)
}

/// Subscribe to `chainHead_storage` to obtain events regarding the
/// block's storage.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_storage(
&self,
subscription_id: String,
hash: T::Hash,
key: &[u8],
child_key: Option<&[u8]>,
) -> Result<Subscription<ChainHeadEvent<Option<String>>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_storage",
rpc_params![subscription_id, hash, to_hex(key), child_key.map(to_hex)],
"chainHead_unstable_stopStorage",
)
.await?;

Ok(subscription)
}

/// Subscribe to `chainHead_call` to obtain events regarding the
/// runtime API call.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_call(
&self,
subscription_id: String,
hash: T::Hash,
function: String,
call_parameters: &[u8],
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_call",
rpc_params![subscription_id, hash, function, to_hex(call_parameters)],
"chainHead_unstable_stopCall",
)
.await?;

Ok(subscription)
}

/// Unpin a block reported by the `chainHead_follow` subscription.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_unpin(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_unpin",
rpc_params![subscription_id, hash],
)
.await?;

Ok(())
}

/// Get genesis hash obtained from the `chainHead_genesisHash` method.
pub async fn chainhead_unstable_genesishash(&self) -> Result<T::Hash, Error> {
let hash = self
.client
.request("chainHead_unstable_genesisHash", rpc_params![])
.await?;

Ok(hash)
}
}

fn to_hex(bytes: impl AsRef<[u8]>) -> String {
Expand Down
Loading