Fix core sharing and make use of scheduling_lookahead #4724

Merged · 37 commits · Jun 19, 2024

Note: the diff below shows changes from the first 11 commits only.

Commits
b64eb22 Zombienet test for two parachains sharing a core (tdimitrov, May 30, 2024)
4deb507 fixes (alindima, Jun 3, 2024)
c94c398 further simplify the fix (alindima, Jun 3, 2024)
732a74f prospective-parachains: hack fetch_upcoming_paras (tdimitrov, Jun 5, 2024)
49193f3 Use claim queue in the validator side of collator protocol (tdimitrov, Jun 6, 2024)
00eed19 Use claim queue in the collator side of collator protocol (tdimitrov, Jun 6, 2024)
4ddb874 Use claim queue in `fetch_upcoming_paras` from `prospective-parachains` (tdimitrov, Jun 6, 2024)
8190687 add prospective parachains tests (alindima, Jun 11, 2024)
a2f0a25 fix collator side and collator side tests (alindima, Jun 11, 2024)
b979563 Merge remote-tracking branch 'origin/master' into tsv-ct-core-sharing (alindima, Jun 11, 2024)
c996a90 clippy (alindima, Jun 11, 2024)
c874cf1 it works!! (alindima, Jun 12, 2024)
ec70d6b Merge remote-tracking branch 'origin/master' into tsv-ct-core-sharing (alindima, Jun 12, 2024)
b88f4e8 properly fix backing (alindima, Jun 12, 2024)
8afe82f fix unit tests (alindima, Jun 12, 2024)
7050c2e update comments (alindima, Jun 13, 2024)
e23187b add proper zombienet test (alindima, Jun 13, 2024)
a3560a3 review comments (alindima, Jun 17, 2024)
2fe2420 symlink assign-core.js (alindima, Jun 17, 2024)
8f1d8e0 try fixing zombienet (alindima, Jun 17, 2024)
3469f2d fix compilation (alindima, Jun 17, 2024)
889cb32 clippy again (alindima, Jun 17, 2024)
a4387a2 add prdoc (alindima, Jun 17, 2024)
8532f7d try fixing prdoc (alindima, Jun 17, 2024)
7c1fc91 Merge remote-tracking branch 'origin/master' into tsv-ct-core-sharing (alindima, Jun 17, 2024)
e85b19f try fixing zombienet (alindima, Jun 17, 2024)
4d69a3a more zombienet (alindima, Jun 17, 2024)
829af4d semver (alindima, Jun 18, 2024)
a6998be use relative symlinks (alindima, Jun 18, 2024)
21be690 CI stuff (alindima, Jun 18, 2024)
2cb6bf2 small review comment (alindima, Jun 18, 2024)
8f860c1 add copy command to CI file (alindima, Jun 18, 2024)
e343989 fix gitlab yaml (alindima, Jun 18, 2024)
69d16cd lower glutton compute (alindima, Jun 18, 2024)
4c3635b Merge remote-tracking branch 'origin/master' into tsv-ct-core-sharing (alindima, Jun 18, 2024)
9eea0c6 relax zombienet constraint (alindima, Jun 18, 2024)
9c118cb don't compute the validator group for an unscheduled core when using … (alindima, Jun 18, 2024)
10 changes: 10 additions & 0 deletions .gitlab/pipeline/zombienet/polkadot.yml
@@ -23,6 +23,8 @@
VERSION_TO_USE=$(echo "${BUILD_RELEASE_VERSION}\n$VERSIONS"|sort -r|grep -A1 "${BUILD_RELEASE_VERSION}"|tail -1);
export ZOMBIENET_INTEGRATION_TEST_SECONDARY_IMAGE="docker.io/parity/polkadot:${VERSION_TO_USE}";
fi
- cp ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/functional
- cp ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/elastic_scaling
- echo "Zombienet Tests Config"
- echo "gh-dir ${GH_DIR}"
- echo "local-dir ${LOCAL_DIR}"
@@ -199,6 +201,14 @@ zombienet-polkadot-functional-0014-chunk-fetching-network-compatibility:
--local-dir="${LOCAL_DIR}/functional"
--test="0014-chunk-fetching-network-compatibility.zndsl"

zombienet-polkadot-functional-0015-coretime-shared-core:
extends:
- .zombienet-polkadot-common
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/functional"
--test="0015-coretime-shared-core.zndsl"

zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
- .zombienet-polkadot-common
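For orientation, the new CI job runs `0015-coretime-shared-core.zndsl`, a zombienet assertion script added elsewhere in this PR alongside the copied `assign-core.js` helper. Below is a rough, hypothetical sketch of the shape such a test could take; it is not the actual file from the PR, and the network file name, node name, and `assign-core.js` arguments are invented for illustration:

```
Description: Two parachains sharing a single coretime core
Network: ./0015-coretime-shared-core.toml
Creds: config

validator: is up
validator: js-script ./assign-core.js with "0,2000,28800,2001,28800" return is 0 within 600 seconds
validator: parachain 2000 block height is at least 10 within 300 seconds
validator: parachain 2001 block height is at least 10 within 300 seconds
```

The `cp` commands added above exist so that both the `functional` and `elastic_scaling` test directories see the shared helper script at runtime.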
72 changes: 30 additions & 42 deletions polkadot/node/core/backing/src/lib.rs
@@ -213,8 +213,6 @@ struct PerRelayParentState {
parent: Hash,
/// Session index.
session_index: SessionIndex,
/// The `ParaId`s assigned to the local validator at this relay parent.
assigned_paras: Vec<ParaId>,
/// The `CoreIndex` assigned to the local validator at this relay parent.
assigned_core: Option<CoreIndex>,
/// The candidates that are backed by enough validators in their group, by hash.
@@ -234,10 +232,11 @@ struct PerRelayParentState {
/// If true, we're appending extra bits in the BackedCandidate validator indices bitfield,
/// which represent the assigned core index. True if ElasticScalingMVP is enabled.
inject_core_index: bool,
/// The core states for all cores.
cores: Vec<CoreState>,
/// Optional claim queue state. Will be `None` if the runtime does not support this API.
claim_queue: Option<ClaimQueueSnapshot>,
/// The number of cores.
n_cores: u32,
/// Claim queue state. If the runtime API is not available, it'll be populated with info from
/// availability cores.
claim_queue: ClaimQueueSnapshot,
/// The validator index -> group mapping at this relay parent.
validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
/// The associated group rotation information.
@@ -1007,15 +1006,13 @@ macro_rules! try_runtime_api {
fn core_index_from_statement(
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
cores: &[CoreState],
maybe_claim_queue: Option<&ClaimQueueSnapshot>,
n_cores: u32,
claim_queue: &ClaimQueueSnapshot,
statement: &SignedFullStatementWithPVD,
) -> Option<CoreIndex> {
let compact_statement = statement.as_unchecked();
let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());

let n_cores = cores.len();

gum::trace!(
target:LOG_TARGET,
?group_rotation_info,
@@ -1044,32 +1041,21 @@ fn core_index_from_statement(
// First check if the statement para id matches the core assignment.
let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);

if core_index.0 as usize > n_cores {
if core_index.0 > n_cores {
gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
return None
}

if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
let candidate_para_id = candidate.descriptor.para_id;
let assigned_paras: Vec<ParaId> = if let Some(claim_queue) = maybe_claim_queue {
claim_queue.iter_claims_for_core(&core_index).into_iter().collect()
} else {
match &cores[core_index.0 as usize] {
CoreState::Free => None,
CoreState::Occupied(occupied) =>
occupied.next_up_on_available.as_ref().map(|c| c.para_id),
CoreState::Scheduled(scheduled) => Some(scheduled.para_id),
}
.into_iter()
.collect()
};
let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);

if !assigned_paras.contains(&candidate_para_id) {
if !assigned_paras.any(|id| id == &candidate_para_id) {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?core_index,
?assigned_paras,
assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
?candidate_para_id,
"Invalid CoreIndex, core is not assigned to this para_id"
);
@@ -1158,32 +1144,33 @@ async fn construct_per_relay_parent_state<Context>(

let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
let mut assigned_core = None;
let mut assigned_paras = vec![];

let has_claim_queue = maybe_claim_queue.is_some();
let mut claim_queue = maybe_claim_queue.unwrap_or_default().0;

for (idx, core) in cores.iter().enumerate() {
let core_index = CoreIndex(idx as _);
let core_paras = if let Some(claim_queue) = &maybe_claim_queue {
claim_queue.iter_claims_for_core(&core_index).into_iter().collect()
} else {

if !has_claim_queue {
match core {
CoreState::Scheduled(scheduled) => vec![scheduled.para_id],
CoreState::Scheduled(scheduled) =>
claim_queue.insert(core_index, [scheduled.para_id].into_iter().collect()),
CoreState::Occupied(occupied) if mode.is_enabled() => {
// Async backing makes it legal to build on top of
// occupied core.
if let Some(next) = &occupied.next_up_on_available {
vec![next.para_id]
claim_queue.insert(core_index, [next.para_id].into_iter().collect())
} else {
continue
}
},
_ => continue,
}
};
};
}

let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
assigned_paras = core_paras;
assigned_core = Some(core_index);
}
groups.insert(core_index, g.clone());
@@ -1218,7 +1205,6 @@
parent,
session_index,
assigned_core,
assigned_paras,
backed: HashSet::new(),
table: Table::new(table_config),
table_context,
@@ -1227,8 +1213,8 @@
fallbacks: HashMap::new(),
minimum_backing_votes,
inject_core_index,
cores,
claim_queue: maybe_claim_queue,
n_cores: cores.len() as u32,
claim_queue: ClaimQueueSnapshot::from(claim_queue),
validator_to_group: validator_to_group.clone(),
group_rotation_info,
}))
@@ -1681,8 +1667,8 @@ async fn import_statement<Context>(
let core = core_index_from_statement(
&rp_state.validator_to_group,
&rp_state.group_rotation_info,
&rp_state.cores,
rp_state.claim_queue.as_ref(),
rp_state.n_cores,
&rp_state.claim_queue,
statement,
)
.ok_or(Error::CoreIndexUnavailable)?;
@@ -2106,12 +2092,14 @@ async fn handle_second_message<Context>(
return Ok(())
}

let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));

// Sanity check that candidate is from our assignment.
if !rp_state.assigned_paras.contains(&candidate.descriptor().para_id) {
if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id)) {
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_paras = ?rp_state.assigned_paras,
our_assignment_paras = ?assigned_paras,
collation = ?candidate.descriptor().para_id,
"Subsystem asked to second for para outside of our assignment",
);
@@ -2121,7 +2109,7 @@
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_paras = ?rp_state.assigned_paras,
our_assignment_paras = ?assigned_paras,
collation = ?candidate.descriptor().para_id,
"Current assignments vs collation",
);
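Taken together, the backing changes replace the per-relay-parent `cores: Vec<CoreState>` with an unconditional `ClaimQueueSnapshot`: on runtimes exposing the claim queue API it is fetched directly, otherwise an equivalent snapshot is synthesized from the availability cores. A minimal, self-contained sketch of that fallback follows; the `ParaId`, `CoreIndex`, and `CoreState` definitions here are illustrative stand-ins, not the real polkadot-primitives types:

```rust
use std::collections::{BTreeMap, VecDeque};

// Illustrative stand-ins; the real types come from polkadot-primitives.
type ParaId = u32;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct CoreIndex(u32);

enum CoreState {
    Free,
    Scheduled(ParaId),
    Occupied { next_up_on_available: Option<ParaId> },
}

/// Build a claim-queue-like mapping from availability cores, mirroring the
/// fallback branch in `construct_per_relay_parent_state`.
fn claim_queue_from_cores(
    cores: &[CoreState],
    async_backing_enabled: bool,
) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
    let mut claim_queue = BTreeMap::new();
    for (idx, core) in cores.iter().enumerate() {
        let core_index = CoreIndex(idx as u32);
        match core {
            CoreState::Scheduled(para_id) => {
                claim_queue.insert(core_index, VecDeque::from([*para_id]));
            },
            // Async backing makes it legal to build on top of an occupied core.
            CoreState::Occupied { next_up_on_available: Some(next) }
                if async_backing_enabled =>
            {
                claim_queue.insert(core_index, VecDeque::from([*next]));
            },
            _ => {},
        }
    }
    claim_queue
}

fn main() {
    let cores = [
        CoreState::Scheduled(2000),
        CoreState::Occupied { next_up_on_available: Some(2001) },
        CoreState::Free,
    ];
    // Two cores end up with a one-para queue each; the free core gets none.
    assert_eq!(claim_queue_from_cores(&cores, true).len(), 2);
}
```

Each synthesized queue holds at most one para, so legacy runtimes keep the old single-assignment behaviour, while claim-queue runtimes can expose several upcoming paras per core and therefore share one core between parachains.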
12 changes: 6 additions & 6 deletions polkadot/node/core/backing/src/tests/mod.rs
@@ -1137,8 +1137,8 @@ fn extract_core_index_from_statement_works() {
let core_index_1 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
Some(&test_state.claim_queue.clone().into()),
test_state.availability_cores.len() as _,
&test_state.claim_queue.clone().into(),
&signed_statement_1,
)
.unwrap();
@@ -1148,8 +1148,8 @@
let core_index_2 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
Some(&test_state.claim_queue.clone().into()),
test_state.availability_cores.len() as _,
&test_state.claim_queue.clone().into(),
&signed_statement_2,
);

@@ -1159,8 +1159,8 @@
let core_index_3 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
Some(&test_state.claim_queue.clone().into()),
test_state.availability_cores.len() as _,
&test_state.claim_queue.clone().into(),
&signed_statement_3,
)
.unwrap();
24 changes: 12 additions & 12 deletions polkadot/node/core/prospective-parachains/src/lib.rs
@@ -871,17 +871,15 @@ async fn fetch_backing_state<Context>(
async fn fetch_upcoming_paras<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> JfyiErrorResult<Vec<ParaId>> {
let mut upcoming = HashSet::new();

match fetch_claim_queue(ctx.sender(), relay_parent).await? {
) -> JfyiErrorResult<HashSet<ParaId>> {
Ok(match fetch_claim_queue(ctx.sender(), relay_parent).await? {
Some(claim_queue) => {
// Runtime supports claim queue - use it
for (_, claims) in claim_queue.iter_all_claims() {
for claim in claims {
upcoming.insert(*claim);
}
}
claim_queue
.iter_all_claims()
.flat_map(|(_, paras)| paras.into_iter())
.copied()
.collect()
},
None => {
// fallback to availability cores - remove this branch once claim queue is released
@@ -894,6 +892,8 @@ async fn fetch_upcoming_paras<Context>(
.await;

let cores = rx.await.map_err(JfyiError::RuntimeApiRequestCanceled)??;

let mut upcoming = HashSet::with_capacity(cores.len());
for core in cores {
match core {
CoreState::Occupied(occupied) => {
Expand All @@ -912,10 +912,10 @@ async fn fetch_upcoming_paras<Context>(
CoreState::Free => {},
}
}
},
}

Ok(upcoming.into_iter().collect())
upcoming
},
})
}

// Fetch ancestors in descending order, up to the amount requested.
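The rewritten `fetch_upcoming_paras` now returns a `HashSet<ParaId>` directly, so a para claiming slots on several cores (or several slots on one core) is counted once. A self-contained sketch of the claim-queue branch, assuming the snapshot behaves like a map of per-core queues (simplified types, not the real snapshot API):

```rust
use std::collections::{BTreeMap, HashSet, VecDeque};

type ParaId = u32;
type CoreIndex = u32;

/// Flatten every claim on every core into a deduplicated set of paras,
/// mirroring the claim-queue branch of `fetch_upcoming_paras`.
fn upcoming_paras(claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>) -> HashSet<ParaId> {
    claim_queue
        .iter()
        .flat_map(|(_core, paras)| paras.iter())
        .copied()
        .collect()
}

fn main() {
    // Para 2000 has claims on both cores; it must only be counted once.
    let claim_queue = BTreeMap::from([
        (0, VecDeque::from([2000, 2001])),
        (1, VecDeque::from([2000])),
    ]);
    assert_eq!(upcoming_paras(&claim_queue), HashSet::from([2000, 2001]));
}
```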
13 changes: 2 additions & 11 deletions polkadot/node/core/prospective-parachains/src/tests.rs
@@ -319,18 +319,9 @@ async fn handle_leaf_activation(
);
}

for _ in 0..test_state
.claim_queue
.values()
.fold(HashSet::new(), |mut acc, paras| {
for para in paras {
acc.insert(*para);
}
let paras: HashSet<_> = test_state.claim_queue.values().flatten().collect();

acc
})
.len()
{
for _ in 0..paras.len() {
let message = virtual_overseer.recv().await;
// Get the para we are working with since the order is not deterministic.
let para_id = match &message {
polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -588,8 +588,7 @@ async fn determine_cores(
// Runtime supports claim queue - use it.
claim_queue
.iter_claims_for_core(&CoreIndex(idx as u32))
.into_iter()
.any(|para| para == para_id)
.any(|para| para == &para_id)
},
None => match core {
CoreState::Scheduled(scheduled) if scheduled.para_id == para_id => true,
polkadot/node/network/collator-protocol/src/validator_side/collation.rs
@@ -280,7 +280,7 @@ impl Collations {
}
}

/// Checks the limit of seconded candidates for a given para.
/// Checks the limit of seconded candidates.
pub(super) fn is_seconded_limit_reached(
&self,
relay_parent_mode: ProspectiveParachainsMode,
47 changes: 23 additions & 24 deletions polkadot/node/network/collator-protocol/src/validator_side/mod.rs
@@ -508,35 +508,34 @@
// `relay_parent_mode` is not examined here because if the runtime supports claim queue
// then it supports async backing params too (`ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT`
// < `CLAIM_QUEUE_RUNTIME_REQUIREMENT`).
Some(claim_queue) =>
claim_queue.iter_claims_for_core(&core_now).into_iter().collect::<Vec<_>>(),
Some(mut claim_queue) => claim_queue.0.remove(&core_now),
// Claim queue is not supported by the runtime - use availability cores instead.
None => cores
.get(core_now.0 as usize)
.and_then(|c| match c {
CoreState::Occupied(core) if relay_parent_mode.is_enabled() =>
core.next_up_on_available.as_ref().map(|c| c.para_id),
CoreState::Scheduled(core) => Some(core.para_id),
CoreState::Occupied(_) | CoreState::Free => None,
})
.into_iter()
.collect::<Vec<_>>(),
None => cores.get(core_now.0 as usize).and_then(|c| match c {
CoreState::Occupied(core) if relay_parent_mode.is_enabled() =>
core.next_up_on_available.as_ref().map(|c| [c.para_id].into_iter().collect()),
CoreState::Scheduled(core) => Some([core.para_id].into_iter().collect()),
CoreState::Occupied(_) | CoreState::Free => None,
}),
};

for para_id in paras_now.iter() {
let entry = current_assignments.entry(*para_id).or_default();
*entry += 1;
if *entry == 1 {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?para_id,
"Assigned to a parachain",
);
if let Some(paras) = paras_now {
for para_id in paras.iter() {
let entry = current_assignments.entry(*para_id).or_default();
*entry += 1;
if *entry == 1 {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?para_id,
"Assigned to a parachain",
);
}
}
}

*group_assignment = GroupAssignments { current: paras_now };
*group_assignment = GroupAssignments { current: paras.into_iter().collect() };
} else {
*group_assignment = GroupAssignments { current: vec![] };
}

Ok(())
}
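In the loop above, `current_assignments` acts as a reference count of how many active relay parents assign this validator to each para, and the "Assigned to a parachain" log fires only on the first reference. A compact, standalone sketch of that counting pattern, with simplified types:

```rust
use std::collections::HashMap;

type ParaId = u32;

/// Record assignments from one relay parent; returns the paras seen for the
/// first time (the ones the real code logs "Assigned to a parachain" for).
fn note_assignments(
    current_assignments: &mut HashMap<ParaId, usize>,
    paras: &[ParaId],
) -> Vec<ParaId> {
    let mut newly_assigned = Vec::new();
    for para_id in paras {
        let entry = current_assignments.entry(*para_id).or_default();
        *entry += 1;
        if *entry == 1 {
            newly_assigned.push(*para_id);
        }
    }
    newly_assigned
}

fn main() {
    let mut assignments = HashMap::new();
    assert_eq!(note_assignments(&mut assignments, &[2000, 2001]), vec![2000, 2001]);
    // A second relay parent assigning para 2000 only bumps the count.
    assert_eq!(note_assignments(&mut assignments, &[2000]), Vec::<u32>::new());
}
```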
2 changes: 1 addition & 1 deletion polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -741,7 +741,7 @@ fn find_active_validator_state(

let core_index = group_rotation_info.core_for_group(our_group, availability_cores.len());
let paras_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue {
claim_queue.iter_claims_for_core(&core_index).into_iter().collect()
claim_queue.iter_claims_for_core(&core_index).copied().collect()
} else {
availability_cores
.get(core_index.0 as usize)
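The single change in this file is an iterator detail: `iter_claims_for_core` yields `&ParaId` references, so building an owned collection needs an explicit `.copied()` where the previous owned `Vec` did not. Illustrated generically:

```rust
fn main() {
    // Stand-in for a claim queue's per-core entry.
    let claims_for_core: Vec<u32> = vec![2000, 2001];

    // `iter()` yields `&u32`; `copied()` converts to owned `u32` values,
    // which is what collecting into a `Vec<ParaId>` needs.
    let assigned: Vec<u32> = claims_for_core.iter().copied().collect();
    assert_eq!(assigned, vec![2000, 2001]);
}
```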