fix: SDR: make it possible to bind to cores if units > groups
For multicore SDR it is important that the producer and consumers share the
same (L3) cache. Hence we bind specific cores to threads. Prior to this
change there was one multicore SDR job per group, even if the group could
accommodate multiple such jobs. If more jobs were scheduled than groups
available, those additional jobs wouldn't use specific cores, but whatever
the operating system decided.

With this change, additional jobs are now put into already-used groups as long
as there is enough space to accommodate them.

Fixes #1556.
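The placement strategy described above (fill every cache group with one unit first, then start a second round over the same groups) can be sketched as a small standalone Rust program. This mirrors the index arithmetic the diff below introduces, but the function name `assign_units` and the `main` driver are illustrative only, not the crate's actual API:

```rust
/// Hypothetical sketch of round-robin unit placement: every group gets one
/// unit before any group receives a second one. `core_count` must be evenly
/// divisible by `group_count`, as in the real code.
fn assign_units(core_count: usize, group_count: usize, cores_per_unit: usize) -> Vec<Vec<usize>> {
    assert_eq!(0, core_count % group_count);
    let group_size = core_count / group_count;
    // How many units fit into one cache group, and in total.
    let units_per_group = group_size / cores_per_unit;
    let unit_count = group_count * units_per_group;

    (0..unit_count)
        .map(|i| {
            // `round` is 0 while each group gets its first unit, 1 for the
            // second pass over the groups, and so on.
            let round = i / group_count;
            (0..cores_per_unit)
                .map(|j| (j + i * group_size) % core_count + round * cores_per_unit)
                .collect()
        })
        .collect()
}

fn main() {
    // 48 cores, 8 shared caches, 3 cores per unit: the first 8 units land on
    // distinct groups; unit 8 re-uses group 0 starting at core 3.
    let units = assign_units(48, 8, 3);
    assert_eq!(units[0], vec![0, 1, 2]);
    assert_eq!(units[1], vec![6, 7, 8]);
    assert_eq!(units[8], vec![3, 4, 5]);
    println!("{:?}", &units[..3]);
}
```

Before this change, only the first `group_count` jobs got pinned cores; with the `round` offset, every unit up to `unit_count` gets a dedicated, cache-local set of cores.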
vmx committed Jun 20, 2022
1 parent 69e3355 commit fab7903
Showing 2 changed files with 215 additions and 62 deletions.
3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -139,11 +139,12 @@ jobs:
ulimit -u 20000
ulimit -n 20000
cargo test --all --verbose --release lifecycle -- --ignored --nocapture
cargo test -p storage-proofs-porep --features isolated-testing --release checkout_cores -- --test-threads=1
cargo test -p storage-proofs-porep --features isolated-testing --release --lib stacked::vanilla::cores
cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_0
cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_1
no_output_timeout: 30m
environment:
RUST_LOG: debug
RUST_TEST_THREADS: 1
FIL_PROOFS_USE_MULTICORE_SDR: true

274 changes: 213 additions & 61 deletions storage-proofs-porep/src/stacked/vanilla/cores.rs
@@ -1,32 +1,33 @@
use std::convert::TryInto;
use std::sync::{Mutex, MutexGuard};

use anyhow::{format_err, Result};
use hwloc::{Bitmap, ObjectType, Topology, TopologyObject, CPUBIND_THREAD};
use lazy_static::lazy_static;
use log::{debug, info, warn};
use log::{debug, warn};
use storage_proofs_core::settings::SETTINGS;

type CoreGroup = Vec<CoreIndex>;
type CoreUnit = Vec<CoreIndex>;
lazy_static! {
pub static ref TOPOLOGY: Mutex<Topology> = Mutex::new(Topology::new());
pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreGroup>>> = {
pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreUnit>>> = {
let num_producers = &SETTINGS.multicore_sdr_producers;
let cores_per_unit = num_producers + 1;

core_groups(cores_per_unit)
core_units(cores_per_unit)
};
}

#[derive(Clone, Copy, Debug, PartialEq)]
/// `CoreIndex` is a simple wrapper type for indexes into the set of vixible cores. A `CoreIndex` should only ever be
/// created with a value known to be less than the number of visible cores.
/// `CoreIndex` is a simple wrapper type for indexes into the set of visible cores. A `CoreIndex`
/// should only ever be created with a value known to be less than the number of visible cores.
pub struct CoreIndex(usize);

pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreGroup>> {
pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreUnit>> {
match &*CORE_GROUPS {
Some(groups) => {
for (i, group) in groups.iter().enumerate() {
match group.try_lock() {
Some(units) => {
for (i, unit) in units.iter().enumerate() {
match unit.try_lock() {
Ok(guard) => {
debug!("checked out core group {}", i);
return Some(guard);
@@ -122,70 +123,129 @@ fn get_core_by_index(topo: &Topology, index: CoreIndex) -> Result<&TopologyObjec
}
}

fn core_groups(cores_per_unit: usize) -> Option<Vec<Mutex<Vec<CoreIndex>>>> {
/// Group all available cores that share a (L3) cache so that the multicore SDR can operate most
/// efficiently.
///
/// A single multicore SDR run needs a certain number of cores, a so-called *unit*.
/// `cores_per_unit` defines how many cores are dedicated to a single multicore SDR instance.
///
/// On larger systems, the available cores (given by `core_count`) may be connected to separate
/// (L3) caches. All cores that belong to the same cache are called a *group*; the number of
/// groups is given by `group_count`. On smaller systems, like laptops, there is usually just a
/// single group.
///
/// A unit is always bound to a single group. Groups may be large enough to bind multiple units.
/// For performance reasons it is preferred that units don't share a cache, hence units are
/// distributed across separate groups first. Only if every group is already bound to a unit
/// will a group be re-used.
///
/// Here's an example: you have a 48-core system with 8 separate caches and units of size 3.
/// Your `core_count` is 48, the `group_count` is 8 and the `cores_per_unit` is 3. Every group
/// has 6 cores available, so two units can be bound to a single group. You start scheduling
/// multiple SDR multicore jobs. The first job is bound to the first group, which contains
/// cores 0, 1 and 2. The second job is then bound to the second group, which contains cores
/// 6, 7 and 8. It is *not* bound to cores 3, 4 and 5, which belong to the first group: they
/// would fight for the same cache, which isn't ideal. Those cores are only used once all 8
/// groups already have a single unit bound.
///
/// Not all cores are necessarily used. If you e.g. have a system as in the example above, but
/// your unit is of size 4 (instead of 3), then only a single unit fits into each group (due to
/// its size). The first group would then only consist of cores 0, 1, 2 and 3; cores 4 and 5
/// would be unassigned. If you schedule more than 8 multicore SDR jobs, the extra jobs can run
/// on whichever cores the operating system decides to use.
fn create_core_units(
core_count: usize,
group_count: usize,
cores_per_unit: usize,
) -> Vec<Vec<usize>> {
assert_eq!(0, core_count % group_count);
// The number of cores that belong to a single group.
let group_size = core_count / group_count;

// The number of units that can fit into a single group.
let units_per_group = group_size / cores_per_unit;

// The total number of units that can be bound to specific cores on the system.
let unit_count = group_count * units_per_group;

debug!(
"Cores: {}, Shared Caches: {}, cores per cache (group_size): {}, cores per unit: {}",
core_count, group_count, group_size, cores_per_unit
);

let core_units = (0..unit_count)
.map(|i| {
(0..cores_per_unit)
.map(|j| {
// Every group gets a single unit assigned first. Only once every group
// already has one unit will a second one be assigned, if possible. That
// would then be the second "round" of assignments.
let round = i / group_count;
// The index of the core that is bound to a unit.
let core_index = (j + i * group_size) % core_count + (round * cores_per_unit);
assert!(core_index < core_count);
core_index
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
debug!("Core units: {:?}", core_units);
core_units
}

/// Returns the number of caches that are shared between cores.
///
/// The hwloc topology is traversed upwards, starting at the given depth. As soon as there are
/// fewer objects than cores, we expect it to be a cache that is shared between those cores.
///
/// When traversing upwards from the cores, the first level you reach could e.g. be an L2 cache,
/// of which every core has its own. But then you might reach the L3 cache, which is shared
/// between several cores.
fn get_shared_cache_count(topo: &Topology, depth: u32, core_count: usize) -> usize {
let mut depth = depth;
while depth > 0 {
let obj_count: usize = topo
.size_at_depth(depth)
.try_into()
.expect("Platform must be at least 32-bit")
if obj_count < core_count {
return obj_count;
}
depth -= 1;
}
1
}

fn core_units(cores_per_unit: usize) -> Option<Vec<Mutex<CoreUnit>>> {
let topo = TOPOLOGY.lock().expect("poisoned lock");

// The depth at which the cores within one package sit. If you think of the depths as a
// directory tree, it's the directory where all the cores are stored.
let core_depth = match topo.depth_or_below_for_type(&ObjectType::Core) {
Ok(depth) => depth,
Err(_) => return None,
};

let all_cores = topo
.objects_with_type(&ObjectType::Core)
.expect("objects_with_type failed");
// The total number of physical cores, even across packages.
let core_count = all_cores.len();

let mut cache_depth = core_depth;
let mut cache_count = 1;

while cache_depth > 0 {
let objs = topo.objects_at_depth(cache_depth);
let obj_count = objs.len();
if obj_count < core_count {
cache_count = obj_count;
break;
}

cache_depth -= 1;
}

assert_eq!(0, core_count % cache_count);
let mut group_size = core_count / cache_count;
let mut group_count = cache_count;

if cache_count <= 1 {
// If there are not more than one shared caches, there is no benefit in trying to group cores by cache.
// In that case, prefer more groups so we can still bind cores and also get some parallelism.
// Create as many full groups as possible. The last group may not be full.
group_count = core_count / cores_per_unit;
group_size = cores_per_unit;

info!(
"found only {} shared cache(s), heuristically grouping cores into {} groups",
cache_count, group_count
);
} else {
debug!(
"Cores: {}, Shared Caches: {}, cores per cache (group_size): {}",
core_count, cache_count, group_size
);
}

let core_groups = (0..group_count)
.map(|i| {
(0..group_size)
.map(|j| {
let core_index = i * group_size + j;
assert!(core_index < core_count);
CoreIndex(core_index)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
// The number of separate caches the cores are grouped into. There could e.g. be a machine with
// 48 cores. Those cores are separated into 2 packages, each of which has 4 separate caches,
// where each cache contains 6 cores. Then the `group_count` would be 8.
let group_count = get_shared_cache_count(&topo, core_depth, core_count);

// The list of units the multicore SDR threads can be bound to.
let core_units = create_core_units(core_count, group_count, cores_per_unit);
Some(
core_groups
core_units
.iter()
.map(|group| Mutex::new(group.clone()))
.map(|unit| {
let unit_core_index = unit.iter().map(|core| CoreIndex(*core)).collect();
Mutex::new(unit_core_index)
})
.collect::<Vec<_>>(),
)
}
@@ -196,7 +256,8 @@ mod tests {

#[test]
fn test_cores() {
core_groups(2);
fil_logger::maybe_init();
core_units(2);
}

#[test]
@@ -205,6 +266,7 @@
// the cores we're working with may otherwise be busy and cause a
// failure.
fn test_checkout_cores() {
fil_logger::maybe_init();
let checkout1 = checkout_core_group();
dbg!(&checkout1);
let checkout2 = checkout_core_group();
@@ -216,4 +278,94 @@
_ => panic!("failed to get two checkouts"),
}
}

#[test]
fn test_create_core_units() {
fil_logger::maybe_init();

let ci = create_core_units(18, 1, 4);
assert_eq!(
ci,
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
);

let dc = create_core_units(32, 2, 4);
assert_eq!(
dc,
[
[0, 1, 2, 3],
[16, 17, 18, 19],
[4, 5, 6, 7],
[20, 21, 22, 23],
[8, 9, 10, 11],
[24, 25, 26, 27],
[12, 13, 14, 15],
[28, 29, 30, 31]
]
);

let amd = create_core_units(16, 4, 4);
assert_eq!(
amd,
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
);

let amd_not_filled = create_core_units(16, 4, 3);
assert_eq!(
amd_not_filled,
[[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]]
);

let intel = create_core_units(16, 2, 3);
assert_eq!(intel, [[0, 1, 2], [8, 9, 10], [3, 4, 5], [11, 12, 13]]);

let sp = create_core_units(48, 8, 3);
assert_eq!(
sp,
[
[0, 1, 2],
[6, 7, 8],
[12, 13, 14],
[18, 19, 20],
[24, 25, 26],
[30, 31, 32],
[36, 37, 38],
[42, 43, 44],
[3, 4, 5],
[9, 10, 11],
[15, 16, 17],
[21, 22, 23],
[27, 28, 29],
[33, 34, 35],
[39, 40, 41],
[45, 46, 47]
]
);

let sp_not_filled = create_core_units(48, 8, 4);
assert_eq!(
sp_not_filled,
[
[0, 1, 2, 3],
[6, 7, 8, 9],
[12, 13, 14, 15],
[18, 19, 20, 21],
[24, 25, 26, 27],
[30, 31, 32, 33],
[36, 37, 38, 39],
[42, 43, 44, 45]
]
);

let laptop = create_core_units(4, 1, 2);
assert_eq!(laptop, [[0, 1], [2, 3]]);
let laptop_not_filled = create_core_units(4, 1, 3);
assert_eq!(laptop_not_filled, [[0, 1, 2]]);
}
}
