diff --git a/.circleci/config.yml b/.circleci/config.yml
index a6c49f4b4..5f488cb09 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -139,11 +139,12 @@ jobs:
             ulimit -u 20000
             ulimit -n 20000
             cargo test --all --verbose --release lifecycle -- --ignored --nocapture
-            cargo test -p storage-proofs-porep --features isolated-testing --release checkout_cores -- --test-threads=1
+            cargo test -p storage-proofs-porep --features isolated-testing --release --lib stacked::vanilla::cores
             cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_0
             cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_1
           no_output_timeout: 30m
         environment:
+          RUST_LOG: debug
           RUST_TEST_THREADS: 1
           FIL_PROOFS_USE_MULTICORE_SDR: true
diff --git a/storage-proofs-porep/src/stacked/vanilla/cores.rs b/storage-proofs-porep/src/stacked/vanilla/cores.rs
index fc486c923..850b03a74 100644
--- a/storage-proofs-porep/src/stacked/vanilla/cores.rs
+++ b/storage-proofs-porep/src/stacked/vanilla/cores.rs
@@ -1,32 +1,33 @@
+use std::convert::TryInto;
 use std::sync::{Mutex, MutexGuard};
 
 use anyhow::{format_err, Result};
 use hwloc::{Bitmap, ObjectType, Topology, TopologyObject, CPUBIND_THREAD};
 use lazy_static::lazy_static;
-use log::{debug, info, warn};
+use log::{debug, warn};
 use storage_proofs_core::settings::SETTINGS;
 
-type CoreGroup = Vec<CoreIndex>;
+type CoreUnit = Vec<CoreIndex>;
+
 lazy_static! {
     pub static ref TOPOLOGY: Mutex<Topology> = Mutex::new(Topology::new());
-    pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreGroup>>> = {
+    pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreUnit>>> = {
         let num_producers = &SETTINGS.multicore_sdr_producers;
         let cores_per_unit = num_producers + 1;
 
-        core_groups(cores_per_unit)
+        core_units(cores_per_unit)
     };
 }
 
 #[derive(Clone, Copy, Debug, PartialEq)]
-/// `CoreIndex` is a simple wrapper type for indexes into the set of vixible cores. A `CoreIndex` should only ever be
-/// created with a value known to be less than the number of visible cores.
+/// `CoreIndex` is a simple wrapper type for indexes into the set of visible cores. A `CoreIndex`
+/// should only ever be created with a value known to be less than the number of visible cores.
 pub struct CoreIndex(usize);
 
-pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreGroup>> {
+pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreUnit>> {
     match &*CORE_GROUPS {
-        Some(groups) => {
-            for (i, group) in groups.iter().enumerate() {
-                match group.try_lock() {
+        Some(units) => {
+            for (i, unit) in units.iter().enumerate() {
+                match unit.try_lock() {
                     Ok(guard) => {
                         debug!("checked out core group {}", i);
                         return Some(guard);
@@ -122,70 +123,129 @@ fn get_core_by_index(topo: &Topology, index: CoreIndex) -> Result<&TopologyObject> {
     }
 }
 
-fn core_groups(cores_per_unit: usize) -> Option<Vec<Mutex<Vec<CoreIndex>>>> {
+/// Group all available cores that share an (L3) cache so that the multicore SDR can operate most
+/// efficiently.
+///
+/// A single multicore SDR run needs a certain number of cores, a so-called *unit*.
+/// `cores_per_unit` defines how many cores are dedicated to a single multicore SDR instance.
+///
+/// On larger systems, the available cores (given by `core_count`) may be connected to separate
+/// (L3) caches. All cores that belong to the same cache are called a *group*; the number of
+/// groups is given by `group_count`. On smaller systems, like laptops, there is usually just a
+/// single group.
+///
+/// A unit is always bound to a single group. Groups may be large enough to bind multiple units.
+/// Though for performance reasons it is preferred that units don't share a cache, hence units
+/// are distributed across separate groups first. Only once all groups are already bound to a
+/// unit will a group be re-used.
+///
+/// Here's an example: you have a 48-core system with 8 separate caches, and you have units of
+/// size 3. Your `core_count` is 48, the `group_count` is 8 and the `cores_per_unit` is 3. In
+/// every group 6 cores are available, which means that two units can be bound to a single group.
+/// You start scheduling multiple SDR multicore jobs. The first job is bound to the first group,
+/// which contains cores 0, 1 and 2. The second job is then bound to the second group, which
+/// contains cores 6, 7 and 8. It is *not* bound to cores 3, 4 and 5, which belong to the first
+/// group: they would fight with the first job for the same cache, which isn't ideal. Those cores
+/// are only used once all 8 groups already have one unit bound.
+///
+/// Not all cores are necessarily used. If you e.g. have the system above, but your unit has
+/// size 4 (instead of 3), then only a single unit fits (due to its size) into a single group.
+/// The first unit would then consist of cores 0, 1, 2 and 3, while cores 4 and 5 stay
+/// unassigned. If you schedule more than 8 multicore SDR jobs, the extra jobs run on whichever
+/// cores the operating system decides to use.
+fn create_core_units(
+    core_count: usize,
+    group_count: usize,
+    cores_per_unit: usize,
+) -> Vec<Vec<usize>> {
+    assert_eq!(0, core_count % group_count);
+
+    // The number of cores that belong to a single group.
+    let group_size = core_count / group_count;
+
+    // The number of units that can fit into a single group.
+    let units_per_group = group_size / cores_per_unit;
+
+    // The total number of units that can be bound to specific cores on the system.
+    let unit_count = group_count * units_per_group;
+
+    debug!(
+        "Cores: {}, Shared Caches: {}, cores per cache (group_size): {}, cores per unit: {}",
+        core_count, group_count, group_size, cores_per_unit
+    );
+
+    let core_units = (0..unit_count)
+        .map(|i| {
+            (0..cores_per_unit)
+                .map(|j| {
+                    // Every group gets a single unit assigned first. Only if all groups already
+                    // have one unit will a second one be assigned, if possible. This would then
+                    // be the second "round" of assignments.
+                    let round = i / group_count;
+                    // The index of the core that is bound to a unit.
+                    let core_index = (j + i * group_size) % core_count + (round * cores_per_unit);
+                    assert!(core_index < core_count);
+                    core_index
+                })
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>();
+    debug!("Core units: {:?}", core_units);
+    core_units
+}
+
+/// Returns the number of caches that are shared between cores.
+///
+/// The hwloc topology is traversed upwards, starting at the given depth. As soon as there are
+/// fewer objects than cores, we expect that level to be a cache that is shared between those
+/// cores.
+///
+/// When traversing upwards from the cores, the first level you reach could e.g. be an L2 cache
+/// of which every core has its own. But then you might reach the L3 cache, which is shared
+/// between several cores.
+fn get_shared_cache_count(topo: &Topology, depth: u32, core_count: usize) -> usize {
+    let mut depth = depth;
+    while depth > 0 {
+        let obj_count: usize = topo
+            .size_at_depth(depth)
+            .try_into()
+            .expect("Platform must be at least 32-bit");
+        if obj_count < core_count {
+            return obj_count;
+        }
+        depth -= 1;
+    }
+    1
+}
+
+fn core_units(cores_per_unit: usize) -> Option<Vec<Mutex<CoreUnit>>> {
     let topo = TOPOLOGY.lock().expect("poisoned lock");
 
+    // At which depth the cores within one package are. If you think of the "depths" as a
+    // directory tree, it's the directory where all cores are stored.
     let core_depth = match topo.depth_or_below_for_type(&ObjectType::Core) {
         Ok(depth) => depth,
         Err(_) => return None,
     };
+
     let all_cores = topo
         .objects_with_type(&ObjectType::Core)
         .expect("objects_with_type failed");
+    // The total number of physical cores, even across packages.
     let core_count = all_cores.len();
 
-    let mut cache_depth = core_depth;
-    let mut cache_count = 1;
-
-    while cache_depth > 0 {
-        let objs = topo.objects_at_depth(cache_depth);
-        let obj_count = objs.len();
-        if obj_count < core_count {
-            cache_count = obj_count;
-            break;
-        }
-
-        cache_depth -= 1;
-    }
-
-    assert_eq!(0, core_count % cache_count);
-    let mut group_size = core_count / cache_count;
-    let mut group_count = cache_count;
-
-    if cache_count <= 1 {
-        // If there are not more than one shared caches, there is no benefit in trying to group
-        // cores by cache. In that case, prefer more groups so we can still bind cores and also
-        // get some parallelism. Create as many full groups as possible. The last group may not
-        // be full.
-        group_count = core_count / cores_per_unit;
-        group_size = cores_per_unit;
-
-        info!(
-            "found only {} shared cache(s), heuristically grouping cores into {} groups",
-            cache_count, group_count
-        );
-    } else {
-        debug!(
-            "Cores: {}, Shared Caches: {}, cores per cache (group_size): {}",
-            core_count, cache_count, group_size
-        );
-    }
-
-    let core_groups = (0..group_count)
-        .map(|i| {
-            (0..group_size)
-                .map(|j| {
-                    let core_index = i * group_size + j;
-                    assert!(core_index < core_count);
-                    CoreIndex(core_index)
-                })
-                .collect::<Vec<_>>()
-        })
-        .collect::<Vec<_>>();
+    // The number of separate caches the cores are grouped into. There could e.g. be a machine
+    // with 48 cores. Those cores are separated into 2 packages, where each of them has 4
+    // separate caches, and each cache contains 6 cores. Then the `group_count` would be 8.
+    let group_count = get_shared_cache_count(&topo, core_depth, core_count);
+
+    // The list of units the multicore SDR threads can be bound to.
+    let core_units = create_core_units(core_count, group_count, cores_per_unit);
 
     Some(
-        core_groups
+        core_units
             .iter()
-            .map(|group| Mutex::new(group.clone()))
+            .map(|unit| {
+                let unit_core_index = unit.iter().map(|core| CoreIndex(*core)).collect();
+                Mutex::new(unit_core_index)
+            })
             .collect::<Vec<_>>(),
     )
 }
@@ -196,7 +256,8 @@ mod tests {
 
     #[test]
     fn test_cores() {
-        core_groups(2);
+        fil_logger::maybe_init();
+        core_units(2);
     }
 
     #[test]
@@ -205,6 +266,7 @@
     // the cores we're working with may otherwise be busy and cause a
     // failure.
     fn test_checkout_cores() {
+        fil_logger::maybe_init();
         let checkout1 = checkout_core_group();
         dbg!(&checkout1);
         let checkout2 = checkout_core_group();
@@ -216,4 +278,88 @@
             _ => panic!("failed to get two checkouts"),
         }
     }
+
+    #[test]
+    fn test_create_core_units() {
+        fil_logger::maybe_init();
+
+        let ci = create_core_units(18, 1, 4);
+        assert_eq!(
+            ci,
+            [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
+        );
+
+        let dc = create_core_units(32, 2, 4);
+        assert_eq!(
+            dc,
+            [
+                [0, 1, 2, 3],
+                [16, 17, 18, 19],
+                [4, 5, 6, 7],
+                [20, 21, 22, 23],
+                [8, 9, 10, 11],
+                [24, 25, 26, 27],
+                [12, 13, 14, 15],
+                [28, 29, 30, 31]
+            ]
+        );
+
+        let amd = create_core_units(16, 4, 4);
+        assert_eq!(
+            amd,
+            [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
+        );
+
+        let amd_not_filled = create_core_units(16, 4, 3);
+        assert_eq!(
+            amd_not_filled,
+            [[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]]
+        );
+
+        let intel = create_core_units(16, 2, 3);
+        assert_eq!(intel, [[0, 1, 2], [8, 9, 10], [3, 4, 5], [11, 12, 13]]);
+
+        let sp = create_core_units(48, 8, 3);
+        assert_eq!(
+            sp,
+            [
+                [0, 1, 2],
+                [6, 7, 8],
+                [12, 13, 14],
+                [18, 19, 20],
+                [24, 25, 26],
+                [30, 31, 32],
+                [36, 37, 38],
+                [42, 43, 44],
+                [3, 4, 5],
+                [9, 10, 11],
+                [15, 16, 17],
+                [21, 22, 23],
+                [27, 28, 29],
+                [33, 34, 35],
+                [39, 40, 41],
+                [45, 46, 47]
+            ]
+        );
+
+        let sp_not_filled = create_core_units(48, 8, 4);
+        assert_eq!(
+            sp_not_filled,
+            [
+                [0, 1, 2, 3],
+                [6, 7, 8, 9],
+                [12, 13, 14, 15],
+                [18, 19, 20, 21],
+                [24, 25, 26, 27],
+                [30, 31, 32, 33],
+                [36, 37, 38, 39],
+                [42, 43, 44, 45]
+            ]
+        );
+
+        let laptop = create_core_units(4, 1, 2);
+        assert_eq!(laptop, [[0, 1], [2, 3]]);
+        let laptop_not_filled = create_core_units(4, 1, 3);
+        assert_eq!(laptop_not_filled, [[0, 1, 2]]);
+    }
 }
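Reviewer note: the assignment arithmetic in `create_core_units` is compact, so here is a self-contained sketch (not part of the patch; the free function and the `main` harness are illustrative only) that replays the formula `(j + i * group_size) % core_count + round * cores_per_unit` on the 48-core example from the doc comment:

```rust
// Standalone sketch of the round-robin unit assignment from this patch.
// `units` mirrors the logic of `create_core_units` without hwloc; the name
// and the harness are hypothetical, not part of the crate.
fn units(core_count: usize, group_count: usize, cores_per_unit: usize) -> Vec<Vec<usize>> {
    // Cores per shared cache, and how many fully-populated units fit overall.
    let group_size = core_count / group_count;
    let unit_count = group_count * (group_size / cores_per_unit);
    (0..unit_count)
        .map(|i| {
            // One unit per group first; `round` counts repeat visits to a group.
            let round = i / group_count;
            (0..cores_per_unit)
                .map(|j| (j + i * group_size) % core_count + round * cores_per_unit)
                .collect()
        })
        .collect()
}

fn main() {
    // The 48-core / 8-cache / 3-cores-per-unit example: the first 8 units land
    // on cores 0-2, 6-8, 12-14, ... (one per group), the next 8 on 3-5, 9-11, ...
    for (i, unit) in units(48, 8, 3).iter().enumerate() {
        println!("unit {:2}: {:?}", i, unit);
    }
    // Matches the `laptop` case asserted in `test_create_core_units`.
    assert_eq!(units(4, 1, 2), vec![vec![0, 1], vec![2, 3]]);
}
```

Running it prints the same unit layout that the `sp` case in `test_create_core_units` asserts, which makes the "rounds" behaviour easy to eyeball.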
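Similarly, the checkout side relies on `Mutex::try_lock` as the reservation mechanism. Below is a minimal sketch of that pattern using only `std`; the names (`Unit`, `checkout`, the two-unit pool) are assumptions for illustration, not the crate's API:

```rust
use std::sync::{Mutex, MutexGuard};

// Stand-in for `CoreUnit`; real units hold `CoreIndex` values.
type Unit = Vec<usize>;

/// Hands out the first free unit: a held guard elsewhere makes `try_lock`
/// fail, so busy units are skipped, just as in `checkout_core_group`.
fn checkout(pool: &[Mutex<Unit>]) -> Option<MutexGuard<'_, Unit>> {
    pool.iter().find_map(|unit| unit.try_lock().ok())
}

fn main() {
    let pool = vec![Mutex::new(vec![0, 1, 2]), Mutex::new(vec![6, 7, 8])];

    let first = checkout(&pool).expect("first unit should be free");
    let second = checkout(&pool).expect("second unit should be free");
    // Mirrors `test_checkout_cores`: two live checkouts are distinct units.
    assert_ne!(*first, *second);

    // With both guards alive, no third checkout is possible.
    assert!(checkout(&pool).is_none());

    // Dropping a guard returns its unit to the pool.
    drop(first);
    assert!(checkout(&pool).is_some());
}
```

Dropping the guard is what releases a unit, which is why `checkout_core_group` hands the `MutexGuard` itself back to the caller for the lifetime of the SDR job.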