Fix/gateway message processing #1991

Open · wants to merge 12 commits into base: main
2 changes: 1 addition & 1 deletion libs/traits/src/liquidity_pools.rs
@@ -152,7 +152,7 @@ pub trait MessageProcessor {
 	/// Process a message.
 	fn process(msg: Self::Message) -> (DispatchResult, Weight);

-	/// Max weight that processing a message can take
+	/// Max weight that processing a message can take.
 	fn max_processing_weight(msg: &Self::Message) -> Weight;
 }
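
(For orientation: the queue pairs these two methods to budget block weight. It asks for a message's worst-case cost up front and only calls `process` when that cost still fits the remaining weight. A toy model of that contract, with made-up weights and `u64` standing in for frame_support's `Weight` type:)

```rust
// Toy model of the MessageProcessor weight contract: the caller asks for a
// worst-case estimate first and stops once the budget cannot cover it.
trait MessageProcessor {
    type Message;
    fn process(msg: Self::Message) -> (Result<(), &'static str>, u64);
    fn max_processing_weight(msg: &Self::Message) -> u64;
}

struct Echo;

impl MessageProcessor for Echo {
    type Message = String;

    fn process(msg: String) -> (Result<(), &'static str>, u64) {
        println!("processed: {msg}");
        (Ok(()), 5) // actual cost, at most the estimate below
    }

    fn max_processing_weight(_: &String) -> u64 {
        10 // worst case, checked by the caller before `process` runs
    }
}

fn main() {
    let mut budget = 12u64;
    for msg in ["a".to_string(), "b".to_string()] {
        if Echo::max_processing_weight(&msg) > budget {
            break; // not enough weight left for the worst case
        }
        let (_res, used) = Echo::process(msg);
        budget -= used; // only the actual cost is charged
    }
}
```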

37 changes: 14 additions & 23 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
@@ -202,9 +202,15 @@ pub mod pallet {
 	fn service_message_queue(max_weight: Weight) -> Weight {
 		let mut weight_used = Weight::zero();

-		let mut processed_entries = Vec::new();
+		let mut nonces = MessageQueue::<T>::iter_keys().collect::<Vec<_>>();
+		nonces.sort();
Contributor Author:

Since we are collecting the keys here, it also made sense to me to sort them, just in case. Please let me know if there are any objections to this.
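
(For context: hashed FRAME storage maps iterate in key-hash order, not insertion order, so the sort is what restores first-in-first-out processing by nonce. A toy illustration with made-up nonces:)

```rust
// Illustration only: `iter_keys()` on a hashed storage map may yield the
// nonces in an arbitrary order such as [7, 2, 9, 4]; sorting restores
// FIFO (lowest-nonce-first) processing.
fn main() {
    let mut nonces: Vec<u64> = vec![7, 2, 9, 4]; // what iter_keys() might yield
    nonces.sort();
    assert_eq!(nonces, vec![2, 4, 7, 9]); // oldest message processed first
}
```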

Contributor:

Although this new solution is simpler, I think there is a problem here:

MessageQueue::<T>::iter_keys().collect::<Vec<_>>();

It can collect many keys, making the block impossible to build.

I think we need a more complex structure here that allows us to store the keys already ordered.

Contributor Author:

> It can collect many keys, making the block impossible to build.

Can you elaborate on this please?

Contributor Author:

How about limiting the number of keys that we collect via:

let mut nonces = MessageQueue::<T>::iter_keys().take(MAX_MESSAGES_PER_BLOCK).collect::<Vec<_>>();

Contributor:

> Can you elaborate on this please?

When you collect, the iterator performs one storage read per item, and the number of items can exceed the block's weight capacity.

The take(MAX_MESSAGES_PER_BLOCK) approach still does not work, because a message that ideally should be processed first could be left behind in the queue.
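
(A toy illustration of that ordering hazard, with made-up nonces and a hypothetical MAX_MESSAGES_PER_BLOCK value:)

```rust
// Illustration only: suppose the map iterates in hash order [9, 4, 7, 2]
// and the (hypothetical) per-block limit is 2.
fn main() {
    const MAX_MESSAGES_PER_BLOCK: usize = 2; // assumed limit, not a real constant
    let iter_keys_order: Vec<u64> = vec![9, 4, 7, 2]; // hash order, not nonce order
    let taken: Vec<u64> = iter_keys_order
        .into_iter()
        .take(MAX_MESSAGES_PER_BLOCK)
        .collect();
    assert_eq!(taken, vec![9, 4]); // nonce 2, the oldest message, is skipped
}
```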

Contributor:

Not sure if there is an ordered structure available in Substrate for this. If not, we would have to create some complex/annoying structure to organize the way the messages are stored.

But I'm not able to see a super simple way, TBH. We can leave that fix for another PR to unblock this one if we see it's not easy.

Contributor Author:

Maybe there's a simpler solution that involves using the latest message nonce. I'll try something on a different branch.

Contributor Author:

Something similar to #1992.
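
(A sketch of what a "latest nonce" approach could look like, modeled with plain Rust collections; the cursor fields and the processing loop below are assumptions for illustration, not necessarily what #1992 implements:)

```rust
use std::collections::HashMap;

// Hypothetical model: keep a cursor of the last processed nonce and walk
// forward one nonce at a time, so the runtime never has to collect or sort
// the whole key set. All names here are illustrative.
struct Queue {
    messages: HashMap<u64, String>, // nonce -> message (unordered map)
    last_processed_nonce: u64,      // persisted cursor
    latest_nonce: u64,              // highest nonce ever enqueued
}

impl Queue {
    fn service(&mut self, mut budget: u32) {
        while self.last_processed_nonce < self.latest_nonce && budget > 0 {
            let nonce = self.last_processed_nonce + 1;
            // A nonce may be absent if its message was already removed.
            if let Some(msg) = self.messages.remove(&nonce) {
                println!("processing nonce {nonce}: {msg}");
                budget -= 1;
            }
            self.last_processed_nonce = nonce;
        }
    }
}

fn main() {
    let mut queue = Queue {
        messages: HashMap::from([(1, "a".into()), (2, "b".into()), (3, "c".into())]),
        last_processed_nonce: 0,
        latest_nonce: 3,
    };
    queue.service(2); // processes nonces 1 and 2, in order, within budget
    queue.service(2); // resumes at nonce 3 on the next block
}
```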


-		for (nonce, message) in MessageQueue::<T>::iter() {
+		for nonce in nonces {
+			let message =
+				MessageQueue::<T>::get(nonce).expect("valid nonce ensured by `iter_keys`");
+
+			weight_used.saturating_accrue(T::DbWeight::get().reads(1));
+
 			let remaining_weight = max_weight.saturating_sub(weight_used);
 			let next_weight = T::MessageProcessor::max_processing_weight(&message);
@@ -214,36 +220,21 @@
 			}

 			let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
-				(Ok(()), weight) => {
-					// Extra weight breakdown:
-					//
-					// 1 read for the message
-					// 1 write for the message removal
-					weight.saturating_add(T::DbWeight::get().reads_writes(1, 1))
-				}
+				(Ok(()), weight) => weight,
 				(Err(e), weight) => {
 					FailedMessageQueue::<T>::insert(nonce, (message, e));

-					// Extra weight breakdown:
-					//
-					// 1 read for the message
-					// 1 write for the failed message
-					// 1 write for the message removal
-					weight.saturating_add(T::DbWeight::get().reads_writes(1, 2))
+					weight.saturating_add(T::DbWeight::get().writes(1))
 				}
 			};

-			processed_entries.push(nonce);
-
-			weight_used = weight_used.saturating_add(weight);
+			weight_used.saturating_accrue(weight);

 			if weight_used.all_gte(max_weight) {
 				break;
 			}
-		}

-		for entry in processed_entries {
-			MessageQueue::<T>::remove(entry);
+			MessageQueue::<T>::remove(nonce);
+
+			// 1 write for removing the message
+			weight_used.saturating_accrue(T::DbWeight::get().writes(1));
 		}

 		weight_used
61 changes: 26 additions & 35 deletions pallets/liquidity-pools-gateway/src/lib.rs
@@ -35,14 +35,17 @@ use cfg_traits::liquidity_pools::{
 	OutboundMessageHandler, RouterProvider,
 };
 use cfg_types::domain_address::{Domain, DomainAddress};
-use frame_support::{dispatch::DispatchResult, pallet_prelude::*};
+use frame_support::{
+	dispatch::DispatchResult,
+	pallet_prelude::*,
+	storage::{with_transaction, TransactionOutcome},
+};
 use frame_system::pallet_prelude::{ensure_signed, OriginFor};
 use message::GatewayMessage;
 use orml_traits::GetByKey;
 pub use pallet::*;
 use parity_scale_codec::FullCodec;
 use sp_arithmetic::traits::{BaseArithmetic, EnsureAddAssign, One};
-use sp_runtime::SaturatedConversion;
 use sp_std::convert::TryInto;

use crate::{
@@ -469,8 +472,8 @@ pub mod pallet {
 				router_ids.iter().any(|x| x == &router_id),
 				Error::<T>::UnknownRouter
 			);
-			// Message recovery shouldn't be supported for setups that have less than 1
-			// router since no proofs are required in that case.
+			// Message recovery shouldn't be supported for setups that have less than 2
+			// routers since no proofs are required in that case.
 			ensure!(router_ids.len() > 1, Error::<T>::NotEnoughRoutersForDomain);

 			let session_id = SessionIdStore::<T>::get();
@@ -621,45 +624,33 @@ pub mod pallet {
 		type Message = GatewayMessage<T::Message, T::RouterId>;

 		fn process(msg: Self::Message) -> (DispatchResult, Weight) {
-			match msg {
-				GatewayMessage::Inbound {
-					domain_address,
-					message,
-					router_id,
-				} => {
-					let mut counter = 0;
-
-					let res = Self::process_inbound_message(
-						domain_address,
-						message,
-						router_id,
-						&mut counter,
-					);
-
-					let weight = match counter {
-						0 => LP_DEFENSIVE_WEIGHT / 10,
-						n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
-					};
-
-					(res, weight)
-				}
-				GatewayMessage::Outbound { message, router_id } => {
-					let res = T::MessageSender::send(router_id, T::Sender::get(), message);
-
-					(res, LP_DEFENSIVE_WEIGHT)
-				}
-			}
+			// The #[transactional] macro only works for functions that return a
+			// `DispatchResult`, therefore we need to manually add this here.
+			let res = with_transaction(|| {
+				let res = match msg {
+					GatewayMessage::Inbound {
+						domain_address,
+						message,
+						router_id,
+					} => Self::process_inbound_message(domain_address, message, router_id),
+					GatewayMessage::Outbound { message, router_id } => {
+						T::MessageSender::send(router_id, T::Sender::get(), message)
+					}
+				};
+
+				if res.is_ok() {
+					TransactionOutcome::Commit(res)
+				} else {
+					TransactionOutcome::Rollback(res)
+				}
+			});
+
+			(res, LP_DEFENSIVE_WEIGHT)
 		}

-		/// Returns the max processing weight for a message, based on its
-		/// direction.
-		fn max_processing_weight(msg: &Self::Message) -> Weight {
-			match msg {
-				GatewayMessage::Inbound { message, .. } => {
-					LP_DEFENSIVE_WEIGHT.saturating_mul(message.submessages().len().saturated_into())
-				}
-				GatewayMessage::Outbound { .. } => LP_DEFENSIVE_WEIGHT,
-			}
-		}
+		/// Returns the maximum weight for processing one message.
+		fn max_processing_weight(_: &Self::Message) -> Weight {
+			LP_DEFENSIVE_WEIGHT
+		}
 	}
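
(The with_transaction pattern used in `process` above is easy to get subtly wrong, so here is a minimal sketch of it in isolation; the helper name and the fallible-closure parameter are placeholders, not the gateway's real API. Note that `with_transaction` must run inside storage externalities, e.g. within an extrinsic or test externalities:)

```rust
use frame_support::storage::{with_transaction, TransactionOutcome};
use sp_runtime::DispatchResult;

// Sketch: run a fallible storage-mutating closure inside a storage
// transaction, committing its writes on success and discarding all of them
// on failure (the behavior #[transactional] provides for dispatchables).
fn transactional_call(mutate_storage: impl FnOnce() -> DispatchResult) -> DispatchResult {
    with_transaction(|| {
        let res = mutate_storage();

        if res.is_ok() {
            TransactionOutcome::Commit(res)
        } else {
            TransactionOutcome::Rollback(res)
        }
    })
}
```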

63 changes: 30 additions & 33 deletions pallets/liquidity-pools-gateway/src/message_processing.rs
@@ -1,6 +1,5 @@
 use cfg_traits::liquidity_pools::{
-	InboundMessageHandler, LpMessageBatch, LpMessageHash, LpMessageProof, MessageHash,
-	MessageQueue, RouterProvider,
+	InboundMessageHandler, LpMessageHash, LpMessageProof, MessageHash, MessageQueue, RouterProvider,
 };
 use cfg_types::domain_address::{Domain, DomainAddress};
 use frame_support::{
@@ -333,7 +332,11 @@ impl<T: Config> Pallet<T> {
 				// we can return.
 				None => return Ok(()),
 				Some(stored_inbound_entry) => match stored_inbound_entry {
-					InboundEntry::Message(message_entry) => message = Some(message_entry.message),
+					InboundEntry::Message(message_entry)
+						if message_entry.session_id == session_id =>
Contributor:

Question: If session_id is different, should we remove the entry?

Contributor:

> Question: If session_id is different, should we remove the entry?

We should not, because then it would be impossible to replay the message and funds would be stuck on the EVM side. Entries kept for an old session id can be unstuck via execute_message_recovery.

Contributor Author:

execute_message_recovery only increases the proof count for a specific router ID, so we would still hit this logic and be unable to execute a message from an older session. Maybe we should extend execute_message_recovery to either:

  • set the session of a message entry to the current one; or
  • increase the proof count (the current behavior); see the sketch below.
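
(A minimal sketch of that proposal; the enum, struct, and field names below are illustrative assumptions, not the pallet's actual types:)

```rust
// Hypothetical shape for a two-mode recovery extension.
enum RecoveryAction {
    // Re-home a message entry that is stuck under an old session id.
    AdoptCurrentSession,
    // Current behavior: bump the proof count for one router.
    IncreaseProofCount,
}

struct InboundEntryState {
    session_id: u64,
    proof_count: u32,
}

fn execute_message_recovery(
    entry: &mut InboundEntryState,
    current_session: u64,
    action: RecoveryAction,
) {
    match action {
        RecoveryAction::AdoptCurrentSession => entry.session_id = current_session,
        RecoveryAction::IncreaseProofCount => entry.proof_count += 1,
    }
}
```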

+					{
+						message = Some(message_entry.message)
+					}
 					InboundEntry::Proof(proof_entry)
 						if proof_entry.has_valid_vote_for_session(session_id) =>
 					{
@@ -349,10 +352,10 @@
 			}

 			if let Some(msg) = message {
+				Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?;
+
 				T::InboundMessageHandler::handle(domain_address.clone(), msg)?;

-				Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?;
-
 				Self::deposit_event(Event::<T>::InboundMessageExecuted {
 					domain_address,
 					message_hash,
@@ -401,42 +404,36 @@ impl<T: Config> Pallet<T> {
 		domain_address: DomainAddress,
 		message: T::Message,
 		router_id: T::RouterId,
-		counter: &mut u64,
 	) -> DispatchResult {
 		let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?;
 		let session_id = SessionIdStore::<T>::get();
 		let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;
+		let message_hash = message.get_message_hash();
+		let inbound_entry: InboundEntry<T> = InboundEntry::create(
+			message.clone(),
+			session_id,
+			domain_address.clone(),
+			expected_proof_count,
+		);

-		for submessage in message.submessages() {
-			counter.ensure_add_assign(1)?;
-
-			let message_hash = submessage.get_message_hash();
-
-			let inbound_entry: InboundEntry<T> = InboundEntry::create(
-				submessage.clone(),
-				session_id,
-				domain_address.clone(),
-				expected_proof_count,
-			);
-
-			inbound_entry.validate(&router_ids, &router_id.clone())?;
-			Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?;
-
-			Self::deposit_processing_event(
-				domain_address.clone(),
-				submessage,
-				message_hash,
-				router_id.clone(),
-			);
-
-			Self::execute_if_requirements_are_met(
-				message_hash,
-				&router_ids,
-				session_id,
-				expected_proof_count,
-				domain_address.clone(),
-			)?;
-		}
+		inbound_entry.validate(&router_ids, &router_id.clone())?;
+
+		Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?;
+
+		Self::deposit_processing_event(
+			domain_address.clone(),
+			message,
+			message_hash,
+			router_id.clone(),
+		);
+
+		Self::execute_if_requirements_are_met(
+			message_hash,
+			&router_ids,
+			session_id,
+			expected_proof_count,
+			domain_address.clone(),
+		)?;

 		Ok(())
 	}