Skip to content

Commit

Permalink
rpc-v2/transaction: Generate Invalid events and add tests (#3784)
Browse files Browse the repository at this point in the history
This PR ensures that the transaction API generates an `Invalid` events
for transaction bytes that fail to decode.

The spec mentioned the `Invalid` event at the jsonrpc error section,
however this spec PR makes things clearer:
- paritytech/json-rpc-interface-spec#146

While at it have discovered an inconsistency with the generated events.
The drop event from the transaction pool was incorrectly mapped to the
`invalid` event.

Added tests for the API stabilize the API soon:
- paritytech/json-rpc-interface-spec#144


Closes: #3083


cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Apr 9, 2024
1 parent a26d25d commit 598e955
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 22 deletions.
22 changes: 19 additions & 3 deletions substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ use sp_runtime::{
};
use sp_version::RuntimeVersion;
use std::sync::Arc;
use substrate_test_runtime::{Block, Hash, Header};
use substrate_test_runtime::{Block, Hash, Header, H256};

pub struct ChainHeadMockClient<Client> {
client: Arc<Client>,
import_sinks: Mutex<Vec<TracingUnboundedSender<BlockImportNotification<Block>>>>,
finality_sinks: Mutex<Vec<TracingUnboundedSender<FinalityNotification<Block>>>>,
best_block: Mutex<Option<(H256, u64)>>,
}

impl<Client> ChainHeadMockClient<Client> {
Expand All @@ -48,6 +49,7 @@ impl<Client> ChainHeadMockClient<Client> {
client,
import_sinks: Default::default(),
finality_sinks: Default::default(),
best_block: Default::default(),
}
}

Expand Down Expand Up @@ -86,6 +88,11 @@ impl<Client> ChainHeadMockClient<Client> {
let _ = sink.unbounded_send(notification.clone());
}
}

/// Set the best block hash and number that is reported by the `info` method.
pub fn set_best_block(&self, hash: H256, number: u64) {
*self.best_block.lock() = Some((hash, number));
}
}

// ChainHead calls `import_notification_stream` and `finality_notification_stream` in order to
Expand Down Expand Up @@ -309,8 +316,10 @@ impl<Block: BlockT, Client: HeaderMetadata<Block> + Send + Sync> HeaderMetadata<
}
}

impl<Block: BlockT, Client: HeaderBackend<Block> + Send + Sync> HeaderBackend<Block>
impl<Block: BlockT<Hash = H256>, Client: HeaderBackend<Block> + Send + Sync> HeaderBackend<Block>
for ChainHeadMockClient<Client>
where
<<Block as sp_runtime::traits::Block>::Header as HeaderT>::Number: From<u64>,
{
fn header(
&self,
Expand All @@ -320,7 +329,14 @@ impl<Block: BlockT, Client: HeaderBackend<Block> + Send + Sync> HeaderBackend<Bl
}

fn info(&self) -> Info<Block> {
self.client.info()
let mut info = self.client.info();

if let Some((block_hash, block_num)) = self.best_block.lock().take() {
info.best_hash = block_hash;
info.best_number = block_num.into();
}

info
}

fn status(&self, hash: Block::Hash) -> sc_client_api::blockchain::Result<BlockStatus> {
Expand Down
1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ mod middleware_pool;
mod setup;

mod transaction_broadcast_tests;
mod transaction_tests;
39 changes: 37 additions & 2 deletions substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
use crate::{
chain_head::test_utils::ChainHeadMockClient,
transaction::{
api::TransactionBroadcastApiServer,
api::{TransactionApiServer, TransactionBroadcastApiServer},
tests::executor::{TaskExecutorBroadcast, TaskExecutorState},
TransactionBroadcast as RpcTransactionBroadcast,
Transaction as RpcTransaction, TransactionBroadcast as RpcTransactionBroadcast,
},
};
use futures::Future;
Expand Down Expand Up @@ -92,6 +92,29 @@ pub fn setup_api(
(api, pool, client_mock, tx_api, executor_recv, pool_state)
}

pub fn setup_api_tx() -> (
Arc<TestApi>,
Arc<MiddlewarePool>,
Arc<ChainHeadMockClient<Client<Backend>>>,
RpcModule<RpcTransaction<MiddlewarePool, ChainHeadMockClient<Client<Backend>>>>,
TaskExecutorState,
MiddlewarePoolRecv,
) {
let (pool, api, _) = maintained_pool(Default::default());
let (pool, pool_state) = MiddlewarePool::new(Arc::new(pool).clone());
let pool = Arc::new(pool);

let builder = TestClientBuilder::new();
let client = Arc::new(builder.build());
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
let (task_executor, executor_recv) = TaskExecutorBroadcast::new();

let tx_api =
RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)).into_rpc();

(api, pool, client_mock, tx_api, executor_recv, pool_state)
}

/// Get the next event from the provided middleware in at most 5 seconds.
macro_rules! get_next_event {
($middleware:expr) => {
Expand All @@ -102,6 +125,18 @@ macro_rules! get_next_event {
};
}

/// Get the next event from the provided middleware in at most 5 seconds.
macro_rules! get_next_event_sub {
($sub:expr) => {
tokio::time::timeout(std::time::Duration::from_secs(5), $sub.next())
.await
.unwrap()
.unwrap()
.unwrap()
.0
};
}

/// Collect the next number of transaction events from the provided middleware.
macro_rules! get_next_tx_events {
($middleware:expr, $num:expr) => {{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
hex_string,
transaction::{TransactionBlock, TransactionEvent},
};
use assert_matches::assert_matches;
use codec::Encode;
use jsonrpsee::rpc_params;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool};
use sp_core::H256;
use std::sync::Arc;
use substrate_test_runtime_client::AccountKeyring::*;
use substrate_test_runtime_transaction_pool::uxt;

// Test helpers.
use crate::transaction::tests::setup::{setup_api_tx, ALICE_NONCE};

#[tokio::test]
async fn tx_invalid_bytes() {
let (_api, _pool, _client_mock, tx_api, _exec_middleware, _pool_middleware) = setup_api_tx();

// This should not rely on the tx pool state.
let mut sub = tx_api
.subscribe_unbounded("transactionWatch_unstable_submitAndWatch", rpc_params![&"0xdeadbeef"])
.await
.unwrap();

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_matches!(event, TransactionEvent::Invalid(_));
}

#[tokio::test]
async fn tx_in_finalized() {
let (api, pool, client, tx_api, _exec_middleware, _pool_middleware) = setup_api_tx();
let block_1_header = api.push_block(1, vec![], true);
client.set_best_block(block_1_header.hash(), 1);

let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());

let mut sub = tx_api
.subscribe_unbounded("transactionWatch_unstable_submitAndWatch", rpc_params![&xt])
.await
.unwrap();

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Validated);

// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();

// Announce block 2 to the pool.
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.inner_pool.maintain(event).await;
let event = ChainEvent::Finalized { hash: block_2, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(
event,
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block_2,
index: 0
}))
);
let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Finalized(TransactionBlock { hash: block_2, index: 0 }));
}

#[tokio::test]
async fn tx_with_pruned_best_block() {
let (api, pool, client, tx_api, _exec_middleware, _pool_middleware) = setup_api_tx();
let block_1_header = api.push_block(1, vec![], true);
client.set_best_block(block_1_header.hash(), 1);

let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());

let mut sub = tx_api
.subscribe_unbounded("transactionWatch_unstable_submitAndWatch", rpc_params![&xt])
.await
.unwrap();

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Validated);

// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.inner_pool.maintain(event).await;

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(
event,
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block_2,
index: 0
}))
);

// Import block 2 again without the transaction included.
let block_2_header = api.push_block(2, vec![], true);
let block_2 = block_2_header.hash();
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.inner_pool.maintain(event).await;

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::BestChainBlockIncluded(None));

let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.inner_pool.maintain(event).await;

// The tx is validated again against the new block.
let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Validated);

let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(
event,
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block_2,
index: 0
}))
);

let event = ChainEvent::Finalized { hash: block_2, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Finalized(TransactionBlock { hash: block_2, index: 0 }));
}
33 changes: 16 additions & 17 deletions substrate/client/rpc-spec-v2/src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::{
};
use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink};
use sc_rpc::utils::pipe_from_stream;
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
use sc_rpc::utils::{pipe_from_stream, to_sub_message};
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
Expand All @@ -39,6 +39,8 @@ use sp_core::Bytes;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// An API for transaction RPC calls.
pub struct Transaction<Pool, Client> {
/// Substrate client.
Expand All @@ -63,13 +65,6 @@ impl<Pool, Client> Transaction<Pool, Client> {
/// some unique transactions via RPC and have them included in the pool.
const TX_SOURCE: TransactionSource = TransactionSource::External;

/// Extrinsic has an invalid format.
///
/// # Note
///
/// This is similar to the old `author` API error code.
const BAD_FORMAT: i32 = 1001;

#[async_trait]
impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
where
Expand All @@ -83,17 +78,21 @@ where
let pool = self.pool.clone();

let fut = async move {
// This is the only place where the RPC server can return an error for this
// subscription. Other defects must be signaled as events to the sink.
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
Ok(decoded_extrinsic) => decoded_extrinsic,
Err(e) => {
let err = ErrorObject::owned(
BAD_FORMAT,
format!("Extrinsic has invalid format: {}", e),
None::<()>,
log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);

let Ok(sink) = pending.accept().await else { return };

// The transaction is invalid.
let msg = to_sub_message(
&sink,
&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
error: "Extrinsic bytes cannot be decoded".into(),
}),
);
let _ = pending.reject(err).await;
let _ = sink.send(msg).await;
return
},
};
Expand Down Expand Up @@ -147,7 +146,7 @@ pub fn handle_event<Hash: Clone, BlockHash: Clone>(
TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic was rendered invalid by another extrinsic".into(),
})),
TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError {
TransactionStatus::Dropped => Some(TransactionEvent::Dropped(TransactionDropped {
error: "Extrinsic dropped from the pool due to exceeding limits".into(),
})),
TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
Expand Down

0 comments on commit 598e955

Please sign in to comment.