Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: add full support reconnecting rpc client #1505

Merged
merged 78 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
476afd9
add simple reconnecting rpc client
niklasad1 Mar 21, 2024
0bf7299
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 Mar 26, 2024
9f6d38b
initial retryable calls
niklasad1 Mar 27, 2024
c107e14
add reconnecting backend
niklasad1 Mar 28, 2024
4208599
add reconnecting example for unstable backend
niklasad1 Mar 28, 2024
1ecc4a0
add todo what isn't working
niklasad1 Mar 28, 2024
ffabc04
FollowStream: restart on reconn
niklasad1 Apr 9, 2024
4ba4d75
naive fix: fetch sub_id in stream_headers
niklasad1 Apr 10, 2024
76cbd71
cleanup
niklasad1 Apr 11, 2024
e71a583
remove resubscribe APIs
niklasad1 Apr 11, 2024
2457d3a
cleanup and remove many wrapper streams
niklasad1 Apr 12, 2024
25fa3b1
remove retry backend
niklasad1 Apr 15, 2024
39dcea4
legacy rpc: make it retryable
niklasad1 Apr 15, 2024
a5ae924
unstable rpc: make it retryable
niklasad1 Apr 16, 2024
f27a7f4
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 Apr 16, 2024
5822c40
fix nits
niklasad1 Apr 16, 2024
adc35b9
support wasm as well
niklasad1 Apr 17, 2024
643af0b
remove deadcode
niklasad1 Apr 17, 2024
825e3b3
address grumbles
niklasad1 Apr 18, 2024
6027072
revert rpc methods
niklasad1 Apr 18, 2024
7b01f1f
don't create a subscription per block
niklasad1 Apr 18, 2024
2bfede2
get rid off retry logic in subxt rpc
niklasad1 Apr 19, 2024
b14f144
Update subxt/Cargo.toml
niklasad1 Apr 19, 2024
284abdb
Update subxt/src/backend/legacy/mod.rs
niklasad1 Apr 19, 2024
77bb9fd
Update subxt/src/backend/legacy/mod.rs
niklasad1 Apr 19, 2024
43c4cf1
remove outdated comments
niklasad1 Apr 19, 2024
0dfaeea
Merge remote-tracking branch 'origin/na-reconnecting-rpc-client-v2' i…
niklasad1 Apr 19, 2024
be93cdd
Merge branch 'master' into na-reconnecting-rpc-client-v2
niklasad1 Apr 22, 2024
85285c7
fix bad merge
niklasad1 Apr 22, 2024
eb84437
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 Apr 22, 2024
d645aee
Fix reconnecting RPC client and update dependencies
niklasad1 Apr 22, 2024
3cee6e9
add back retry logic and remove `finito`
niklasad1 Apr 22, 2024
2dcac78
fix nits
niklasad1 Apr 22, 2024
91b2804
cleanup
niklasad1 Apr 22, 2024
d673203
add hack for race when reconnecting
niklasad1 Apr 23, 2024
802f7ee
backend: emit Stop event DisconnectWillRecoonect
niklasad1 Apr 23, 2024
2794ac1
merge reconnecting client examples
niklasad1 Apr 23, 2024
80c2396
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 Apr 23, 2024
7b4a9fe
add fn retry_stream
niklasad1 Apr 23, 2024
44b15ff
cleanup
niklasad1 Apr 24, 2024
a369f26
add all features from reconnecting-rpc-client
niklasad1 Apr 24, 2024
6261cb8
fix build
niklasad1 Apr 24, 2024
3dc77c2
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 Apr 25, 2024
7d6428f
remove needless retry for fetch_storage
niklasad1 Apr 25, 2024
628ee57
StorageFetchDescendantKeysStream handle disconnect err
niklasad1 Apr 25, 2024
0a837a4
dont retry transactions
niklasad1 Apr 25, 2024
55f68d5
fetch subscription ID from FollowStreamMsg
niklasad1 Apr 25, 2024
ada46e7
fix nits
niklasad1 Apr 25, 2024
3b4963a
Update subxt/src/backend/legacy/mod.rs
niklasad1 Apr 25, 2024
a1f3c0e
Update subxt/src/backend/legacy/mod.rs
niklasad1 Apr 25, 2024
97f9480
add reconn to StorageItems stream
niklasad1 Apr 25, 2024
a7ae56f
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 Apr 30, 2024
1bae24b
StorageFetchDescendantKeysStreamchore: retry storage call
niklasad1 Apr 30, 2024
7747382
RetryStream: emit DisconnectWillReconnect msg
niklasad1 Apr 30, 2024
425c7fa
runtime subscriptions ignore DisconnectWillReconn
niklasad1 Apr 30, 2024
ab9a6e3
Update subxt/examples/setup_reconnecting_rpc_client.rs
niklasad1 Apr 30, 2024
0d45b1b
Update subxt/src/client/online_client.rs
niklasad1 Apr 30, 2024
e9d171c
Update subxt/src/client/online_client.rs
niklasad1 Apr 30, 2024
1ba9814
Add custom stream wrapper for finalized blocks
niklasad1 May 2, 2024
5ebdfe2
add missing retry block
niklasad1 May 2, 2024
8deaba2
clippy
niklasad1 May 2, 2024
523904e
clippy again
niklasad1 May 2, 2024
9da32c9
cleanup
niklasad1 May 3, 2024
46d9879
remove duplicate logic
niklasad1 May 3, 2024
546425f
fix more grumbles
niklasad1 May 3, 2024
22e13f1
Update subxt/examples/setup_reconnecting_rpc_client.rs
niklasad1 May 3, 2024
1a986e5
simplify the example
niklasad1 May 4, 2024
abb7cf0
remove pin-project dep
niklasad1 May 4, 2024
8cce412
Merge remote-tracking branch 'origin/master' into na-reconnecting-rpc…
niklasad1 May 6, 2024
775d3fd
remove duplicate retry logic
niklasad1 May 7, 2024
d3faa21
remove extra code
niklasad1 May 7, 2024
59401b8
specify trait bounds for retry api
niklasad1 May 7, 2024
3b3e8e4
simplify the example
niklasad1 May 7, 2024
57881fa
fix weird Poll::Pending return
niklasad1 May 7, 2024
7bd55e7
fix nit in poll impl
niklasad1 May 8, 2024
c74d514
remove needless paths
niklasad1 May 8, 2024
27a2ea4
make retry_stream pub and add doc examples
niklasad1 May 8, 2024
64d3aae
Update subxt/src/backend/utils.rs
niklasad1 May 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ which = "5.0.0"
strip-ansi-escapes = "0.2.0"
proptest = "1.4.0"
hex-literal = "0.4.1"
pin-project-lite = "0.2.14"

# Light client support:
smoldot = { version = "0.16.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ tracing = { workspace = true }
frame-metadata = { workspace = true }
either = { workspace = true }
instant = { workspace = true }
pin-project-lite = { workspace = true }

# Provides some deserialization, types like U256/H256 and hashing impls like twox/blake256:
impl-serde = { workspace = true }
Expand Down
74 changes: 73 additions & 1 deletion subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::config::BlockHash;
use crate::error::Error;
use crate::error::{Error, RpcError};
use futures::stream::{Stream, StreamExt};
use pin_project_lite::pin_project;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
use std::pin::Pin;
Expand Down Expand Up @@ -379,6 +380,77 @@ struct SubscriberDetails<Hash: BlockHash> {
waker: Option<Waker>,
}

pin_project! {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// A stream that subscribes to finalized blocks
/// and indicates whether a block was missed if the connection was lost.
#[derive(Debug)]
pub struct FollowStreamFinalizedHeads<Hash: BlockHash> {
#[pin]
stream: FollowStreamDriverSubscription<Hash>,
sub_id: Option<String>,
last_seen_block: Option<BlockRef<Hash>>,
}
}

impl<Hash: BlockHash> FollowStreamFinalizedHeads<Hash> {
pub fn new(stream: FollowStreamDriverSubscription<Hash>) -> Self {
Self {
stream,
sub_id: None,
last_seen_block: None,
}
}
}

impl<Hash: BlockHash> Stream for FollowStreamFinalizedHeads<Hash> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me!As a nit I'd be tempted to put it in a different file like the stream in storage_items.rs is (or put some of this stuff into a utils file or something) since it's not really a part of the follow stream driver code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yepp, the thingy was that the FollowStreamSubscription wasn't exported outside this and I placed here to avoid sharing it with pub crate.

I can move it if it ok to make it pub crate :)

type Item = Result<(String, Vec<BlockRef<Hash>>), Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

loop {
match futures::ready!(this.stream.poll_next_unpin(cx)) {
Some(FollowStreamMsg::Ready(sub_id)) => {
*this.sub_id = Some(sub_id);
}
Some(FollowStreamMsg::Event(FollowEvent::Finalized(finalized))) => {
*this.last_seen_block = finalized.finalized_block_hashes.last().cloned();
let sub_id = this
.sub_id
.clone()
.expect("Ready is always emitted before Finalized; qed");
return Poll::Ready(Some(Ok((
sub_id,
finalized.finalized_block_hashes.clone(),
))));
}
Some(FollowStreamMsg::Event(FollowEvent::Initialized(init))) => {
let prev = this.last_seen_block.take();
*this.last_seen_block = init.finalized_block_hashes.last().cloned();

if let Some(p) = prev {
if !init.finalized_block_hashes.contains(&p) {
return Poll::Ready(Some(Err(RpcError::DisconnectedWillReconnect(
"Missed at least one block when the connection was lost".to_owned(),
)
.into())));
}
}

let sub_id = this
.sub_id
.clone()
.expect("Ready is always emitted before Initialized; qed");

return Poll::Ready(Some(Ok((sub_id, init.finalized_block_hashes))));
}
// Ignore other events.
_ => (),
};
}
}
}

#[cfg(test)]
mod test_utils {
use super::super::follow_stream_unpin::test_utils::test_unpin_stream_getter;
Expand Down
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ pub(super) mod test_utils {

pub type UnpinRx<Hash> = std::sync::mpsc::Receiver<(Hash, Arc<str>)>;

/// Get a `FolowStreamUnpin` from an iterator over events.
/// Get a [`FollowStreamUnpin`] from an iterator over events.
pub fn test_unpin_stream_getter<Hash, F, I>(
events: F,
max_life: usize,
Expand Down
43 changes: 35 additions & 8 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod storage_items;

pub mod rpc_methods;

use self::follow_stream_driver::FollowStreamFinalizedHeads;
use self::follow_stream_unpin::FollowStreamMsg;
use self::rpc_methods::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
Expand All @@ -31,6 +32,7 @@ use crate::error::{Error, RpcError};
use crate::Config;
use async_trait::async_trait;
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::future::Either;
use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::task::Poll;
Expand Down Expand Up @@ -496,15 +498,40 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
async fn stream_finalized_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
let stream = self
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
.stream_headers(|ev| match ev {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
_ => vec![],
})
.await?;
let methods = self.methods.clone();

Ok(StreamOf::new(Box::pin(stream)))
let headers =
FollowStreamFinalizedHeads::new(self.follow_handle.subscribe()).flat_map(move |r| {
let methods = methods.clone();

let (sub_id, block_refs) = match r {
Ok(ev) => ev,
Err(e) => return Either::Left(futures::stream::once(async { Err(e) })),
};

Either::Right(
futures::stream::iter(block_refs).filter_map(move |block_ref| {
let methods = methods.clone();
let sub_id = sub_id.clone();

async move {
let res = methods
.chainhead_v1_header(&sub_id, block_ref.hash())
.await
.transpose()?;

let header = match res {
Ok(header) => header,
Err(e) => return Some(Err(e)),
};

Some(Ok((header, block_ref.into())))
}
}),
)
});

Ok(StreamOf::new(Box::pin(headers)))
}

async fn submit_transaction(
Expand Down
Loading