feat(congestion): reject new transactions on RPC level #11419

Merged: 5 commits, Jun 4, 2024
16 changes: 16 additions & 0 deletions chain/chain/src/runtime/mod.rs
@@ -643,9 +643,25 @@ impl RuntimeAdapter for NightshadeRuntime {
verify_signature: bool,
epoch_id: &EpochId,
current_protocol_version: ProtocolVersion,
receiver_congestion_info: Option<ExtendedCongestionInfo>,
Contributor:
How about the congestion info of the sender? If the sender is congested, their tx limit will be lowered, and some transactions may also be rejected because of that.

Contributor (Author):
I don't think we reject them. We just lower the gas limit. When we reach the gas limit, we keep subsequent transactions in the pool rather than rejecting (dropping) them. Eventually, new transactions are dropped, but that's because the pool is full, not due to something we added in NEP-539.

We could still try to be smart about it, like rejecting transactions on the RPC if we think the pool is filling up with transactions that all use the same sender shard. But it seems a bit tricky to get the heuristics right. If we are too aggressive, we risk frustrating users by rejecting their transactions for no good reason. Not to mention that "rogue RPCs" could abuse the system by filling the transaction pool beyond what our RPCs do, which would completely starve the honest RPCs under congestion.

Contributor:

Sounds good, we can observe what happens in the wild and make adjustments if we find it necessary.
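
A minimal sketch of the gas-limit lowering described in this thread, assuming NEP-539's linear interpolation between a configured maximum and minimum transaction gas per chunk; the names `tx_gas_limit`, `min_tx_gas`, and `max_tx_gas` are illustrative, not the exact nearcore API:

```rust
/// Illustrative only: a congested sender shard lowers the per-chunk gas
/// budget for converting new transactions rather than rejecting them.
/// At congestion 0.0 the full budget is available; at 1.0 only the minimum.
fn tx_gas_limit(congestion_level: f64, min_tx_gas: u64, max_tx_gas: u64) -> u64 {
    let level = congestion_level.clamp(0.0, 1.0);
    let range = (max_tx_gas - min_tx_gas) as f64;
    max_tx_gas - (range * level) as u64
}

fn main() {
    // With an assumed budget between 20 and 500 (think TGas), a shard at
    // 50% congestion still converts transactions, just fewer per chunk.
    assert_eq!(tx_gas_limit(0.5, 20, 500), 260);
    // Transactions beyond the budget stay in the pool; they are only
    // dropped once the pool itself overflows, as the author notes above.
}
```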

) -> Result<Option<InvalidTxError>, Error> {
let runtime_config = self.runtime_config_store.get_config(current_protocol_version);

if let Some(congestion_info) = receiver_congestion_info {
let congestion_control = CongestionControl::new(
runtime_config.congestion_control_config,
congestion_info.congestion_info,
congestion_info.missed_chunks_count,
);
if !congestion_control.shard_accepts_transactions() {
Contributor:
You may consider making this more flexible by turning the threshold into a configurable value rather than using the runtime parameter of 0.25. The drawback is that the two would need to be kept somewhat in sync. I also dislike that some RPC node may artificially lower it in order to give its users priority, but this is something they can do anyway.

Contributor (Author):
Good point. We can consider adding the flexibility.

But just to be clear, there is not much to be gained by setting a higher threshold. Unless the congestion level drops between now and when the chunk producer considers the transaction, it will be rejected and dropped anyway. Most of the time, the RPC will just end up waiting for a timeout before it inevitably fails, annoying the user and wasting resources on the RPC in the process.
At least that is my theory; it could be different in practice. Maybe the congestion level keeps oscillating around the threshold, which could give more aggressive forwarding on the RPC node an advantage over retrying on the wallet/client side.

Contributor:
For some users the UX doesn't matter as much as just maximizing the chances of getting their transaction in. For those users it may be useful. It may also be useful for us if we discover that we did the math wrong and need to change it quickly :)
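
To make the discussed variant concrete, here is a hedged sketch of a configurable RPC-side threshold; the `rpc_override` parameter is hypothetical and not part of this PR, which uses the runtime parameter (0.25) directly via `shard_accepts_transactions`:

```rust
/// Hypothetical: an RPC node decides whether to forward a transaction to a
/// receiver shard, optionally overriding the protocol-level threshold.
fn rpc_accepts_transactions(
    congestion_level: f64,
    runtime_threshold: f64,    // 0.25 in the current runtime parameters
    rpc_override: Option<f64>, // node-local setting, if one were added
) -> bool {
    let threshold = rpc_override.unwrap_or(runtime_threshold);
    congestion_level < threshold
}

fn main() {
    // With the default parameter, a shard at 30% congestion is rejected.
    assert!(!rpc_accepts_transactions(0.3, 0.25, None));
    // A more permissive node-local override would still forward the tx,
    // trading UX (a likely timeout) for a chance of inclusion.
    assert!(rpc_accepts_transactions(0.3, 0.25, Some(0.5)));
}
```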

let receiver_shard =
self.account_id_to_shard_uid(transaction.transaction.receiver_id(), epoch_id)?;
return Ok(Some(InvalidTxError::ShardCongested {
shard_id: receiver_shard.shard_id,
}));
}
}

if let Some(state_root) = state_root {
let shard_uid =
self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?;
3 changes: 2 additions & 1 deletion chain/chain/src/test_utils/kv_runtime.rs
@@ -18,7 +18,7 @@ use near_primitives::account::{AccessKey, Account};
use near_primitives::apply::ApplyChunkReason;
use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::congestion_info::{CongestionInfo, ExtendedCongestionInfo};
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::epoch_manager::EpochConfig;
@@ -1089,6 +1089,7 @@ impl RuntimeAdapter for KeyValueRuntime {
_verify_signature: bool,
_epoch_id: &EpochId,
_current_protocol_version: ProtocolVersion,
_receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error> {
Ok(None)
}
1 change: 1 addition & 0 deletions chain/chain/src/types.rs
@@ -417,6 +417,7 @@ pub trait RuntimeAdapter: Send + Sync {
verify_signature: bool,
epoch_id: &EpochId,
current_protocol_version: ProtocolVersion,
receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error>;

/// Returns an ordered list of valid transactions from the pool up the given limits.
28 changes: 24 additions & 4 deletions chain/client/src/client.rs
@@ -2270,7 +2270,8 @@
) -> Result<ProcessTxResponse, Error> {
let head = self.chain.head()?;
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
let cur_block_header = self.chain.head_header()?;
let cur_block = self.chain.get_head_block()?;
let cur_block_header = cur_block.header();
let transaction_validity_period = self.chain.transaction_validity_period;
// here it is fine to use `cur_block_header` as it is a best effort estimate. If the transaction
// were to be included, the block that the chunk points to will have height >= height of
@@ -2285,12 +2286,23 @@
}
let gas_price = cur_block_header.next_gas_price();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?;

let receiver_shard =
self.epoch_manager.account_id_to_shard_id(tx.transaction.receiver_id(), &epoch_id)?;
let receiver_congestion_info =
cur_block.shards_congestion_info().get(&receiver_shard).copied();
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;

if let Some(err) = self
.runtime_adapter
.validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version)
.validate_tx(
gas_price,
None,
tx,
true,
&epoch_id,
protocol_version,
receiver_congestion_info,
)
.expect("no storage errors")
{
debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation");
@@ -2322,7 +2334,15 @@
};
if let Some(err) = self
.runtime_adapter
.validate_tx(gas_price, Some(state_root), tx, false, &epoch_id, protocol_version)
.validate_tx(
gas_price,
Some(state_root),
tx,
false,
&epoch_id,
protocol_version,
receiver_congestion_info,
)
.expect("no storage errors")
{
debug!(target: "client", ?err, "Invalid tx");
10 changes: 9 additions & 1 deletion chain/jsonrpc/res/rpc_errors_schema.json
@@ -570,7 +570,8 @@
"ActionsValidation",
"TransactionSizeExceeded",
"InvalidTransactionVersion",
"StorageError"
"StorageError",
"ShardCongested"
],
"props": {}
},
@@ -773,6 +774,13 @@
"subtypes": [],
"props": {}
},
"ShardCongested": {
"name": "ShardCongested",
"subtypes": [],
"props": {
"shard_id": ""
}
},
"SignerDoesNotExist": {
"name": "SignerDoesNotExist",
"subtypes": [],
8 changes: 8 additions & 0 deletions core/primitives/src/errors.rs
@@ -213,6 +213,11 @@ pub enum InvalidTxError {
InvalidTransactionVersion,
// Error occurred during storage access
StorageError(StorageError),
/// The receiver shard of the transaction is too congested to accept new
/// transactions at the moment.
ShardCongested {
shard_id: u32,
},
}

impl From<StorageError> for InvalidTxError {
@@ -620,6 +625,9 @@ impl Display for InvalidTxError {
InvalidTxError::StorageError(error) => {
write!(f, "Storage error: {}", error)
}
InvalidTxError::ShardCongested { shard_id } => {
write!(f, "Shard {shard_id} is currently congested and rejects new transactions.")
}
}
}
}
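
Since `ShardCongested` signals a transient condition rather than a permanently invalid transaction, a client-side sketch like the following may be useful; the retry policy shown is an assumption about sensible client behavior, not something this PR prescribes:

```rust
use near_primitives::errors::InvalidTxError;

/// Assumed client-side policy: congestion is transient, so resubmitting
/// the same transaction later can succeed, unlike e.g. a bad signature.
fn should_retry_later(err: &InvalidTxError) -> bool {
    matches!(err, InvalidTxError::ShardCongested { .. })
}

fn main() {
    let err = InvalidTxError::ShardCongested { shard_id: 3 };
    assert!(should_retry_later(&err));
}
```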
@@ -1,3 +1,4 @@
use assert_matches::assert_matches;
use near_chain_configs::Genesis;
use near_client::test_utils::TestEnv;
use near_client::ProcessTxResponse;
@@ -6,7 +7,9 @@ use near_o11y::testonly::init_test_logger;
use near_parameters::{RuntimeConfig, RuntimeConfigStore};
use near_primitives::account::id::AccountId;
use near_primitives::congestion_info::{CongestionControl, CongestionInfo};
use near_primitives::errors::{ActionErrorKind, FunctionCallError, TxExecutionError};
use near_primitives::errors::{
ActionErrorKind, FunctionCallError, InvalidTxError, TxExecutionError,
};
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::sharding::{ShardChunk, ShardChunkHeader};
@@ -292,7 +295,11 @@ fn submit_n_100tgas_fns(env: &mut TestEnv, n: u32, nonce: &mut u64, signer: &InM
let fn_tx = new_fn_call_100tgas(nonce, signer, *block.hash());
// this only adds the tx to the pool, no chain progress is made
let response = env.clients[0].process_tx(fn_tx, false, false);
assert_eq!(response, ProcessTxResponse::ValidTx);
match response {
ProcessTxResponse::ValidTx
| ProcessTxResponse::InvalidTx(InvalidTxError::ShardCongested { .. }) => (),
other => panic!("unexpected result from submitting tx: {other:?}"),
}
}
}

@@ -565,3 +572,53 @@ fn measure_tx_limit(
local_tx_included_with_congestion,
)
}

/// Test that RPC clients stop accepting transactions when the receiver is
/// congested.
#[test]
fn test_rpc_client_rejection() {
let sender_id: AccountId = "test0".parse().unwrap();
let mut env = setup_runtime(sender_id.clone(), PROTOCOL_VERSION);

// prepare a contract to call
setup_contract(&mut env);

let signer = InMemorySigner::from_seed(sender_id.clone(), KeyType::ED25519, sender_id.as_str());
let mut nonce = 10;

// Check we can send transactions at the start.
let fn_tx = new_fn_call_100tgas(
&mut nonce,
&signer,
*env.clients[0].chain.head_header().unwrap().hash(),
);
let response = env.clients[0].process_tx(fn_tx, false, false);
assert_eq!(response, ProcessTxResponse::ValidTx);

// Congest the network with a burst of 100 PGas.
submit_n_100tgas_fns(&mut env, 1_000, &mut nonce, &signer);

// Allow transactions to enter the chain and enough receipts to arrive at
// the receiver shard for it to become congested.
let tip = env.clients[0].chain.head().unwrap();
for i in 1..10 {
env.produce_block(0, tip.height + i);
}

// Check that congestion control rejects new transactions.
let fn_tx = new_fn_call_100tgas(
&mut nonce,
&signer,
*env.clients[0].chain.head_header().unwrap().hash(),
);
let response = env.clients[0].process_tx(fn_tx, false, false);

if ProtocolFeature::CongestionControl.enabled(PROTOCOL_VERSION) {
assert_matches!(
response,
ProcessTxResponse::InvalidTx(InvalidTxError::ShardCongested { .. })
);
} else {
assert_eq!(response, ProcessTxResponse::ValidTx);
}
}
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/resharding.rs
@@ -239,7 +239,7 @@ impl TestReshardingEnv {
.validator_seats(num_validators)
.real_stores()
.epoch_managers_with_test_overrides(epoch_config_test_overrides)
.nightshade_runtimes(&genesis)
.nightshade_runtimes_congestion_control_disabled(&genesis)
.track_all_shards()
.build();
assert_eq!(env.validators.len(), num_validators);
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/sync_state_nodes.rs
@@ -570,7 +570,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() {
.clients_count(2)
.use_state_snapshots()
.real_stores()
.nightshade_runtimes(&genesis)
.nightshade_runtimes_congestion_control_disabled(&genesis)
.build();

let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();