Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #237 from subspace/upgrade-to-jan-08
Browse files Browse the repository at this point in the history
Upgrade sdk to gemini-3g-2024-jan-08
  • Loading branch information
ParthDesai authored Jan 17, 2024
2 parents 081db39 + ddd6f65 commit 000c6c7
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 159 deletions.
313 changes: 262 additions & 51 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ edition = "2021"

[dependencies]
sdk-dsn = { path = "dsn" }
sdk-farmer = { path = "farmer" }
sdk-farmer = { path = "farmer", default-features = false }
sdk-node = { path = "node" }
sdk-substrate = { path = "substrate" }
sdk-utils = { path = "utils" }
static_assertions = "1.1.0"

subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }

# The only triple tested and confirmed as working in `jemallocator` crate is `x86_64-unknown-linux-gnu`
[target.'cfg(all(target_arch = "x86_64", target_vendor = "unknown", target_os = "linux", target_env = "gnu"))'.dev-dependencies]
Expand All @@ -28,7 +28,7 @@ derive_more = "0.99"
fdlimit = "0.2"
futures = "0.3"
serde_json = "1"
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
tempfile = "3"
tokio = { version = "1.34.0", features = ["rt-multi-thread", "macros"] }
tracing = "0.1"
Expand Down Expand Up @@ -150,7 +150,10 @@ members = [
]

[features]
default = []
default = ["numa"]
numa = [
"sdk-farmer/numa",
]
integration-test = [
"sdk-utils/integration-test",
"sdk-dsn/integration-test",
Expand Down
6 changes: 3 additions & 3 deletions dsn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ hex = "0.4.3"
parking_lot = "0.12"
prometheus-client = "0.22.0"
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "c63a8b28a9fd26d42116b0dcef1f2a5cefb9cd1c" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
sdk-utils = { path = "../utils" }
serde = { version = "1", features = ["derive"] }
sp-blockchain = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "c63a8b28a9fd26d42116b0dcef1f2a5cefb9cd1c" }
sp-runtime = { version = "24.0.0", git = "https://github.com/subspace/polkadot-sdk", rev = "c63a8b28a9fd26d42116b0dcef1f2a5cefb9cd1c" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e", default-features = false }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
tracing = "0.1"

[features]
Expand Down
20 changes: 12 additions & 8 deletions farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,31 @@ derive_builder = "0.12"
derive_more = "0.99"
futures = "0.3"
lru = "0.11.0"
libmimalloc-sys = { version = "0.1.35", features = ["extended"] }
parking_lot = "0.12"
pin-project = "1"
rayon = "1.7.0"
sdk-traits = { path = "../traits" }
sdk-utils = { path = "../utils" }
serde = { version = "1", features = ["derive"] }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf", features = ["parallel"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "d504fed67492e5363b34308767d3281a0b9e21cf" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e", default-features = false }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e", features = ["parallel"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "bd435100200b3dcce6d6f50534d52e3cd039ca8e" }
thiserror = "1"
tokio = { version = "1.34.0", features = ["fs", "rt", "tracing", "macros", "parking_lot", "rt-multi-thread", "signal"] }
tokio-stream = { version = "0.1", features = ["sync", "time"] }
tracing = "0.1"
tracing-futures = "0.2"

[features]
default = []
default = ["numa"]
numa = [
"subspace-farmer/numa",
]
integration-test = [
"sdk-utils/integration-test",
"sdk-traits/integration-test"
Expand Down
157 changes: 96 additions & 61 deletions farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmId, SingleDiskFarmInfo,
SingleDiskFarmOptions, SingleDiskFarmSummary,
};
use subspace_farmer::thread_pool_manager::PlottingThreadPoolManager;
use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::{
all_cpu_cores, create_plotting_thread_pool_manager, thread_pool_core_indices,
};
use subspace_farmer::{Identity, KNOWN_PEERS_CACHE_SIZE};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed};
Expand All @@ -43,7 +47,7 @@ use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::KnownPeersManager;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use tokio::sync::{mpsc, oneshot, watch, Mutex, Semaphore};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};
use tracing_futures::Instrument;

/// Description of the farm
Expand Down Expand Up @@ -77,26 +81,10 @@ mod builder {
use sdk_traits::Node;
use sdk_utils::{ByteSize, PublicKey};
use serde::{Deserialize, Serialize};
use tracing::warn;

use super::BuildError;
use crate::{FarmDescription, Farmer};

fn available_parallelism() -> usize {
match std::thread::available_parallelism() {
Ok(parallelism) => parallelism.get(),
Err(error) => {
warn!(
%error,
"Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \
options manually"
);

0
}
}
}

#[derive(
Debug,
Clone,
Expand Down Expand Up @@ -175,38 +163,36 @@ mod builder {
pub max_pieces_in_sector: Option<u16>,
/// Size of PER FARM thread pool used for farming (mostly for blocking
/// I/O, but also for some compute-intensive operations during
/// proving), defaults to number of CPU cores available in
/// the system
#[builder(default = "available_parallelism()")]
pub farming_thread_pool_size: usize,
/// Size of PER FARM thread pool used for plotting, defaults to number
/// of CPU cores available in the system.
/// proving), defaults to number of logical CPUs
/// available on UMA system and number of logical CPUs in
/// first NUMA node on NUMA system.
#[builder(default)]
pub farming_thread_pool_size: Option<NonZeroUsize>,
/// Size of one thread pool used for plotting, defaults to number of
/// logical CPUs available on UMA system and number of logical
/// CPUs available in NUMA node on NUMA system.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer
/// will plot multiple sectors concurrently, see
/// `sector_downloading_concurrency` and
/// `sector_encoding_concurrency` options.
#[builder(default = "available_parallelism()")]
pub plotting_thread_pool_size: usize,
/// Size of PER FARM thread pool used for replotting, typically smaller
/// pool than for plotting to not affect farming as much,
/// defaults to half of the number of CPU cores available in the
/// system.
/// Number of thread pools is defined by `--sector-encoding-concurrency`
/// option, different thread pools might have different number
/// of threads if NUMA nodes do not have the same size.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer
/// will replot multiple sectors concurrently, see
/// `sector-downloading-concurrency` and
/// `sector-encoding-concurrency` options.
#[builder(default = "available_parallelism() / 2")]
pub replotting_thread_pool_size: usize,
/// Sector downloading concurrency
#[builder(default = "NonZeroUsize::new(2).expect(\"2 > 0\")")]
#[derivative(Default(value = "NonZeroUsize::new(2).expect(\"2 > 0\")"))]
pub sector_downloading_concurrency: NonZeroUsize,
/// Sector encoding concurrency
#[builder(default = "NonZeroUsize::new(1).expect(\"1 > 0\")")]
#[derivative(Default(value = "NonZeroUsize::new(1).expect(\"1 > 0\")"))]
pub sector_encoding_concurrency: NonZeroUsize,
/// Threads will be pinned to corresponding CPU cores at creation.
#[builder(default)]
pub plotting_thread_pool_size: Option<NonZeroUsize>,
/// the plotting process, defaults to `--sector-downloading-concurrency`
/// + 1 to download future sector ahead of time
#[builder(default)]
pub sector_downloading_concurrency: Option<NonZeroUsize>,
/// Defines how many sectors farmer will encode concurrently, defaults
/// to 1 on UMA system and number of NUMA nodes on NUMA system.
/// It is further restricted by `sector_downloading_concurrency`
/// and setting this option higher than
/// `sector_downloading_concurrency` will have no effect.
#[builder(default)]
pub sector_encoding_concurrency: Option<NonZeroUsize>,
/// Threads will be pinned to corresponding CPU cores at creation.
#[builder(default)]
pub replotting_thread_pool_size: Option<NonZeroUsize>,
}

impl Builder {
Expand Down Expand Up @@ -484,8 +470,65 @@ impl Config {
};

let mut plotting_delay_senders = Vec::with_capacity(farms.len());
let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get()));
let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));

let plotting_thread_pool_core_indices =
thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency);
let replotting_thread_pool_core_indices = {
let mut replotting_thread_pool_core_indices =
thread_pool_core_indices(replotting_thread_pool_size, sector_encoding_concurrency);
if replotting_thread_pool_size.is_none() {
// The default behavior is to use all CPU cores, but for replotting we just want
// half
replotting_thread_pool_core_indices
.iter_mut()
.for_each(|set| set.truncate(set.cpu_cores().len() / 2));
}
replotting_thread_pool_core_indices
};

let downloading_semaphore = Arc::new(Semaphore::new(
sector_downloading_concurrency
.map(|sector_downloading_concurrency| sector_downloading_concurrency.get())
.unwrap_or(plotting_thread_pool_core_indices.len() + 1),
));

let all_cpu_cores = all_cpu_cores();
let plotting_thread_pool_manager = create_plotting_thread_pool_manager(
plotting_thread_pool_core_indices.into_iter().zip(replotting_thread_pool_core_indices),
)?;
let farming_thread_pool_size = farming_thread_pool_size
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(|| {
all_cpu_cores
.first()
.expect("Not empty according to function description; qed")
.cpu_cores()
.len()
});

if all_cpu_cores.len() > 1 {
info!(numa_nodes = %all_cpu_cores.len(), "NUMA system detected");

if all_cpu_cores.len() > farms.len() {
warn!(
numa_nodes = %all_cpu_cores.len(),
farms_count = %farms.len(),
"Too few disk farms, CPU will not be utilized fully during plotting, same number of farms as NUMA \
nodes or more is recommended"
);
}
}

// TODO: Remove code or environment variable once identified whether it helps or
// not
if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 {
unsafe {
libmimalloc_sys::mi_option_set(
libmimalloc_sys::mi_option_use_numa_nodes,
all_cpu_cores.len() as std::ffi::c_long,
);
}
}

for (disk_farm_idx, description) in farms.iter().enumerate() {
let (plotting_delay_sender, plotting_delay_receiver) =
Expand All @@ -503,11 +546,9 @@ impl Config {
kzg: kzg.clone(),
erasure_coding: erasure_coding.clone(),
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay: Some(plotting_delay_receiver),
downloading_semaphore: Arc::clone(&downloading_semaphore),
encoding_semaphore: Arc::clone(&encoding_semaphore),
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
})
.await?;

Expand Down Expand Up @@ -835,11 +876,9 @@ struct FarmOptions<'a, PG, N: sdk_traits::Node> {
pub erasure_coding: ErasureCoding,
pub max_pieces_in_sector: u16,
pub farming_thread_pool_size: usize,
pub plotting_thread_pool_size: usize,
pub replotting_thread_pool_size: usize,
pub plotting_delay: Option<futures::channel::oneshot::Receiver<()>>,
pub downloading_semaphore: Arc<Semaphore>,
pub encoding_semaphore: Arc<Semaphore>,
pub plotting_thread_pool_manager: PlottingThreadPoolManager,
}

impl<T: subspace_proof_of_space::Table> Farm<T> {
Expand All @@ -855,11 +894,9 @@ impl<T: subspace_proof_of_space::Table> Farm<T> {
erasure_coding,
max_pieces_in_sector,
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay,
downloading_semaphore,
encoding_semaphore,
plotting_thread_pool_manager,
}: FarmOptions<
'_,
impl subspace_farmer_components::plotting::PieceGetter + Clone + Send + Sync + 'static,
Expand All @@ -884,11 +921,9 @@ impl<T: subspace_proof_of_space::Table> Farm<T> {
piece_getter,
cache_percentage,
downloading_semaphore,
encoding_semaphore,
farm_during_initial_plotting: false,
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_thread_pool_manager,
plotting_delay,
};
let single_disk_farm_fut = SingleDiskFarm::new::<_, _, T>(description, disk_farm_idx);
Expand Down
Loading

0 comments on commit 000c6c7

Please sign in to comment.