From 8395c42b4e4cf4c2a98d4d5e51036d7352de9dce Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 20 Mar 2024 11:15:46 +0000 Subject: [PATCH 1/3] test: [#746] disable tracker stats for profiling --- share/default/config/tracker.udp.benchmarking.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/share/default/config/tracker.udp.benchmarking.toml b/share/default/config/tracker.udp.benchmarking.toml index 080c67e8..70298e9d 100644 --- a/share/default/config/tracker.udp.benchmarking.toml +++ b/share/default/config/tracker.udp.benchmarking.toml @@ -9,8 +9,8 @@ min_announce_interval = 120 mode = "public" on_reverse_proxy = false persistent_torrent_completed_stat = false -remove_peerless_torrents = true -tracker_usage_statistics = true +remove_peerless_torrents = false +tracker_usage_statistics = false [[udp_trackers]] bind_address = "0.0.0.0:6969" From cc1cbc12f5acbb8f91c2814bb9110789757c8033 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 20 Mar 2024 11:21:08 +0000 Subject: [PATCH 2/3] test: [#746] add a new binary for profiling --- .gitignore | 3 +- cSpell.json | 5 + src/bin/profiling.rs | 8 ++ src/console/mod.rs | 1 + src/console/profiling.rs | 202 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 src/bin/profiling.rs create mode 100644 src/console/profiling.rs diff --git a/.gitignore b/.gitignore index 2d8d0b8b..caa52754 100644 --- a/.gitignore +++ b/.gitignore @@ -3,10 +3,11 @@ /.coverage/ /.idea/ /.vscode/launch.json -/tracker.toml /data.db /database.db /database.json.bz2 /storage/ /target /tracker.* +/tracker.toml +callgrind.out \ No newline at end of file diff --git a/cSpell.json b/cSpell.json index 29751798..d15355d5 100644 --- a/cSpell.json +++ b/cSpell.json @@ -21,6 +21,7 @@ "bufs", "Buildx", "byteorder", + "callgrind", "canonicalize", "canonicalized", "certbot", @@ -35,6 +36,7 @@ "Cyberneering", "datagram", "datetime", + "debuginfo", "Deque", "Dijke", "distroless", @@ -60,6 +62,7 @@ "infoschema", "Intermodal", "intervali", + "kcachegrind", "keyout", "lcov", "leecher", @@ -134,7 +137,9 @@ "untuple", "uroot", "Vagaa", + "valgrind", "Vuze", + "Weidendorfer", "Werror", "whitespaces", "XBTT", diff --git a/src/bin/profiling.rs b/src/bin/profiling.rs new file mode 100644 index 00000000..bc1ac652 --- /dev/null +++ b/src/bin/profiling.rs @@ -0,0 +1,8 @@ +//! This binary is used for profiling with [valgrind](https://valgrind.org/) +//! and [kcachegrind](https://kcachegrind.github.io/). +use torrust_tracker::console::profiling::run; + +#[tokio::main] +async fn main() { + run().await; +} diff --git a/src/console/mod.rs b/src/console/mod.rs index 54ed8e41..dab338e4 100644 --- a/src/console/mod.rs +++ b/src/console/mod.rs @@ -1,3 +1,4 @@ //! Console apps. pub mod ci; pub mod clients; +pub mod profiling; diff --git a/src/console/profiling.rs b/src/console/profiling.rs new file mode 100644 index 00000000..e0867159 --- /dev/null +++ b/src/console/profiling.rs @@ -0,0 +1,202 @@ +//! This binary is used for profiling with [valgrind](https://valgrind.org/) +//! and [kcachegrind](https://kcachegrind.github.io/). +//! +//! # Requirements +//! +//! [valgrind](https://valgrind.org/) and [kcachegrind](https://kcachegrind.github.io/). +//! +//! On Ubuntu you can install them with: +//! +//! ```text +//! sudo apt install valgrind kcachegrind +//! ``` +//! +//! > NOTICE: valgrind executes the program you wan to profile and waits until +//! it ends. Since the tracker is a service and does not end the profiling +//! binary accepts an arguments with the duration you want to run the tracker, +//! so that it terminates automatically after that period of time. +//! +//! # Run profiling +//! +//! To run the profiling you have to: +//! +//! 1. Build and run the tracker for profiling. +//! 2. Run the aquatic UDP load test tool to start collecting data in the tracker. +//! +//! Build and run the tracker for profiling: +//! +//! ```text +//! RUSTFLAGS='-g' cargo build --release --bin profiling \ +//! && export TORRUST_TRACKER_PATH_CONFIG="./share/default/config/tracker.udp.benchmarking.toml" \ +//! && valgrind \ +//! --tool=callgrind \ +//! --callgrind-out-file=callgrind.out \ +//! --collect-jumps=yes \ +//! --simulate-cache=yes \ +//! ./target/release/profiling 60 +//! ``` +//! +//! The output should be something like: +//! +//! ```text +//! RUSTFLAGS='-g' cargo build --release --bin profiling \ +//! && export TORRUST_TRACKER_PATH_CONFIG="./share/default/config/tracker.udp.benchmarking.toml" \ +//! && valgrind \ +//! --tool=callgrind \ +//! --callgrind-out-file=callgrind.out \ +//! --collect-jumps=yes \ +//! --simulate-cache=yes \ +//! ./target/release/profiling 60 +//! +//! Compiling torrust-tracker v3.0.0-alpha.12-develop (/home/developer/Documents/git/committer/me/github/torrust/torrust-tracker) +//! Finished `release` profile [optimized + debuginfo] target(s) in 1m 15s +//! ==122801== Callgrind, a call-graph generating cache profiler +//! ==122801== Copyright (C) 2002-2017, and GNU GPL'd, by Josef Weidendorfer et al. +//! ==122801== Using Valgrind-3.19.0 and LibVEX; rerun with -h for copyright info +//! ==122801== Command: ./target/release/profiling 60 +//! ==122801== +//! --122801-- warning: L3 cache found, using its data for the LL simulation. +//! ==122801== For interactive control, run 'callgrind_control -h'. +//! Loading configuration file: `./share/default/config/tracker.udp.benchmarking.toml` ... +//! Torrust successfully shutdown. +//! ==122801== +//! ==122801== Events : Ir Dr Dw I1mr D1mr D1mw ILmr DLmr DLmw +//! ==122801== Collected : 1160654816 278135882 247755311 24453652 12650490 16315690 10932 2481624 4832145 +//! ==122801== +//! ==122801== I refs: 1,160,654,816 +//! ==122801== I1 misses: 24,453,652 +//! ==122801== LLi misses: 10,932 +//! ==122801== I1 miss rate: 2.11% +//! ==122801== LLi miss rate: 0.00% +//! ==122801== +//! ==122801== D refs: 525,891,193 (278,135,882 rd + 247,755,311 wr) +//! ==122801== D1 misses: 28,966,180 ( 12,650,490 rd + 16,315,690 wr) +//! ==122801== LLd misses: 7,313,769 ( 2,481,624 rd + 4,832,145 wr) +//! ==122801== D1 miss rate: 5.5% ( 4.5% + 6.6% ) +//! ==122801== LLd miss rate: 1.4% ( 0.9% + 2.0% ) +//! ==122801== +//! ==122801== LL refs: 53,419,832 ( 37,104,142 rd + 16,315,690 wr) +//! ==122801== LL misses: 7,324,701 ( 2,492,556 rd + 4,832,145 wr) +//! ==122801== LL miss rate: 0.4% ( 0.2% + 2.0% ) +//! ``` +//! +//! > NOTICE: We are using an specific tracker configuration for profiling that +//! removes all features except the UDP tracker and sets the logging level to `error`. +//! +//! Build the aquatic UDP load test command: +//! +//! ```text +//! cd /tmp +//! git clone git@github.com:greatest-ape/aquatic.git +//! cd aquatic +//! cargo build --profile=release-debug -p aquatic_udp_load_test +//! ./target/release-debug/aquatic_udp_load_test -p > "load-test-config.toml" +//! ``` +//! +//! Modify the "load-test-config.toml" file to change the UDP tracker port from +//! `3000` to `6969`. +//! +//! Running the aquatic UDP load test command: +//! +//! ```text +//! ./target/release-debug/aquatic_udp_load_test -c "load-test-config.toml" +//! ``` +//! +//! The output should be something like this: +//! +//! ```text +//! Starting client with config: Config { +//! server_address: 127.0.0.1:6969, +//! log_level: Error, +//! workers: 1, +//! duration: 0, +//! summarize_last: 0, +//! extra_statistics: true, +//! network: NetworkConfig { +//! multiple_client_ipv4s: true, +//! sockets_per_worker: 4, +//! recv_buffer: 8000000, +//! }, +//! requests: RequestConfig { +//! number_of_torrents: 1000000, +//! number_of_peers: 2000000, +//! scrape_max_torrents: 10, +//! announce_peers_wanted: 30, +//! weight_connect: 50, +//! weight_announce: 50, +//! weight_scrape: 1, +//! peer_seeder_probability: 0.75, +//! }, +//! } +//! +//! Requests out: 45097.51/second +//! Responses in: 4212.70/second +//! - Connect responses: 2098.15 +//! - Announce responses: 2074.95 +//! - Scrape responses: 39.59 +//! - Error responses: 0.00 +//! Peers per announce response: 0.00 +//! Announce responses per info hash: +//! - p10: 1 +//! - p25: 1 +//! - p50: 1 +//! - p75: 2 +//! - p90: 3 +//! - p95: 4 +//! - p99: 6 +//! - p99.9: 8 +//! - p100: 10 +//! ``` +//! +//! After running the tracker for some seconds the tracker will automatically stop +//! and `valgrind`will write the file `callgrind.out` with the data. +//! +//! You can now analyze the collected data with: +//! +//! ```text +//! kcachegrind callgrind.out +//! ``` +use std::env; +use std::time::Duration; + +use log::info; +use tokio::time::sleep; + +use crate::{app, bootstrap}; + +pub async fn run() { + // Parse command line arguments + let args: Vec = env::args().collect(); + + // Ensure an argument for duration is provided + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + return; + } + + // Parse duration argument + let Ok(duration_secs) = args[1].parse::() else { + eprintln!("Invalid duration provided"); + return; + }; + + let (config, tracker) = bootstrap::app::setup(); + + let jobs = app::start(&config, tracker).await; + + // Run the tracker for a fixed duration + let run_duration = sleep(Duration::from_secs(duration_secs)); + + tokio::select! { + () = run_duration => { + info!("Torrust timed shutdown.."); + }, + _ = tokio::signal::ctrl_c() => { + info!("Torrust shutting down via Ctrl+C.."); + // Await for all jobs to shutdown + futures::future::join_all(jobs).await; + } + } + + println!("Torrust successfully shutdown."); +} From 901566873d813356ae817602e427bb74803b81ec Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 20 Mar 2024 11:45:11 +0000 Subject: [PATCH 3/3] refactor: [#746] rename functions and extract named closures --- src/servers/udp/server.rs | 67 +++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 95c8145c..98c4bf72 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -255,24 +255,7 @@ impl Udp { let running = tokio::task::spawn(async move { debug!(target: "UDP TRACKER", "Started: Waiting for packets on socket address: udp://{address} ..."); - - let tracker = tracker.clone(); - let socket = socket.clone(); - - let reqs = &mut ActiveRequests::default(); - - // Main Waiting Loop, awaits on async [`receive_request`]. - loop { - if let Some(h) = reqs.rb.push_overwrite( - Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(), - ) { - if !h.is_finished() { - // the task is still running, lets yield and give it a chance to flush. - tokio::task::yield_now().await; - h.abort(); - } - } - } + Self::run_udp_server(tracker, socket).await; }); tx_start @@ -292,6 +275,27 @@ impl Udp { task::yield_now().await; // lets allow the other threads to complete. } + async fn run_udp_server(tracker: Arc, socket: Arc) { + let tracker = tracker.clone(); + let socket = socket.clone(); + + let reqs = &mut ActiveRequests::default(); + + // Main Waiting Loop, awaits on async [`receive_request`]. + loop { + if let Some(h) = reqs.rb.push_overwrite( + Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()) + .abort_handle(), + ) { + if !h.is_finished() { + // the task is still running, lets yield and give it a chance to flush. + tokio::task::yield_now().await; + h.abort(); + } + } + } + } + async fn receive_request(socket: Arc) -> Result> { // Wait for the socket to be readable socket.readable().await?; @@ -309,26 +313,27 @@ impl Udp { } } - fn do_request( + fn spawn_request_processor( result: Result>, tracker: Arc, socket: Arc, ) -> JoinHandle<()> { - // timeout not needed, as udp is non-blocking. - tokio::task::spawn(async move { - match result { - Ok(udp_request) => { - trace!("Received Request from: {}", udp_request.from); - Self::make_response(tracker.clone(), socket.clone(), udp_request).await; - } - Err(error) => { - debug!("error: {error}"); - } + tokio::task::spawn(Self::process_request(result, tracker, socket)) + } + + async fn process_request(result: Result>, tracker: Arc, socket: Arc) { + match result { + Ok(udp_request) => { + trace!("Received Request from: {}", udp_request.from); + Self::process_valid_request(tracker.clone(), socket.clone(), udp_request).await; } - }) + Err(error) => { + debug!("error: {error}"); + } + } } - async fn make_response(tracker: Arc, socket: Arc, udp_request: UdpRequest) { + async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: UdpRequest) { trace!("Making Response to {udp_request:?}"); let from = udp_request.from; let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.clone()).await;