Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

light-client: Add experimental light-client support #965

Merged
merged 88 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
68e239a
rpc/types: Decode `SubstrateTxStatus` for substrate and smoldot
lexnv May 18, 2023
e54aa6a
lightclient: Add light client Error
lexnv May 18, 2023
aaace2c
lightclient: Add background task to manage RPC responses
lexnv May 18, 2023
e9a6491
lightclient: Implement the light client RPC in subxt
lexnv May 18, 2023
2efdef1
subxt: Expose light client under experimental feature-flag
lexnv May 18, 2023
f715092
artifacts: Add development chain spec for local nodes
lexnv May 18, 2023
0dc47f2
Update cargo lock
lexnv May 18, 2023
d59a331
examples: Add light client example
lexnv May 18, 2023
d25c133
Update sp-* crates and smoldot to use git with branch / rev
lexnv May 22, 2023
96b428c
Apply cargo fmt
lexnv May 22, 2023
5a04b21
Fix clippy
lexnv May 22, 2023
4140615
Import hashmap entry
lexnv May 22, 2023
c6668f6
lightclient: Fetch spec only if jsonrpsee feature is enabled
lexnv May 22, 2023
890a0e1
Update subxt/src/rpc/lightclient/background.rs
lexnv May 22, 2023
5b15e2d
Fix typo
lexnv May 22, 2023
01549d0
artifacts: Update dev chain spec
lexnv May 25, 2023
922e19a
types: Handle storage replies from chainHead_storage
lexnv May 25, 2023
14b6013
artifacts: Add polkadot spec
lexnv May 25, 2023
c241506
lightclient: Handle RPC error responses
lexnv May 25, 2023
973c423
examples: Tx basic with light client for local nodes
lexnv May 25, 2023
1ba4b82
example: Light client coprehensive example for live chains
lexnv May 25, 2023
e470006
examples: Remove prior light client example
lexnv May 25, 2023
f33d4cf
feature: Rename experimental to unstable
lexnv May 25, 2023
b4c818c
Merge remote-tracking branch 'origin/master' into lexnv/light_client_…
lexnv May 25, 2023
8600842
book: Add light client section
lexnv May 25, 2023
8624d72
testing: Fix clippy
lexnv May 25, 2023
5aa0ac8
lightclient: Ignore validated events
lexnv May 25, 2023
d6f69ef
Adjust tests for light-clients and normal clients
lexnv May 25, 2023
f405c89
testing: Keep lightclient variant
lexnv May 25, 2023
8e46da7
Remove support for chainHead_storage for light client
lexnv May 25, 2023
df12146
Update light client to point to crates.io
lexnv May 26, 2023
c03a93e
Merge remote-tracking branch 'origin/master' into lexnv/light_client_…
lexnv May 31, 2023
77cfeb7
Update sp-crates from crates.io
lexnv May 31, 2023
20f314b
Replace Atomic with u64
lexnv Jun 14, 2023
d4095a3
Add LightClientBuilder
lexnv Jun 14, 2023
be80883
Adjust chainspec with provided bootnodes
lexnv Jun 14, 2023
a010ace
Add potential_relay_chains to light client builder
lexnv Jun 14, 2023
3141161
Merge remote-tracking branch 'origin/lexnv/light_client_support' into…
lexnv Jun 14, 2023
4780966
Move the light-client to the background task
lexnv Jun 15, 2023
86344e7
Adjust tracing logs
lexnv Jun 15, 2023
9d7e452
Merge remote-tracking branch 'origin/master' into lexnv/light_client_…
lexnv Jun 15, 2023
b8678df
Update book and example
lexnv Jun 15, 2023
e8abb7c
Apply cargo fmt
lexnv Jun 15, 2023
f3f7bc5
Remove dev_spec.json artifact
lexnv Jun 15, 2023
65dbf3d
Examples fix duplicate Cargo.toml
lexnv Jun 15, 2023
2069d53
Use tracing_subscriber crate
lexnv Jun 15, 2023
ddcd5a1
Fix clippy for different features
lexnv Jun 16, 2023
e4757ae
Add comment about bootNodes
lexnv Jun 16, 2023
2421005
Add comment about tracing-sub dependency
lexnv Jun 16, 2023
a0a716d
Run integration-tests with light-client
lexnv Jun 20, 2023
2b2cd40
Feature guard some incompatible tests
lexnv Jun 20, 2023
8247481
ci: Enable light-client tests under feature flag
lexnv Jun 20, 2023
7843be6
Merge remote-tracking branch 'origin/master' into lexnv/light_client_…
lexnv Jun 20, 2023
5f9050c
ci: Fix git step name
lexnv Jun 20, 2023
a646d32
Adjust flags for testing
lexnv Jun 20, 2023
f8a4ac0
Adjust warnings
lexnv Jun 20, 2023
51aec8d
Rename feature flag jsonrpsee-ws to jsonrpsee
lexnv Jun 21, 2023
88447d4
Fix cargo check
lexnv Jun 21, 2023
a167609
ci: Run tests on just 2 threads
lexnv Jun 21, 2023
24aaeee
Move light-client to subxt/src/client
lexnv Jun 21, 2023
90b455d
Adjust LightClientBuilder
lexnv Jun 21, 2023
97d983c
Use ws_url to construct light client for testing
lexnv Jun 21, 2023
27172e7
Refactor background
lexnv Jun 21, 2023
f4e893a
Address feedback
lexnv Jun 21, 2023
7421e25
Remove polkadot.spec and trim sub_id
lexnv Jun 21, 2023
97ce4b2
Wait for substrate to produce block before connecting light client
lexnv Jun 21, 2023
0be66ff
Adjust builder and tests
lexnv Jun 21, 2023
7ec3302
Merge remote-tracking branch 'origin/master' into lexnv/light_client_…
lexnv Jun 21, 2023
aa6ef52
Apply fmt
lexnv Jun 21, 2023
bbd5474
ci: Use release for light client testing
lexnv Jun 21, 2023
3b8b21a
Add single test for light-client
lexnv Jun 22, 2023
608e225
Wait for more blocks
lexnv Jun 22, 2023
56ba7dc
Use polkadot endpoint for testing
lexnv Jun 22, 2023
21ebb6d
Adjust cargo check
lexnv Jun 22, 2023
45d35c1
examples: Remove light client chain connection example
lexnv Jun 22, 2023
c5054ed
Adjust cargo.toml section for the old example
lexnv Jun 22, 2023
32b9212
Adjust background task to use usize for subscription Id
lexnv Jun 23, 2023
e84cc09
Build bootnodes with serde_json::Value directly
lexnv Jun 23, 2023
0bde689
Make channel between subxt user and subxt background unbounded
lexnv Jun 23, 2023
4d53500
Update subxt/src/client/lightclient/builder.rs
lexnv Jun 23, 2023
0adfed4
Switch to smoldot 0.6.0 from 0.5.0
lexnv Jun 23, 2023
3f01f92
Move testing to `full_client` and `light_client` higher modules
lexnv Jun 23, 2023
5b586a5
Remove subscriptionID type
lexnv Jun 26, 2023
836c173
Remove subxt/integration-testing feature flag
lexnv Jun 26, 2023
1497478
Adjust wait_for_blocks documentation
lexnv Jun 26, 2023
5e39e41
Adjust utils import for testing
lexnv Jun 26, 2023
88dfb6b
Remove into_iter from builder construction
lexnv Jun 26, 2023
e88b6ef
Merge remote-tracking branch 'origin/master' into lexnv/light_client_…
lexnv Jun 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,395 changes: 1,223 additions & 172 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ wabt = "0.10.0"
wasm-bindgen-test = "0.3.24"
which = "4.4.0"

# Light client support:
smoldot-light = { git = "https://github.com/smol-dot/smoldot.git", default-features = false }
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
tokio-stream = "0.1.14"
futures-util = "0.3.28"

# Substrate crates:
sp-core = { version = "20.0.0", default-features = false }
sp-core-hashing = "8.0.0"
Expand Down
178 changes: 178 additions & 0 deletions artifacts/dev_spec.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ homepage.workspace = true
description = "Subxt example usage"

[dev-dependencies]
subxt = { workspace = true }
subxt = { workspace = true, default-features = false, features = ["default", "experimental-light-client"]}
tokio = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
Expand Down
38 changes: 38 additions & 0 deletions examples/examples/tx_basic_light_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use sp_keyring::AccountKeyring;
use subxt::rpc::LightClient;
use subxt::{tx::PairSigner, OnlineClient, PolkadotConfig};

use std::sync::Arc;

// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a light client from the provided chain spec.
let light_client = LightClient::new(include_str!("../../artifacts/dev_spec.json"))?;
let api = OnlineClient::<PolkadotConfig>::from_rpc_client(Arc::new(light_client)).await?;

// Build a balance transfer extrinsic.
let dest = AccountKeyring::Bob.to_account_id().into();
let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000);

// Submit the balance transfer extrinsic from Alice, and wait for it to be successful
// and in a finalized block. We get back the extrinsic events if all is well.
let from = PairSigner::new(AccountKeyring::Alice.pair());
let events = api
.tx()
.sign_and_submit_then_watch_default(&balance_transfer_tx, &from)
.await?
.wait_for_finalized_success()
.await?;

// Find a Transfer event and print it.
let transfer_event = events.find_first::<polkadot::balances::events::Transfer>()?;
if let Some(event) = transfer_event {
println!("Balance transfer success: {event:?}");
}

Ok(())
}
16 changes: 16 additions & 0 deletions subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ jsonrpsee-web = ["jsonrpsee/async-wasm-client", "jsonrpsee/client-web-transport"
# latest features exposed by the metadata.
unstable-metadata = []

# Activate this to expose the Light Client functionality.
# Note that this feature is experimental and things may break or not work as expected.
experimental-light-client = [
lexnv marked this conversation as resolved.
Show resolved Hide resolved
"smoldot-light/std",
"tokio-stream",
"tokio/sync",
"tokio/rt",
"futures-util",
]

[dependencies]
codec = { package = "parity-scale-codec", workspace = true, features = ["derive"] }
scale-info = { workspace = true }
Expand Down Expand Up @@ -78,6 +88,12 @@ sp-runtime = { workspace = true, optional = true }
subxt-macro = { workspace = true }
subxt-metadata = { workspace = true }

# Light client support:
smoldot-light = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
tokio-stream = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }

[target.wasm32-unknown-unknown.dependencies]
getrandom = { workspace = true, features = ["js"] }

Expand Down
7 changes: 7 additions & 0 deletions subxt/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ mod dispatch_error;

use core::fmt::Debug;

#[cfg(feature = "experimental-light-client")]
pub use crate::rpc::LightClientError;

// Re-export dispatch error types:
pub use dispatch_error::{
ArithmeticError, DispatchError, ModuleError, RawModuleError, TokenError, TransactionalError,
Expand Down Expand Up @@ -63,6 +66,10 @@ pub enum Error {
/// The bytes representing an error that we were unable to decode.
#[error("An error occurred but it could not be decoded: {0:?}")]
Unknown(Vec<u8>),
/// Light client error.
#[cfg(feature = "experimental-light-client")]
#[error("An error occurred but it could not be decoded: {0:?}")]
LightClient(#[from] LightClientError),
/// Other error.
#[error("Other error: {0}")]
Other(String),
Expand Down
275 changes: 275 additions & 0 deletions subxt/src/rpc/lightclient/background.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
use futures::stream::StreamExt;
use futures_util::future::{self, Either};
use serde::Deserialize;
use serde_json::value::RawValue;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::{mpsc, oneshot};

///! The background task of the light client.

/// The number of notifications that are buffered, before the user
/// registers to [`LightClientInner::register_subscription`].
/// Only the first `BUFFER_NUM_NOTIFICATIONS` are buffered, while the
/// others are ignored.
///
/// In practice, the Light Client produces notifications at a lower rate
/// than the substrate full node.
const BUFFER_NUM_NOTIFICATIONS: usize = 16;

const LOG_TARGET: &str = "light-client-background";

/// Message protocol between the front-end client that submits the RPC requests
/// and the backend handler that produces responses from the chain.
///
/// The light client uses a single object [`smoldot_light::JsonRpcResponses`] to
/// handle all requests and subscriptions from a chain. A background task is spawned
/// to multiplex the rpc responses and to provide them back to their rightful submitters.
///
/// This presumes that the request ID for both method calls and subscriptions is unique.
#[derive(Debug)]
pub enum BackendMessage {
/// The RPC method request.
Request {
/// ID of the request, needed to match the result.
id: String,
/// Channel used to send back the result.
sender: oneshot::Sender<Box<RawValue>>,
},
/// The RPC subscription (pub/sub) request.
Subscription {
/// ID of the subscription, needed to match the result.
id: String,
/// Channel used to send back the notifcations.
sender: mpsc::Sender<Box<RawValue>>,
},
}

/// Background task data.
#[derive(Default)]
pub struct BackgroundTask {
/// Map the request ID of a RPC method to the frontend `Sender`.
requests: HashMap<String, oneshot::Sender<Box<RawValue>>>,
/// Map the subscription ID to the frontend `Sender`.
subscriptions: HashMap<String, mpsc::Sender<Box<RawValue>>>,
/// Notifications are cached for users that did not subscribe yet.
subscriptions_cache: HashMap<String, Vec<Box<RawValue>>>,
}

impl BackgroundTask {
/// Constructs a new [`BackgroundTask`].
pub fn new() -> BackgroundTask {
BackgroundTask::default()
}

/// Handle the registration messages received from the user.
async fn handle_register(&mut self, message: BackendMessage) {
match message {
BackendMessage::Request { id, sender } => {
self.requests.insert(id, sender);
}
BackendMessage::Subscription { id, sender } => {
// Drain the subscription cache, that holds messages that
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// were not propagated to this subscription yet (because the
// RPC server produced a notification before the user registered
// to receive notifications).
if let Some(cached_responses) = self.subscriptions_cache.remove(&id) {
tracing::debug!(target: LOG_TARGET, "Some messages were cached before susbcribing");
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

for response in cached_responses {
if sender.send(response).await.is_err() {
tracing::warn!(target: LOG_TARGET, "Cannot send notification to susbcription {:?}", id);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

self.subscriptions.insert(id, sender);
}
};
}

/// Parse the response received from the light client and sent it to the appropriate user.
async fn handle_rpc_response(&mut self, response: String) {
match RpcResponse::from_str(&response) {
Ok(RpcResponse::Method { id, result }) => {
// Send the response back.
if let Some(sender) = self.requests.remove(&id) {
if sender.send(result).is_err() {
tracing::warn!(target: LOG_TARGET, " Cannot send method response to id {:?}", id);
}
}
}
Ok(RpcResponse::Subscription { method, id, result }) => {
// Subxt calls into `author_submitAndWatchExtrinsic`, however the smoldot produces
// `{"event":"broadcasted","numPeers":1}` which is part of the RPC V2 API. Ignore
// this spurious event.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
if method == "transaction_unstable_watchEvent"
&& result.to_string().contains("broadcasted")
{
tracing::debug!(target: LOG_TARGET, "Ignoring notification {:?}", result);
return;
}

if let Some(sender) = self.subscriptions.get_mut(&id) {
// Send the current notification response.
if sender.send(result).await.is_err() {
tracing::warn!(target: LOG_TARGET, "Cannot send notification to susbcription {:?}", id);
}
return;
}

// Subscription ID not registered yet, cache the response.
// Note: Compiler complains about moving the `result` for `.entry.and_modify(..).or_insert(..)`,
// because it sees it as used on both closures and apparently cannot determine that only one
// closure can be executed.
match self.subscriptions_cache.entry(id) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let cached_responses: &mut Vec<_> = entry.get_mut();
// Do not cache notification if exceeded capacity.
if cached_responses.len() > BUFFER_NUM_NOTIFICATIONS {
return;
}

cached_responses.push(result);
}
std::collections::hash_map::Entry::Vacant(entry) => {
let mut vec = Vec::with_capacity(BUFFER_NUM_NOTIFICATIONS);
vec.push(result);
entry.insert(vec);
}
};
}
Err(err) => {
tracing::warn!(target: LOG_TARGET, "annot decode RPC response {:?}", err);
lexnv marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/// Perform the main background task:
/// - receiving user registration for RPC method / subscriptions
/// - providing the results from the light client back to users.
pub async fn start_task(
&mut self,
backend: mpsc::Receiver<BackendMessage>,
rpc_responses: smoldot_light::JsonRpcResponses,
lexnv marked this conversation as resolved.
Show resolved Hide resolved
) {
let backend_event = tokio_stream::wrappers::ReceiverStream::new(backend);
let rpc_responses_event =
futures_util::stream::unfold(rpc_responses, |mut rpc_responses| async {
rpc_responses
.next()
.await
.map(|result| (result, rpc_responses))
});

tokio::pin!(backend_event, rpc_responses_event);

let mut backend_event_fut = backend_event.next();
let mut rpc_responses_fut = rpc_responses_event.next();

loop {
match future::select(backend_event_fut, rpc_responses_fut).await {
// Message received from the backend: user registered.
Either::Left((backend_value, previous_fut)) => {
let Some(message) = backend_value else {
tracing::trace!(target: LOG_TARGET, "Frontend channel closed");
lexnv marked this conversation as resolved.
Show resolved Hide resolved
break;
};
tracing::trace!(target: LOG_TARGET, "Received register message {:?}", message);

self.handle_register(message).await;

backend_event_fut = backend_event.next();
rpc_responses_fut = previous_fut;
}
// Message received from rpc handler: lightclient response.
Either::Right((response, previous_fut)) => {
// Smoldot returns `None` if the chain has been removed (which subxt does not remove).
let Some(response) = response else {
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
break;
};
tracing::trace!(target: LOG_TARGET, "Received smoldot RPC result {:?}", response);

self.handle_rpc_response(response).await;

// Advance backend, save frontend.
backend_event_fut = previous_fut;
rpc_responses_fut = rpc_responses_event.next();
}
}
}

tracing::trace!(target: LOG_TARGET, "Task closed");
}
}

/// The RPC response from the light-client.
/// This can either be a response of a method, or a notification from a subscription.
#[derive(Debug, Clone)]
enum RpcResponse {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Method {
/// Response ID.
id: String,
/// The result of the method call.
result: Box<RawValue>,
},
Subscription {
/// RPC method that generated the notification.
method: String,
/// Subscription ID.
id: String,
/// Result.
result: Box<RawValue>,
},
}

impl std::str::FromStr for RpcResponse {
type Err = serde_json::Error;

fn from_str(response: &str) -> Result<Self, Self::Err> {
// Helper structures to deserialize from raw RPC strings.
#[derive(Deserialize, Debug)]
struct Response {
/// JSON-RPC version.
#[allow(unused)]
jsonrpc: String,
/// Result.
result: Box<RawValue>,
/// Request ID
id: String,
}
#[derive(Deserialize)]
struct NotificationParams {
/// The ID of the subscription.
subscription: String,
/// Result.
result: Box<RawValue>,
}
#[derive(Deserialize)]
struct ResponseNotification {
/// JSON-RPC version.
#[allow(unused)]
jsonrpc: String,
/// RPC method that generated the notification.
method: String,
/// Result.
params: NotificationParams,
}

// Check if the response can be mapped as an RPC method response.
let result: Result<Response, _> = serde_json::from_str(&response);
if let Ok(response) = result {
return Ok(RpcResponse::Method {
id: response.id,
result: response.result,
});
}

let notification: ResponseNotification = serde_json::from_str(&response)?;
Ok(RpcResponse::Subscription {
id: notification.params.subscription,
method: notification.method,
result: notification.params.result,
})
}
}
Loading