Skip to content

Commit

Permalink
Merge #747: Profiling for the UDP tracker
Browse files Browse the repository at this point in the history
9015668 refactor: [#746] rename functions and extract named closures (Jose Celano)
cc1cbc1 test: [#746] add a new binary for profiling (Jose Celano)
8395c42 test: [#746] disable tracker stats for profiling (Jose Celano)

Pull request description:

  Profiling for the UDP tracker.

  - [x] Add a new binary for profiling.
  - [x] Minor refactorings. Rename functions and extract named closures.

ACKs for top commit:
  josecelano:
    ACK 9015668

Tree-SHA512: 3940ed11f04950da34941e45da89f43d74ce8d5bc97e2b6a11e8473b617e73cd742376c197456352ae9dfff201b72705d26866b8de4535d93c5ac8e5be73977b
  • Loading branch information
josecelano committed Mar 20, 2024
2 parents b777d3a + 9015668 commit ea33f1c
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 34 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
/.coverage/
/.idea/
/.vscode/launch.json
/tracker.toml
/data.db
/database.db
/database.json.bz2
/storage/
/target
/tracker.*
/tracker.toml
callgrind.out
5 changes: 5 additions & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"bufs",
"Buildx",
"byteorder",
"callgrind",
"canonicalize",
"canonicalized",
"certbot",
Expand All @@ -35,6 +36,7 @@
"Cyberneering",
"datagram",
"datetime",
"debuginfo",
"Deque",
"Dijke",
"distroless",
Expand All @@ -60,6 +62,7 @@
"infoschema",
"Intermodal",
"intervali",
"kcachegrind",
"keyout",
"lcov",
"leecher",
Expand Down Expand Up @@ -134,7 +137,9 @@
"untuple",
"uroot",
"Vagaa",
"valgrind",
"Vuze",
"Weidendorfer",
"Werror",
"whitespaces",
"XBTT",
Expand Down
4 changes: 2 additions & 2 deletions share/default/config/tracker.udp.benchmarking.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ min_announce_interval = 120
mode = "public"
on_reverse_proxy = false
persistent_torrent_completed_stat = false
remove_peerless_torrents = true
tracker_usage_statistics = true
remove_peerless_torrents = false
tracker_usage_statistics = false

[[udp_trackers]]
bind_address = "0.0.0.0:6969"
Expand Down
8 changes: 8 additions & 0 deletions src/bin/profiling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! This binary is used for profiling with [valgrind](https://valgrind.org/)
//! and [kcachegrind](https://kcachegrind.github.io/).
use torrust_tracker::console::profiling::run;

#[tokio::main]
async fn main() {
run().await;
}
1 change: 1 addition & 0 deletions src/console/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Console apps.
pub mod ci;
pub mod clients;
pub mod profiling;
202 changes: 202 additions & 0 deletions src/console/profiling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
//! This binary is used for profiling with [valgrind](https://valgrind.org/)
//! and [kcachegrind](https://kcachegrind.github.io/).
//!
//! # Requirements
//!
//! [valgrind](https://valgrind.org/) and [kcachegrind](https://kcachegrind.github.io/).
//!
//! On Ubuntu you can install them with:
//!
//! ```text
//! sudo apt install valgrind kcachegrind
//! ```
//!
//! > NOTICE: valgrind executes the program you wan to profile and waits until
//! it ends. Since the tracker is a service and does not end the profiling
//! binary accepts an arguments with the duration you want to run the tracker,
//! so that it terminates automatically after that period of time.
//!
//! # Run profiling
//!
//! To run the profiling you have to:
//!
//! 1. Build and run the tracker for profiling.
//! 2. Run the aquatic UDP load test tool to start collecting data in the tracker.
//!
//! Build and run the tracker for profiling:
//!
//! ```text
//! RUSTFLAGS='-g' cargo build --release --bin profiling \
//! && export TORRUST_TRACKER_PATH_CONFIG="./share/default/config/tracker.udp.benchmarking.toml" \
//! && valgrind \
//! --tool=callgrind \
//! --callgrind-out-file=callgrind.out \
//! --collect-jumps=yes \
//! --simulate-cache=yes \
//! ./target/release/profiling 60
//! ```
//!
//! The output should be something like:
//!
//! ```text
//! RUSTFLAGS='-g' cargo build --release --bin profiling \
//! && export TORRUST_TRACKER_PATH_CONFIG="./share/default/config/tracker.udp.benchmarking.toml" \
//! && valgrind \
//! --tool=callgrind \
//! --callgrind-out-file=callgrind.out \
//! --collect-jumps=yes \
//! --simulate-cache=yes \
//! ./target/release/profiling 60
//!
//! Compiling torrust-tracker v3.0.0-alpha.12-develop (/home/developer/Documents/git/committer/me/github/torrust/torrust-tracker)
//! Finished `release` profile [optimized + debuginfo] target(s) in 1m 15s
//! ==122801== Callgrind, a call-graph generating cache profiler
//! ==122801== Copyright (C) 2002-2017, and GNU GPL'd, by Josef Weidendorfer et al.
//! ==122801== Using Valgrind-3.19.0 and LibVEX; rerun with -h for copyright info
//! ==122801== Command: ./target/release/profiling 60
//! ==122801==
//! --122801-- warning: L3 cache found, using its data for the LL simulation.
//! ==122801== For interactive control, run 'callgrind_control -h'.
//! Loading configuration file: `./share/default/config/tracker.udp.benchmarking.toml` ...
//! Torrust successfully shutdown.
//! ==122801==
//! ==122801== Events : Ir Dr Dw I1mr D1mr D1mw ILmr DLmr DLmw
//! ==122801== Collected : 1160654816 278135882 247755311 24453652 12650490 16315690 10932 2481624 4832145
//! ==122801==
//! ==122801== I refs: 1,160,654,816
//! ==122801== I1 misses: 24,453,652
//! ==122801== LLi misses: 10,932
//! ==122801== I1 miss rate: 2.11%
//! ==122801== LLi miss rate: 0.00%
//! ==122801==
//! ==122801== D refs: 525,891,193 (278,135,882 rd + 247,755,311 wr)
//! ==122801== D1 misses: 28,966,180 ( 12,650,490 rd + 16,315,690 wr)
//! ==122801== LLd misses: 7,313,769 ( 2,481,624 rd + 4,832,145 wr)
//! ==122801== D1 miss rate: 5.5% ( 4.5% + 6.6% )
//! ==122801== LLd miss rate: 1.4% ( 0.9% + 2.0% )
//! ==122801==
//! ==122801== LL refs: 53,419,832 ( 37,104,142 rd + 16,315,690 wr)
//! ==122801== LL misses: 7,324,701 ( 2,492,556 rd + 4,832,145 wr)
//! ==122801== LL miss rate: 0.4% ( 0.2% + 2.0% )
//! ```
//!
//! > NOTICE: We are using an specific tracker configuration for profiling that
//! removes all features except the UDP tracker and sets the logging level to `error`.
//!
//! Build the aquatic UDP load test command:
//!
//! ```text
//! cd /tmp
//! git clone git@github.com:greatest-ape/aquatic.git
//! cd aquatic
//! cargo build --profile=release-debug -p aquatic_udp_load_test
//! ./target/release-debug/aquatic_udp_load_test -p > "load-test-config.toml"
//! ```
//!
//! Modify the "load-test-config.toml" file to change the UDP tracker port from
//! `3000` to `6969`.
//!
//! Running the aquatic UDP load test command:
//!
//! ```text
//! ./target/release-debug/aquatic_udp_load_test -c "load-test-config.toml"
//! ```
//!
//! The output should be something like this:
//!
//! ```text
//! Starting client with config: Config {
//! server_address: 127.0.0.1:6969,
//! log_level: Error,
//! workers: 1,
//! duration: 0,
//! summarize_last: 0,
//! extra_statistics: true,
//! network: NetworkConfig {
//! multiple_client_ipv4s: true,
//! sockets_per_worker: 4,
//! recv_buffer: 8000000,
//! },
//! requests: RequestConfig {
//! number_of_torrents: 1000000,
//! number_of_peers: 2000000,
//! scrape_max_torrents: 10,
//! announce_peers_wanted: 30,
//! weight_connect: 50,
//! weight_announce: 50,
//! weight_scrape: 1,
//! peer_seeder_probability: 0.75,
//! },
//! }
//!
//! Requests out: 45097.51/second
//! Responses in: 4212.70/second
//! - Connect responses: 2098.15
//! - Announce responses: 2074.95
//! - Scrape responses: 39.59
//! - Error responses: 0.00
//! Peers per announce response: 0.00
//! Announce responses per info hash:
//! - p10: 1
//! - p25: 1
//! - p50: 1
//! - p75: 2
//! - p90: 3
//! - p95: 4
//! - p99: 6
//! - p99.9: 8
//! - p100: 10
//! ```
//!
//! After running the tracker for some seconds the tracker will automatically stop
//! and `valgrind`will write the file `callgrind.out` with the data.
//!
//! You can now analyze the collected data with:
//!
//! ```text
//! kcachegrind callgrind.out
//! ```
use std::env;
use std::time::Duration;

use log::info;
use tokio::time::sleep;

use crate::{app, bootstrap};

pub async fn run() {
// Parse command line arguments
let args: Vec<String> = env::args().collect();

// Ensure an argument for duration is provided
if args.len() != 2 {
eprintln!("Usage: {} <duration_in_seconds>", args[0]);
return;
}

// Parse duration argument
let Ok(duration_secs) = args[1].parse::<u64>() else {
eprintln!("Invalid duration provided");
return;
};

let (config, tracker) = bootstrap::app::setup();

let jobs = app::start(&config, tracker).await;

// Run the tracker for a fixed duration
let run_duration = sleep(Duration::from_secs(duration_secs));

tokio::select! {
() = run_duration => {
info!("Torrust timed shutdown..");
},
_ = tokio::signal::ctrl_c() => {
info!("Torrust shutting down via Ctrl+C..");
// Await for all jobs to shutdown
futures::future::join_all(jobs).await;
}
}

println!("Torrust successfully shutdown.");
}
67 changes: 36 additions & 31 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,24 +255,7 @@ impl Udp {

let running = tokio::task::spawn(async move {
debug!(target: "UDP TRACKER", "Started: Waiting for packets on socket address: udp://{address} ...");

let tracker = tracker.clone();
let socket = socket.clone();

let reqs = &mut ActiveRequests::default();

// Main Waiting Loop, awaits on async [`receive_request`].
loop {
if let Some(h) = reqs.rb.push_overwrite(
Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(),
) {
if !h.is_finished() {
// the task is still running, lets yield and give it a chance to flush.
tokio::task::yield_now().await;
h.abort();
}
}
}
Self::run_udp_server(tracker, socket).await;
});

tx_start
Expand All @@ -292,6 +275,27 @@ impl Udp {
task::yield_now().await; // lets allow the other threads to complete.
}

async fn run_udp_server(tracker: Arc<Tracker>, socket: Arc<UdpSocket>) {
let tracker = tracker.clone();
let socket = socket.clone();

let reqs = &mut ActiveRequests::default();

// Main Waiting Loop, awaits on async [`receive_request`].
loop {
if let Some(h) = reqs.rb.push_overwrite(
Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone())
.abort_handle(),
) {
if !h.is_finished() {
// the task is still running, lets yield and give it a chance to flush.
tokio::task::yield_now().await;
h.abort();
}
}
}
}

async fn receive_request(socket: Arc<UdpSocket>) -> Result<UdpRequest, Box<std::io::Error>> {
// Wait for the socket to be readable
socket.readable().await?;
Expand All @@ -309,26 +313,27 @@ impl Udp {
}
}

fn do_request(
fn spawn_request_processor(
result: Result<UdpRequest, Box<std::io::Error>>,
tracker: Arc<Tracker>,
socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
// timeout not needed, as udp is non-blocking.
tokio::task::spawn(async move {
match result {
Ok(udp_request) => {
trace!("Received Request from: {}", udp_request.from);
Self::make_response(tracker.clone(), socket.clone(), udp_request).await;
}
Err(error) => {
debug!("error: {error}");
}
tokio::task::spawn(Self::process_request(result, tracker, socket))
}

async fn process_request(result: Result<UdpRequest, Box<std::io::Error>>, tracker: Arc<Tracker>, socket: Arc<UdpSocket>) {
match result {
Ok(udp_request) => {
trace!("Received Request from: {}", udp_request.from);
Self::process_valid_request(tracker.clone(), socket.clone(), udp_request).await;
}
})
Err(error) => {
debug!("error: {error}");
}
}
}

async fn make_response(tracker: Arc<Tracker>, socket: Arc<UdpSocket>, udp_request: UdpRequest) {
async fn process_valid_request(tracker: Arc<Tracker>, socket: Arc<UdpSocket>, udp_request: UdpRequest) {
trace!("Making Response to {udp_request:?}");
let from = udp_request.from;
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.clone()).await;
Expand Down

0 comments on commit ea33f1c

Please sign in to comment.