Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime] Bound XCMP queue #3952

Merged
merged 29 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0f6c0e7
[Runtime] Bound XCMP queue (#2302)
ggwpez Jan 29, 2024
8729ffc
Fixup
ggwpez Apr 2, 2024
186d245
Rename
ggwpez Apr 2, 2024
63bb333
Merge remote-tracking branch 'origin/master' into oty-reapply-2302
ggwpez Apr 2, 2024
3332090
Use version unchecked migration
ggwpez Apr 2, 2024
b267b2b
Add config to template
ggwpez Apr 2, 2024
49d70ba
Clippy import
ggwpez Apr 2, 2024
b76c840
Use WeakBoundedVec
ggwpez Apr 3, 2024
7433d3e
Bump page size to 103 KiB
ggwpez Apr 3, 2024
f088bfa
Add prdoc
ggwpez Apr 3, 2024
f0034ee
Not needed for glutton
ggwpez Apr 3, 2024
bae2564
Use panic in post_upgrade to see error
ggwpez Apr 4, 2024
2cdf1ad
Clippy
ggwpez Apr 4, 2024
6985828
Merge remote-tracking branch 'origin/master' into oty-reapply-2302
ggwpez Apr 10, 2024
bed5164
Delete file from bad merge
ggwpez Apr 10, 2024
901ec1d
Rename prdoc
ggwpez Apr 10, 2024
7fe2c67
prdoc
ggwpez Apr 10, 2024
c326770
Fixup
ggwpez Apr 10, 2024
9a82b09
Dont increase the wrong config 🤦
ggwpez Apr 10, 2024
723eb54
Increase HRMP page size
ggwpez Apr 10, 2024
2e8566b
Undo needless renaming
ggwpez Apr 10, 2024
757ee3c
Extend prdoc
ggwpez Apr 10, 2024
26a2c10
Handle errors
ggwpez Apr 11, 2024
9aad0fd
Rename error
ggwpez Apr 15, 2024
176b4fa
Merge remote-tracking branch 'origin/master' into oty-reapply-2302
ggwpez Apr 15, 2024
2999453
Merge remote-tracking branch 'origin/master' into oty-reapply-2302
ggwpez Apr 22, 2024
c73c3f9
Cleanup
ggwpez Apr 22, 2024
22d7bc0
Bump MQ page size to 103 on all SPs
ggwpez Apr 22, 2024
b179452
Merge remote-tracking branch 'origin/master' into oty-reapply-2302
ggwpez May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use codec::{Decode, Encode};
use cumulus_primitives_core::{
relay_chain, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, MessageSendError,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
XcmpMessageHandler, XcmpMessageSource,
};
Expand Down Expand Up @@ -1021,6 +1021,13 @@ impl<T: Config> FeeTracker for Pallet<T> {
}
}

impl<T: Config> ListChannelInfos for Pallet<T> {
fn outgoing_channels() -> Vec<ParaId> {
let Some(state) = RelevantMessagingState::<T>::get() else { return Vec::new() };
state.egress_channels.into_iter().map(|(id, _)| id).collect()
}
}

impl<T: Config> GetChannelInfo for Pallet<T> {
fn get_channel_status(id: ParaId) -> ChannelStatus {
// Note, that we are using `relevant_messaging_state` which may be from the previous
Expand Down
2 changes: 1 addition & 1 deletion cumulus/pallets/parachain-system/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl pallet_message_queue::Config for Test {
type Size = u32;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type HeapSize = sp_core::ConstU32<{ 103 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MaxWeight;
type IdleMaxServiceWeight = ();
Expand Down
120 changes: 88 additions & 32 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ pub mod weights;
pub use weights::WeightInfo;

use bounded_collections::BoundedBTreeSet;
use codec::{Decode, DecodeLimit, Encode};
use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
};

use frame_support::{
defensive, defensive_assert,
traits::{EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery},
traits::{Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery},
weights::{Weight, WeightMeter},
BoundedVec,
};
Expand All @@ -68,7 +68,7 @@ use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
use polkadot_runtime_parachains::FeeTracker;
use scale_info::TypeInfo;
use sp_core::MAX_POSSIBLE_ALLOCATION;
use sp_runtime::{FixedU128, RuntimeDebug, Saturating};
use sp_runtime::{FixedU128, RuntimeDebug, Saturating, WeakBoundedVec};
use sp_std::prelude::*;
use xcm::{latest::prelude::*, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
use xcm_executor::traits::ConvertOrigin;
Expand Down Expand Up @@ -105,7 +105,6 @@ pub mod pallet {

#[pallet::pallet]
#[pallet::storage_version(migration::STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);

#[pallet::config]
Expand All @@ -132,6 +131,25 @@ pub mod pallet {
#[pallet::constant]
type MaxInboundSuspended: Get<u32>;

/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
///
/// If this is reached, then no further messages can be sent to channels that do not yet
/// have a message queued. This should be set to the expected maximum of outbound channels
/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
/// since otherwise the congestion control protocol will not work as intended and messages
/// may be dropped. This value increases the PoV and should therefore not be picked too
/// high. Governance needs to pay attention to not open more channels than this value.
#[pallet::constant]
type MaxActiveOutboundChannels: Get<u32>;

/// The maximal page size for HRMP message pages.
///
/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
/// benchmarking. The limit for the size of a message is slightly below this, since some
/// overhead is incurred for encoding the format.
#[pallet::constant]
type MaxPageSize: Get<u32>;

/// The origin that is allowed to resume or suspend the XCMP queue.
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

Expand Down Expand Up @@ -276,6 +294,10 @@ pub mod pallet {
AlreadySuspended,
/// The execution is already resumed.
AlreadyResumed,
/// There are too many active outbound channels.
TooManyActiveOutboundChannels,
/// The message is too big.
TooBig,
}

/// The suspended inbound XCMP channels. All others are not suspended.
Expand All @@ -297,19 +319,28 @@ pub mod pallet {
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
_,
BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
ValueQuery,
>;

// The new way of doing it:
/// The messages outbound in a given XCMP channel.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
ParaId,
Twox64Concat,
u16,
WeakBoundedVec<u8, T::MaxPageSize>,
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
ValueQuery,
>;

/// Any signal messages waiting to be sent.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if MaxPageSize affects the weights a lot. From what I can read below, only one signal for every para id. The size can be constant, if it really make any difference.
I assume we could even use a Signal type here, but that would require a migration.

Copy link
Member Author

@ggwpez ggwpez May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea true. In the past, there was the possibility for multiple signals to be enqueued, but i removed that, since later signals completely overwrite any previous ones.
Now indeed it is wasting weight here... it could probably be changed to something very small, like 16 without an issue. But it could also require a migration, so with the Weak vector and large size we at least are at no risk.


/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
Expand All @@ -331,15 +362,14 @@ pub mod pallet {
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
Ok,
Suspended,
}

/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "std", derive(Debug))]
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
recipient: ParaId,
Expand Down Expand Up @@ -375,7 +405,7 @@ impl OutboundChannelDetails {
}
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
/// The number of pages which must be in the queue for the other side to be told to suspend
/// their sending.
Expand Down Expand Up @@ -478,7 +508,10 @@ impl<T: Config> Pallet<T> {
{
details
} else {
all_channels.push(OutboundChannelDetails::new(recipient));
all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
log::error!("Failed to activate HRMP channel: {:?}", e);
MessageSendError::TooManyChannels
})?;
all_channels
.last_mut()
.expect("can't be empty; a new element was just pushed; qed")
Expand All @@ -503,7 +536,9 @@ impl<T: Config> Pallet<T> {
if page.len() + encoded_fragment.len() > max_message_size {
return None
}
page.extend_from_slice(&encoded_fragment[..]);
for frag in encoded_fragment.iter() {
page.try_push(*frag).ok()?;
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
}
Some(page.len())
},
)
Expand All @@ -521,7 +556,10 @@ impl<T: Config> Pallet<T> {
new_page.extend_from_slice(&encoded_fragment[..]);
let last_page_size = new_page.len();
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
let bounded_page = BoundedVec::<u8, T::MaxPageSize>::try_from(new_page)
.map_err(|_| MessageSendError::TooBig)?;
let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
<OutboundXcmpStatus<T>>::put(all_channels);
(number_of_pages, last_page_size)
};
Expand All @@ -543,17 +581,24 @@ impl<T: Config> Pallet<T> {

/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) {
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
details.signals_exist = true;
} else {
s.push(OutboundChannelDetails::new(dest).with_signals());
s.try_push(OutboundChannelDetails::new(dest).with_signals())
.map_err(|_| Error::<T>::TooManyActiveOutboundChannels)?;
}
<SignalMessages<T>>::mutate(dest, |page| {
*page = (XcmpMessageFormat::Signals, signal).encode();
});

let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
(XcmpMessageFormat::Signals, signal).encode(),
)
.map_err(|_| Error::<T>::TooBig)?;
let page = WeakBoundedVec::force_from(page.into_inner(), None);

<SignalMessages<T>>::insert(dest, page);
<OutboundXcmpStatus<T>>::put(s);
Ok(())
}

fn suspend_channel(target: ParaId) {
Expand All @@ -563,7 +608,9 @@ impl<T: Config> Pallet<T> {
defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
details.state = OutboundState::Suspended;
} else {
s.push(OutboundChannelDetails::new(target).with_suspended_state());
if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
defensive!("Cannot pause channel; too many outbound channels");
}
}
});
}
Expand Down Expand Up @@ -664,18 +711,25 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let suspended = suspended_channels.contains(&para);

if suspended && fp.ready_pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
log::error!("defensive: Could not send resumption signal to inbound channel of sibling {:?}: {:?}; channel remains suspended.", para, err);
} else {
suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
} else if !suspended && fp.ready_pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);

if let Err(err) = suspended_channels.try_insert(para) {
if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
// It will retry if `drop_threshold` is not reached, but it could be too late.
log::error!(
"defensive: Could not send suspension signal; future messages may be dropped: {:?}", err
);
} else if let Err(err) = suspended_channels.try_insert(para) {
log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
} else {
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
}
}
Expand Down Expand Up @@ -842,7 +896,7 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
// since it's so unlikely then for now we just drop it.
defensive!("WARNING: oversize message in queue - dropping");
} else {
result.push((para_id, page));
result.push((para_id, page.into_inner()));
}

let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
Expand Down Expand Up @@ -890,7 +944,9 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));
let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
"Could not store HRMP channels config. Some HRMP channels may be broken.",
);

<OutboundXcmpStatus<T>>::put(statuses);

Expand Down
4 changes: 3 additions & 1 deletion cumulus/pallets/xcmp-queue/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

//! A module that is responsible for migration of storage.

pub mod v5;

use crate::{Config, OverweightIndex, Pallet, QueueConfig, QueueConfigData, DEFAULT_POV_SIZE};
use cumulus_primitives_core::XcmpMessageFormat;
use frame_support::{
Expand All @@ -25,7 +27,7 @@ use frame_support::{
};

/// The in-code storage version.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(4);
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(5);

pub const LOG: &str = "runtime::xcmp-queue-migration";

Expand Down
Loading
Loading