Skip to content

Commit

Permalink
Merge pull request #161 from greatest-ape/auto-load-tester
Browse files Browse the repository at this point in the history
Add aquatic_load_tester: multi-run multi-implementation load tests
  • Loading branch information
greatest-ape authored Dec 25, 2023
2 parents c7997d5 + afc3deb commit 5870b53
Show file tree
Hide file tree
Showing 18 changed files with 1,664 additions and 310 deletions.
427 changes: 327 additions & 100 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"crates/http",
"crates/http_load_test",
"crates/http_protocol",
"crates/load_tester",
"crates/peer_id",
"crates/toml_config",
"crates/toml_config_derive",
Expand Down Expand Up @@ -35,6 +36,7 @@ aquatic_toml_config = { version = "0.8.0", path = "./crates/toml_config" }
aquatic_toml_config_derive = { version = "0.8.0", path = "./crates/toml_config_derive" }
aquatic_udp_protocol = { version = "0.8.0", path = "./crates/udp_protocol" }
aquatic_udp = { version = "0.8.0", path = "./crates/udp" }
aquatic_udp_load_test = { version = "0.8.0", path = "./crates/udp_load_test" }
aquatic_ws_protocol = { version = "0.8.0", path = "./crates/ws_protocol" }
aquatic_ws = { version = "0.8.0", path = "./crates/ws" }

Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/access_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl AccessListMode {
}
}

#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AccessListConfig {
pub mode: AccessListMode,
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/cpu_pinning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub mod mod_name {
use super::*;

/// Experimental cpu pinning
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
pub struct struct_name {
pub active: bool,
pub direction: CpuPinningDirection,
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/privileges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::{

use anyhow::Context;
use privdrop::PrivDrop;
use serde::Deserialize;
use serde::{Deserialize, Serialize};

use aquatic_toml_config::TomlConfig;

#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct PrivilegeConfig {
/// Chroot and switch group and user after binding to sockets
Expand Down
35 changes: 35 additions & 0 deletions crates/load_tester/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "aquatic_load_tester"
description = "Load test runner for aquatic BitTorrent tracker"
keywords = ["peer-to-peer", "torrent", "bittorrent"]
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
readme.workspace = true

[[bin]]
name = "aquatic_load_tester"

[features]
default = ["udp"]
udp = ["aquatic_udp", "aquatic_udp_load_test"]

[dependencies]
aquatic_udp = { optional = true, workspace = true }
aquatic_udp_load_test = { optional = true, workspace = true }

anyhow = "1"
clap = { version = "4", features = ["derive"] }
indexmap = "2"
itertools = "0.12"
nonblock = "0.2"
once_cell = "1"
regex = "1"
serde = "1"
tempfile = "3"
toml = "0.8"

[dev-dependencies]
258 changes: 258 additions & 0 deletions crates/load_tester/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
use std::{fmt::Display, ops::Range, thread::available_parallelism};

use itertools::Itertools;

#[derive(Debug, Clone)]
pub struct TaskSetCpuList(pub Vec<TaskSetCpuIndicator>);

impl TaskSetCpuList {
pub fn as_cpu_list(&self) -> String {
let indicator = self.0.iter().map(|indicator| match indicator {
TaskSetCpuIndicator::Single(i) => i.to_string(),
TaskSetCpuIndicator::Range(range) => {
format!(
"{}-{}",
range.start,
range.clone().into_iter().last().unwrap()
)
}
});

Itertools::intersperse_with(indicator, || ",".to_string())
.into_iter()
.collect()
}

pub fn new(
mode: CpuMode,
direction: CpuDirection,
requested_cpus: usize,
) -> anyhow::Result<Self> {
let available_parallelism: usize = available_parallelism()?.into();

Ok(Self::new_with_available_parallelism(
available_parallelism,
mode,
direction,
requested_cpus,
))
}

fn new_with_available_parallelism(
available_parallelism: usize,
mode: CpuMode,
direction: CpuDirection,
requested_cpus: usize,
) -> Self {
match direction {
CpuDirection::Asc => match mode {
CpuMode::Split => {
let middle = available_parallelism / 2;

let range_a = 0..(middle.min(requested_cpus));
let range_b = middle..(available_parallelism.min(middle + requested_cpus));

Self(vec![
range_a.try_into().unwrap(),
range_b.try_into().unwrap(),
])
}
CpuMode::All => {
let range = 0..(available_parallelism.min(requested_cpus));

Self(vec![range.try_into().unwrap()])
}
},
CpuDirection::Desc => match mode {
CpuMode::Split => {
let middle = available_parallelism / 2;

let range_a = middle.saturating_sub(requested_cpus)..middle;
let range_b = available_parallelism
.saturating_sub(requested_cpus)
.max(middle)..available_parallelism;

Self(vec![
range_a.try_into().unwrap(),
range_b.try_into().unwrap(),
])
}
CpuMode::All => {
let range =
available_parallelism.saturating_sub(requested_cpus)..available_parallelism;

Self(vec![range.try_into().unwrap()])
}
},
}
}
}

impl TryFrom<Vec<Range<usize>>> for TaskSetCpuList {
type Error = String;

fn try_from(value: Vec<Range<usize>>) -> Result<Self, Self::Error> {
let mut output = Vec::new();

for range in value {
output.push(range.try_into()?);
}

Ok(Self(output))
}
}

#[derive(Debug, Clone)]
pub enum TaskSetCpuIndicator {
Single(usize),
Range(Range<usize>),
}

impl TryFrom<Range<usize>> for TaskSetCpuIndicator {
type Error = String;

fn try_from(value: Range<usize>) -> Result<Self, Self::Error> {
match value.len() {
0 => Err("Empty ranges not supported".into()),
1 => Ok(TaskSetCpuIndicator::Single(value.start)),
_ => Ok(TaskSetCpuIndicator::Range(value)),
}
}
}

#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum CpuMode {
Split,
All,
}

impl Display for CpuMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::All => f.write_str("all"),
Self::Split => f.write_str("split"),
}
}
}

#[derive(Debug, Clone, Copy)]
pub enum CpuDirection {
Asc,
Desc,
}

pub fn simple_load_test_runs(cpu_mode: CpuMode, workers: &[usize]) -> Vec<(usize, TaskSetCpuList)> {
workers
.into_iter()
.copied()
.map(|workers| {
(
workers,
TaskSetCpuList::new(cpu_mode, CpuDirection::Desc, workers).unwrap(),
)
})
.collect()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_task_set_cpu_list_split_asc() {
let f = TaskSetCpuList::new_with_available_parallelism;

assert_eq!(
f(8, CpuMode::Split, CpuDirection::Asc, 1).as_cpu_list(),
"0,4"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Asc, 2).as_cpu_list(),
"0-1,4-5"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Asc, 4).as_cpu_list(),
"0-3,4-7"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Asc, 8).as_cpu_list(),
"0-3,4-7"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Asc, 9).as_cpu_list(),
"0-3,4-7"
);
}

#[test]
fn test_task_set_cpu_list_split_desc() {
let f = TaskSetCpuList::new_with_available_parallelism;

assert_eq!(
f(8, CpuMode::Split, CpuDirection::Desc, 1).as_cpu_list(),
"3,7"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Desc, 2).as_cpu_list(),
"2-3,6-7"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Desc, 4).as_cpu_list(),
"0-3,4-7"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Desc, 8).as_cpu_list(),
"0-3,4-7"
);
assert_eq!(
f(8, CpuMode::Split, CpuDirection::Desc, 9).as_cpu_list(),
"0-3,4-7"
);
}

#[test]
fn test_task_set_cpu_list_all_asc() {
let f = TaskSetCpuList::new_with_available_parallelism;

assert_eq!(f(8, CpuMode::All, CpuDirection::Asc, 1).as_cpu_list(), "0");
assert_eq!(
f(8, CpuMode::All, CpuDirection::Asc, 2).as_cpu_list(),
"0-1"
);
assert_eq!(
f(8, CpuMode::All, CpuDirection::Asc, 4).as_cpu_list(),
"0-3"
);
assert_eq!(
f(8, CpuMode::All, CpuDirection::Asc, 8).as_cpu_list(),
"0-7"
);
assert_eq!(
f(8, CpuMode::All, CpuDirection::Asc, 9).as_cpu_list(),
"0-7"
);
}

#[test]
fn test_task_set_cpu_list_all_desc() {
let f = TaskSetCpuList::new_with_available_parallelism;

assert_eq!(f(8, CpuMode::All, CpuDirection::Desc, 1).as_cpu_list(), "7");
assert_eq!(
f(8, CpuMode::All, CpuDirection::Desc, 2).as_cpu_list(),
"6-7"
);
assert_eq!(
f(8, CpuMode::All, CpuDirection::Desc, 4).as_cpu_list(),
"4-7"
);
assert_eq!(
f(8, CpuMode::All, CpuDirection::Desc, 8).as_cpu_list(),
"0-7"
);
assert_eq!(
f(8, CpuMode::All, CpuDirection::Desc, 9).as_cpu_list(),
"0-7"
);
}
}
28 changes: 28 additions & 0 deletions crates/load_tester/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
pub mod common;
pub mod protocols;
pub mod run;
pub mod set;

use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(author, version, about)]
struct Args {
#[command(subcommand)]
command: Command,
}

#[derive(Subcommand)]
enum Command {
#[cfg(feature = "udp")]
Udp(protocols::udp::UdpCommand),
}

fn main() {
let args = Args::parse();

match args.command {
#[cfg(feature = "udp")]
Command::Udp(command) => command.run().unwrap(),
}
}
2 changes: 2 additions & 0 deletions crates/load_tester/src/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "udp")]
pub mod udp;
Loading

0 comments on commit 5870b53

Please sign in to comment.