diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2ed43774..cf4ab65e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,28 +18,28 @@
 
 #### Changed
 
+* Switch from socket worker/swarm worker division to a single type of worker,
+  for performance reasons. Several config file keys were removed since they
+  are no longer needed.
 * Index peers by packet source IP and provided port, instead of by peer_id.
   This prevents users from impersonating others and is likely also slightly
   faster for IPv4 peers.
-* Remove support for unbounded worker channels
-* Add backpressure in socket workers. They will postpone reading from the
-  socket if sending a request to a swarm worker failed
 * Avoid a heap allocation for torrents with two or less peers. This can save
   a lot of memory if many torrents are tracked
 * Improve announce performance by avoiding having to filter response peers
 * In announce response statistics, don't include announcing peer
-* Distribute announce responses from swarm workers over socket workers to
-  decrease performance loss due to underutilized threads
 * Harden ConnectionValidator to make IP spoofing even more costly
 * Remove config key `network.poll_event_capacity` (always use 1)
 * Speed up parsing and serialization of requests and responses by using
   [zerocopy](https://crates.io/crates/zerocopy)
 * Report socket worker related prometheus stats per worker
+* Remove CPU pinning support
 
 #### Fixed
 
 * Quit whole application if any worker thread quits
 * Disallow announce requests with port value of 0
+* Fix io_uring UB issues
 
 ### aquatic_http
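The peer-indexing change in the changelog above is worth illustrating. A minimal sketch of the idea, with hypothetical type and field names (not aquatic's actual identifiers): keying peers by the observed packet source IP plus the announced port means a malicious client cannot evict or impersonate another peer simply by reusing its peer_id.

```rust
use std::collections::HashMap;
use std::net::IpAddr;

// Hypothetical key: built from data the tracker observes itself (the packet
// source IP) plus the port field of the announce request — never from the
// client-chosen peer_id.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct PeerKey {
    ip: IpAddr,
    port: u16,
}

struct PeerEntry {
    is_seeder: bool,
}

// Announcing twice from the same address/port updates the existing entry;
// announcing with someone else's peer_id has no effect on their entry.
fn upsert_peer(
    peers: &mut HashMap<PeerKey, PeerEntry>,
    src_ip: IpAddr,
    port: u16,
    entry: PeerEntry,
) {
    peers.insert(PeerKey { ip: src_ip, port }, entry);
}
```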
diff --git a/Cargo.lock b/Cargo.lock
index bc446f5e..b82edb27 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -315,6 +315,7 @@ dependencies = [
  "mimalloc",
  "mio",
  "num-format",
+ "parking_lot",
  "quickcheck",
  "quickcheck_macros",
  "rand",
@@ -2001,6 +2002,29 @@ version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
 
+[[package]]
+name = "parking_lot"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
+dependencies = [
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "windows-targets 0.48.5",
+]
+
 [[package]]
 name = "percent-encoding"
 version = "2.3.1"
@@ -2269,6 +2293,15 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
 [[package]]
 name = "ref-cast"
 version = "1.0.22"
diff --git a/README.md b/README.md
index 9621f8c1..b1446aec 100644
--- a/README.md
+++ b/README.md
@@ -31,19 +31,15 @@ Known users:
 
 ## Performance of the UDP implementation
 
-![UDP BitTorrent tracker throughput comparison](./documents/aquatic-udp-load-test-illustration-2023-01-11.png)
+![UDP BitTorrent tracker throughput](./documents/aquatic-udp-load-test-2024-02-10.png)
 
-More benchmark details are available [here](./documents/aquatic-udp-load-test-2023-01-11.pdf).
+More benchmark details are available [here](./documents/aquatic-udp-load-test-2024-02-10.md).
 
 ## Usage
 
 Please refer to the README pages for the respective implementations listed in
 the table above.
 
-## Architectural overview
-
-![Architectural overview of aquatic](./documents/aquatic-architecture-2024.svg)
-
 ## Copyright and license
 
 Copyright (c) Joakim Frostegård
diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs
index e60d730c..700dea6b 100644
--- a/crates/bencher/src/protocols/udp.rs
+++ b/crates/bencher/src/protocols/udp.rs
@@ -58,6 +58,12 @@ impl UdpCommand {
         indexmap::indexmap! {
             1 => SetConfig {
                 implementations: indexmap! {
+                    UdpTracker::Aquatic => vec![
+                        AquaticUdpRunner::with_mio(1, Priority::High),
+                    ],
+                    UdpTracker::AquaticIoUring => vec![
+                        AquaticUdpRunner::with_io_uring(1, Priority::High),
+                    ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(0, Priority::Medium), // Handle requests within event loop
                         OpenTrackerUdpRunner::new(1, Priority::High),
@@ -74,16 +80,13 @@ impl UdpCommand {
             2 => SetConfig {
                 implementations: indexmap! {
                     UdpTracker::Aquatic => vec![
-                        AquaticUdpRunner::with_mio(1, 1, Priority::Medium),
-                        AquaticUdpRunner::with_mio(2, 1, Priority::High),
+                        AquaticUdpRunner::with_mio(2, Priority::High),
                     ],
                     UdpTracker::AquaticIoUring => vec![
-                        AquaticUdpRunner::with_io_uring(1, 1, Priority::Medium),
-                        AquaticUdpRunner::with_io_uring(2, 1, Priority::High),
+                        AquaticUdpRunner::with_io_uring(2, Priority::High),
                     ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(2, Priority::High),
-                        OpenTrackerUdpRunner::new(4, Priority::Medium),
                     ],
                     UdpTracker::Chihaya => vec![
                         ChihayaUdpRunner::new(),
@@ -97,12 +100,10 @@ impl UdpCommand {
             4 => SetConfig {
                 implementations: indexmap! {
                     UdpTracker::Aquatic => vec![
-                        AquaticUdpRunner::with_mio(3, 1, Priority::High),
-                        AquaticUdpRunner::with_mio(4, 1, Priority::Medium),
+                        AquaticUdpRunner::with_mio(4, Priority::High),
                     ],
                     UdpTracker::AquaticIoUring => vec![
-                        AquaticUdpRunner::with_io_uring(3, 1, Priority::High),
-                        AquaticUdpRunner::with_io_uring(4, 1, Priority::Medium),
+                        AquaticUdpRunner::with_io_uring(4, Priority::High),
                     ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(4, Priority::High),
@@ -119,10 +120,10 @@ impl UdpCommand {
             6 => SetConfig {
                 implementations: indexmap! {
                     UdpTracker::Aquatic => vec![
-                        AquaticUdpRunner::with_mio(5, 1, Priority::High),
+                        AquaticUdpRunner::with_mio(6, Priority::High),
                     ],
                     UdpTracker::AquaticIoUring => vec![
-                        AquaticUdpRunner::with_io_uring(5, 1, Priority::High),
+                        AquaticUdpRunner::with_io_uring(6, Priority::High),
                     ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(6, Priority::High),
@@ -139,10 +140,10 @@ impl UdpCommand {
             8 => SetConfig {
                 implementations: indexmap! {
                     UdpTracker::Aquatic => vec![
-                        AquaticUdpRunner::with_mio(7, 1, Priority::High),
+                        AquaticUdpRunner::with_mio(8, Priority::High),
                    ],
                     UdpTracker::AquaticIoUring => vec![
-                        AquaticUdpRunner::with_io_uring(7, 1, Priority::High),
+                        AquaticUdpRunner::with_io_uring(8, Priority::High),
                     ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(8, Priority::High),
@@ -159,12 +160,10 @@ impl UdpCommand {
             12 => SetConfig {
                 implementations: indexmap! {
                     UdpTracker::Aquatic => vec![
-                        AquaticUdpRunner::with_mio(10, 2, Priority::High),
-                        AquaticUdpRunner::with_mio(9, 3, Priority::Medium),
+                        AquaticUdpRunner::with_mio(12, Priority::High),
                     ],
                     UdpTracker::AquaticIoUring => vec![
-                        AquaticUdpRunner::with_io_uring(10, 2, Priority::High),
-                        AquaticUdpRunner::with_io_uring(9, 3, Priority::Medium),
+                        AquaticUdpRunner::with_io_uring(12, Priority::High),
                     ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(12, Priority::High),
@@ -181,10 +180,10 @@ impl UdpCommand {
             16 => SetConfig {
                 implementations: indexmap! {
                     UdpTracker::Aquatic => vec![
-                        AquaticUdpRunner::with_mio(13, 3, Priority::High),
+                        AquaticUdpRunner::with_mio(16, Priority::High),
                     ],
                     UdpTracker::AquaticIoUring => vec![
-                        AquaticUdpRunner::with_io_uring(13, 3, Priority::High),
+                        AquaticUdpRunner::with_io_uring(16, Priority::High),
                     ],
                     UdpTracker::OpenTracker => vec![
                         OpenTrackerUdpRunner::new(16, Priority::High),
@@ -211,7 +210,6 @@ impl UdpCommand {
 #[derive(Debug, Clone)]
 struct AquaticUdpRunner {
     socket_workers: usize,
-    swarm_workers: usize,
     use_io_uring: bool,
     priority: Priority,
 }
@@ -219,24 +217,20 @@ struct AquaticUdpRunner {
 impl AquaticUdpRunner {
     fn with_mio(
         socket_workers: usize,
-        swarm_workers: usize,
         priority: Priority,
     ) -> Rc<dyn ProcessRunner<Command = UdpCommand>> {
         Rc::new(Self {
             socket_workers,
-            swarm_workers,
             use_io_uring: false,
             priority,
         })
     }
     fn with_io_uring(
         socket_workers: usize,
-        swarm_workers: usize,
         priority: Priority,
     ) -> Rc<dyn ProcessRunner<Command = UdpCommand>> {
         Rc::new(Self {
             socket_workers,
-            swarm_workers,
             use_io_uring: true,
             priority,
         })
@@ -256,7 +250,6 @@ impl ProcessRunner for AquaticUdpRunner {
         let mut c = aquatic_udp::config::Config::default();
 
         c.socket_workers = self.socket_workers;
-        c.swarm_workers = self.swarm_workers;
         c.network.address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
         c.network.use_io_uring = self.use_io_uring;
         c.protocol.max_response_peers = 30;
@@ -283,7 +276,6 @@ impl ProcessRunner for AquaticUdpRunner {
     fn keys(&self) -> IndexMap<String, String> {
         indexmap! {
             "socket workers".to_string() => self.socket_workers.to_string(),
-            "swarm workers".to_string() => self.swarm_workers.to_string(),
         }
     }
 }
diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs
index ea5b9285..fbd99b9b 100644
--- a/crates/common/src/lib.rs
+++ b/crates/common/src/lib.rs
@@ -163,6 +163,7 @@ pub enum WorkerType {
     Socket(usize),
     Statistics,
     Signals,
+    Cleaning,
     #[cfg(feature = "prometheus")]
     Prometheus,
 }
@@ -174,6 +175,7 @@ impl Display for WorkerType {
             Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)),
             Self::Statistics => f.write_str("Statistics worker"),
             Self::Signals => f.write_str("Signals worker"),
+            Self::Cleaning => f.write_str("Cleaning worker"),
             #[cfg(feature = "prometheus")]
             Self::Prometheus => f.write_str("Prometheus worker"),
         }
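The new `Cleaning` variant slots into the existing supervision pattern: the main thread keeps one `(WorkerType, JoinHandle)` pair per thread and shuts the whole process down if any of them finishes, using the `Display` impl above for the log message. A rough, hedged sketch of that pattern (simplified from the `run()` loop in the `lib.rs` diff further down; the real loop also returns an error naming the worker):

```rust
use std::fmt::Display;
use std::thread::JoinHandle;
use std::time::Duration;

// Sketch only: WorkerType stands in for the enum above. If any worker
// thread returns or panics, the supervisor bails out, which matches the
// changelog item "Quit whole application if any worker thread quits".
fn supervise<T: Display>(
    join_handles: Vec<(T, JoinHandle<anyhow::Result<()>>)>,
) -> anyhow::Result<()> {
    loop {
        for (worker_type, handle) in join_handles.iter() {
            if handle.is_finished() {
                // Display impl yields e.g. "Cleaning worker"
                return Err(anyhow::anyhow!("{} quit", worker_type));
            }
        }

        std::thread::sleep(Duration::from_secs(5));
    }
}
```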
diff --git a/crates/http/README.md b/crates/http/README.md
index 36f8c4e3..219792f9 100644
--- a/crates/http/README.md
+++ b/crates/http/README.md
@@ -109,6 +109,10 @@ Implements:
 
 `aquatic_http` has not been tested as much as `aquatic_udp`, but likely works fine in production.
 
+## Architectural overview
+
+![Architectural overview of aquatic](../../documents/aquatic-architecture-2024.svg)
+
 ## Copyright and license
 
 Copyright (c) Joakim Frostegård
diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml
index 72c1f923..47cfa06e 100644
--- a/crates/udp/Cargo.toml
+++ b/crates/udp/Cargo.toml
@@ -48,6 +48,7 @@ log = "0.4"
 mimalloc = { version = "0.1", default-features = false }
 mio = { version = "0.8", features = ["net", "os-poll"] }
 num-format = "0.4"
+parking_lot = "0.12"
 rand = { version = "0.8", features = ["small_rng"] }
 serde = { version = "1", features = ["derive"] }
 signal-hook = { version = "0.3" }
diff --git a/crates/udp/README.md b/crates/udp/README.md
index 60a5c2d1..d8abbf93 100644
--- a/crates/udp/README.md
+++ b/crates/udp/README.md
@@ -21,9 +21,9 @@ This is the most mature implementation in the aquatic family. I consider it full
 
 ## Performance
 
-![UDP BitTorrent tracker throughput comparison](../../documents/aquatic-udp-load-test-illustration-2023-01-11.png)
+![UDP BitTorrent tracker throughput](../../documents/aquatic-udp-load-test-2024-02-10.png)
 
-More benchmark details are available [here](../../documents/aquatic-udp-load-test-2023-01-11.pdf).
+More benchmark details are available [here](../../documents/aquatic-udp-load-test-2024-02-10.md).
 
 ## Usage
diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs
index 4dd3496f..dce58cb7 100644
--- a/crates/udp/src/common.rs
+++ b/crates/udp/src/common.rs
@@ -1,18 +1,15 @@
-use std::collections::BTreeMap;
-use std::hash::Hash;
 use std::iter::repeat_with;
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
-use crossbeam_channel::{Receiver, SendError, Sender, TrySendError};
-
 use aquatic_common::access_list::AccessListArcSwap;
-use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
+use aquatic_common::ServerStartInstant;
 use aquatic_udp_protocol::*;
 use crossbeam_utils::CachePadded;
 use hdrhistogram::Histogram;
 
 use crate::config::Config;
+use crate::swarm::TorrentMaps;
 
 pub const BUFFER_SIZE: usize = 8192;
 
@@ -32,145 +29,10 @@ impl IpVersion {
     }
 }
 
-#[derive(Clone, Copy, Debug)]
-pub struct SocketWorkerIndex(pub usize);
-
-#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
-pub struct SwarmWorkerIndex(pub usize);
-
-impl SwarmWorkerIndex {
-    pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
-        Self(info_hash.0[0] as usize % config.swarm_workers)
-    }
-}
-
-#[derive(Debug)]
-pub struct PendingScrapeRequest {
-    pub slab_key: usize,
-    pub info_hashes: BTreeMap<usize, InfoHash>,
-}
-
-#[derive(Debug)]
-pub struct PendingScrapeResponse {
-    pub slab_key: usize,
-    pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
-}
-
-#[derive(Debug)]
-pub enum ConnectedRequest {
-    Announce(AnnounceRequest),
-    Scrape(PendingScrapeRequest),
-}
-
-#[derive(Debug)]
-pub enum ConnectedResponse {
-    AnnounceIpv4(AnnounceResponse<Ipv4AddrBytes>),
-    AnnounceIpv6(AnnounceResponse<Ipv6AddrBytes>),
-    Scrape(PendingScrapeResponse),
-}
-
-pub struct ConnectedRequestSender {
-    index: SocketWorkerIndex,
-    senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
-}
-
-impl ConnectedRequestSender {
-    pub fn new(
-        index: SocketWorkerIndex,
-        senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
-    ) -> Self {
-        Self { index, senders }
-    }
-
-    pub fn try_send_to(
-        &self,
-        index: SwarmWorkerIndex,
-        request: ConnectedRequest,
-        addr: CanonicalSocketAddr,
-    ) -> Result<(), (SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)> {
-        match self.senders[index.0].try_send((self.index, request, addr)) {
-            Ok(()) => Ok(()),
-            Err(TrySendError::Full(r)) => Err((index, r.1, r.2)),
-            Err(TrySendError::Disconnected(_)) => {
-                panic!("Request channel {} is disconnected", index.0);
-            }
-        }
-    }
-}
-
-pub struct ConnectedResponseSender {
-    senders: Vec<Sender<(CanonicalSocketAddr, ConnectedResponse)>>,
-    to_any_last_index_picked: usize,
-}
-
-impl ConnectedResponseSender {
-    pub fn new(senders: Vec<Sender<(CanonicalSocketAddr, ConnectedResponse)>>) -> Self {
-        Self {
-            senders,
-            to_any_last_index_picked: 0,
-        }
-    }
-
-    pub fn try_send_to(
-        &self,
-        index: SocketWorkerIndex,
-        addr: CanonicalSocketAddr,
-        response: ConnectedResponse,
-    ) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> {
-        self.senders[index.0].try_send((addr, response))
-    }
-
-    pub fn send_to(
-        &self,
-        index: SocketWorkerIndex,
-        addr: CanonicalSocketAddr,
-        response: ConnectedResponse,
-    ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
-        self.senders[index.0].send((addr, response))
-    }
-
-    pub fn send_to_any(
-        &mut self,
-        addr: CanonicalSocketAddr,
-        response: ConnectedResponse,
-    ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
-        let start = self.to_any_last_index_picked + 1;
-
-        let mut message = Some((addr, response));
-
-        for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) {
-            match self.senders[i].try_send(message.take().unwrap()) {
-                Ok(()) => {
-                    self.to_any_last_index_picked = i;
-
-                    return Ok(());
-                }
-                Err(TrySendError::Full(msg)) => {
-                    message = Some(msg);
-                }
-                Err(TrySendError::Disconnected(_)) => {
-                    panic!("ConnectedResponseReceiver disconnected");
-                }
-            }
-        }
-
-        let (addr, response) = message.unwrap();
-
-        self.to_any_last_index_picked = start % self.senders.len();
-        self.send_to(
-            SocketWorkerIndex(self.to_any_last_index_picked),
-            addr,
-            response,
-        )
-    }
-}
-
-pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>;
-
 #[derive(Clone)]
 pub struct Statistics {
     pub socket: Vec<CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>>,
-    pub swarm: Vec<CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>>,
+    pub swarm: CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
 }
 
 impl Statistics {
@@ -179,9 +41,7 @@ impl Statistics {
             socket: repeat_with(Default::default)
                 .take(config.socket_workers)
                 .collect(),
-            swarm: repeat_with(Default::default)
-                .take(config.swarm_workers)
-                .collect(),
+            swarm: Default::default(),
         }
     }
 }
@@ -230,6 +90,7 @@ pub enum StatisticsMessage {
 #[derive(Clone)]
 pub struct State {
     pub access_list: Arc<AccessListArcSwap>,
+    pub torrent_maps: TorrentMaps,
     pub server_start_instant: ServerStartInstant,
 }
 
@@ -237,6 +98,7 @@ impl Default for State {
     fn default() -> Self {
         Self {
             access_list: Arc::new(AccessListArcSwap::default()),
+            torrent_maps: TorrentMaps::default(),
            server_start_instant: ServerStartInstant::new(),
         }
     }
diff --git a/crates/udp/src/config.rs b/crates/udp/src/config.rs
index df83279e..ae97a4f0 100644
--- a/crates/udp/src/config.rs
+++ b/crates/udp/src/config.rs
@@ -11,36 +11,16 @@ use aquatic_toml_config::TomlConfig;
 #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct Config {
-    /// Number of socket workers. One per physical core is recommended.
+    /// Number of socket workers
     ///
-    /// Socket workers receive requests from clients and parse them.
-    /// Responses to connect requests are sent back immediately. Announce and
-    /// scrape requests are passed on to swarm workers, which generate
-    /// responses and send them back to the socket worker, which sends them
-    /// to the client.
+    /// 0 = automatically set to number of available virtual CPUs
     pub socket_workers: usize,
-    /// Number of swarm workers. One is enough in almost all cases
-    ///
-    /// Swarm workers receive parsed announce and scrape requests from socket
-    /// workers, generate responses and send them back to the socket workers.
-    pub swarm_workers: usize,
     pub log_level: LogLevel,
-    /// Maximum number of items in each channel passing requests/responses
-    /// between workers. A value of zero is no longer allowed.
-    pub worker_channel_size: usize,
-    /// How long to block waiting for requests in swarm workers.
-    ///
-    /// Higher values means that with zero traffic, the worker will not
-    /// unnecessarily cause the CPU to wake up as often. However, high values
-    /// (something like larger than 1000) combined with very low traffic can
-    /// cause delays in torrent cleaning.
-    pub request_channel_recv_timeout_ms: u64,
     pub network: NetworkConfig,
     pub protocol: ProtocolConfig,
     pub statistics: StatisticsConfig,
     pub cleaning: CleaningConfig,
     pub privileges: PrivilegeConfig,
-
     /// Access list configuration
     ///
     /// The file is read on start and when the program receives `SIGUSR1`. If
@@ -48,26 +28,19 @@ pub struct Config {
     /// emitting of an error-level log message, while successful updates of the
     /// access list result in emitting of an info-level log message.
     pub access_list: AccessListConfig,
-    #[cfg(feature = "cpu-pinning")]
-    pub cpu_pinning: aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
             socket_workers: 1,
-            swarm_workers: 1,
             log_level: LogLevel::Error,
-            worker_channel_size: 1_024,
-            request_channel_recv_timeout_ms: 100,
             network: NetworkConfig::default(),
             protocol: ProtocolConfig::default(),
             statistics: StatisticsConfig::default(),
             cleaning: CleaningConfig::default(),
             privileges: PrivilegeConfig::default(),
             access_list: AccessListConfig::default(),
-            #[cfg(feature = "cpu-pinning")]
-            cpu_pinning: Default::default(),
         }
     }
 }
@@ -100,13 +73,6 @@ pub struct NetworkConfig {
     pub socket_recv_buffer_size: usize,
     /// Poll timeout in milliseconds (mio backend only)
     pub poll_timeout_ms: u64,
-    #[cfg(feature = "io-uring")]
-    pub use_io_uring: bool,
-    /// Number of ring entries (io_uring backend only)
-    ///
-    /// Will be rounded to next power of two if not already one.
-    #[cfg(feature = "io-uring")]
-    pub ring_size: u16,
     /// Store this many responses at most for retrying (once) on send failure
     /// (mio backend only)
     ///
@@ -114,6 +80,13 @@ pub struct NetworkConfig {
     /// such as FreeBSD. Setting the value to zero disables resending
     /// functionality.
     pub resend_buffer_max_len: usize,
+    #[cfg(feature = "io-uring")]
+    pub use_io_uring: bool,
+    /// Number of ring entries (io_uring backend only)
+    ///
+    /// Will be rounded to next power of two if not already one.
+    #[cfg(feature = "io-uring")]
+    pub ring_size: u16,
 }
 
 impl NetworkConfig {
@@ -132,11 +105,11 @@ impl Default for NetworkConfig {
             only_ipv6: false,
             socket_recv_buffer_size: 8_000_000,
             poll_timeout_ms: 50,
+            resend_buffer_max_len: 0,
             #[cfg(feature = "io-uring")]
             use_io_uring: true,
             #[cfg(feature = "io-uring")]
             ring_size: 128,
-            resend_buffer_max_len: 0,
         }
     }
 }
@@ -239,28 +212,18 @@ impl Default for StatisticsConfig {
 pub struct CleaningConfig {
     /// Clean torrents this often (seconds)
     pub torrent_cleaning_interval: u64,
-    /// Clean pending scrape responses this often (seconds)
-    ///
-    /// In regular operation, there should be no pending scrape responses
-    /// lingering for long enough to have to be cleaned up this way.
-    pub pending_scrape_cleaning_interval: u64,
     /// Allow clients to use a connection token for this long (seconds)
     pub max_connection_age: u32,
     /// Remove peers who have not announced for this long (seconds)
     pub max_peer_age: u32,
-    /// Remove pending scrape responses that have not been returned from swarm
-    /// workers for this long (seconds)
-    pub max_pending_scrape_age: u32,
 }
 
 impl Default for CleaningConfig {
     fn default() -> Self {
         Self {
             torrent_cleaning_interval: 60 * 2,
-            pending_scrape_cleaning_interval: 60 * 10,
             max_connection_age: 60 * 2,
             max_peer_age: 60 * 20,
-            max_pending_scrape_age: 60,
         }
     }
 }
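For embedders, the slimmed-down `Config` can be set up programmatically the same way the bencher does above; a minimal sketch (field values here are arbitrary examples, not recommendations):

```rust
fn make_config() -> anyhow::Result<aquatic_udp::config::Config> {
    let mut c = aquatic_udp::config::Config::default();

    c.socket_workers = 0; // 0 = one worker per available virtual CPU
    c.network.address = "127.0.0.1:3000".parse()?;
    c.cleaning.torrent_cleaning_interval = 120; // seconds
    c.cleaning.max_peer_age = 1200; // seconds

    Ok(c)
}
```

Note that the removed keys (`swarm_workers`, `worker_channel_size`, `request_channel_recv_timeout_ms`, the pending-scrape cleaning settings) no longer exist; with `deny_unknown_fields`, leftover entries in an old TOML file will be rejected on startup.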
diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs
index cf6a588c..b1215c1f 100644
--- a/crates/udp/src/lib.rs
+++ b/crates/udp/src/lib.rs
@@ -1,130 +1,62 @@
 pub mod common;
 pub mod config;
+pub mod swarm;
 pub mod workers;
 
-use std::collections::BTreeMap;
-use std::thread::{sleep, Builder, JoinHandle};
+use std::thread::{available_parallelism, sleep, Builder, JoinHandle};
 use std::time::Duration;
 
 use anyhow::Context;
 use aquatic_common::WorkerType;
-use crossbeam_channel::{bounded, unbounded};
+use crossbeam_channel::unbounded;
 use signal_hook::consts::SIGUSR1;
 use signal_hook::iterator::Signals;
 
 use aquatic_common::access_list::update_access_list;
-#[cfg(feature = "cpu-pinning")]
-use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
 use aquatic_common::privileges::PrivilegeDropper;
 
-use common::{
-    ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, Statistics,
-    SwarmWorkerIndex,
-};
+use common::{State, Statistics};
 use config::Config;
 use workers::socket::ConnectionValidator;
-use workers::swarm::SwarmWorker;
 
 pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
 pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
 
-pub fn run(config: Config) -> ::anyhow::Result<()> {
+pub fn run(mut config: Config) -> ::anyhow::Result<()> {
     let mut signals = Signals::new([SIGUSR1])?;
 
+    if config.socket_workers == 0 {
+        config.socket_workers = available_parallelism().map(Into::into).unwrap_or(1);
+    };
+
     let state = State::default();
     let statistics = Statistics::new(&config);
     let connection_validator = ConnectionValidator::new(&config)?;
     let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
 
-    let mut join_handles = Vec::new();
-
-    update_access_list(&config.access_list, &state.access_list)?;
-
-    let mut request_senders = Vec::new();
-    let mut request_receivers = BTreeMap::new();
-
-    let mut response_senders = Vec::new();
-    let mut response_receivers = BTreeMap::new();
-
     let (statistics_sender, statistics_receiver) = unbounded();
 
-    for i in 0..config.swarm_workers {
-        let (request_sender, request_receiver) = bounded(config.worker_channel_size);
-
-        request_senders.push(request_sender);
-        request_receivers.insert(i, request_receiver);
-    }
-
-    for i in 0..config.socket_workers {
-        let (response_sender, response_receiver) = bounded(config.worker_channel_size);
-
-        response_senders.push(response_sender);
-        response_receivers.insert(i, response_receiver);
-    }
-
-    for i in 0..config.swarm_workers {
-        let config = config.clone();
-        let state = state.clone();
-        let request_receiver = request_receivers.remove(&i).unwrap().clone();
-        let response_sender = ConnectedResponseSender::new(response_senders.clone());
-        let statistics_sender = statistics_sender.clone();
-        let statistics = statistics.swarm[i].clone();
-
-        let handle = Builder::new()
-            .name(format!("swarm-{:02}", i + 1))
-            .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::SwarmWorker(i),
-                );
-
-                let mut worker = SwarmWorker {
-                    config,
-                    state,
-                    statistics,
-                    request_receiver,
-                    response_sender,
-                    statistics_sender,
-                    worker_index: SwarmWorkerIndex(i),
-                };
-
-                worker.run()
-            })
-            .with_context(|| "spawn swarm worker")?;
+    update_access_list(&config.access_list, &state.access_list)?;
 
-        join_handles.push((WorkerType::Swarm(i), handle));
-    }
+    let mut join_handles = Vec::new();
 
+    // Spawn socket worker threads
     for i in 0..config.socket_workers {
         let state = state.clone();
         let config = config.clone();
         let connection_validator = connection_validator.clone();
-        let request_sender =
-            ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
-        let response_receiver = response_receivers.remove(&i).unwrap();
         let priv_dropper = priv_dropper.clone();
         let statistics = statistics.socket[i].clone();
+        let statistics_sender = statistics_sender.clone();
 
         let handle = Builder::new()
             .name(format!("socket-{:02}", i + 1))
             .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::SocketWorker(i),
-                );
-
                 workers::socket::run_socket_worker(
                     config,
                     state,
                     statistics,
+                    statistics_sender,
                     connection_validator,
-                    request_sender,
-                    response_receiver,
                     priv_dropper,
                 )
             })
@@ -133,6 +65,31 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Socket(i), handle));
     }
 
+    // Spawn cleaning thread
+    {
+        let state = state.clone();
+        let config = config.clone();
+        let statistics = statistics.swarm.clone();
+        let statistics_sender = statistics_sender.clone();
+
+        let handle = Builder::new().name("cleaning".into()).spawn(move || loop {
+            sleep(Duration::from_secs(
+                config.cleaning.torrent_cleaning_interval,
+            ));
+
+            state.torrent_maps.clean_and_update_statistics(
+                &config,
+                &statistics,
+                &statistics_sender,
+                &state.access_list,
+                state.server_start_instant,
+            );
+        })?;
+
+        join_handles.push((WorkerType::Cleaning, handle));
+    }
+
+    // Spawn statistics thread
     if config.statistics.active() {
         let state = state.clone();
         let config = config.clone();
@@ -140,14 +97,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         let handle = Builder::new()
             .name("statistics".into())
             .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::Util,
-                );
-
                 workers::statistics::run_statistics_worker(
                     config,
                     state,
@@ -160,6 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Statistics, handle));
     }
 
+    // Spawn prometheus endpoint thread
     #[cfg(feature = "prometheus")]
     if config.statistics.active() && config.statistics.run_prometheus_endpoint {
         let handle = aquatic_common::spawn_prometheus_endpoint(
@@ -180,14 +130,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
             .name("signals".into())
             .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::Util,
-                );
-
                 for signal in &mut signals {
                     match signal {
                         SIGUSR1 => {
@@ -204,14 +146,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Signals, handle));
     }
 
-    #[cfg(feature = "cpu-pinning")]
-    pin_current_if_configured_to(
-        &config.cpu_pinning,
-        config.socket_workers,
-        config.swarm_workers,
-        WorkerIndex::Util,
-    );
-
+    // Quit application if any worker returns or panics
     loop {
        for (i, (_, handle)) in join_handles.iter().enumerate() {
             if handle.is_finished() {
diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/swarm.rs
similarity index 58%
rename from crates/udp/src/workers/swarm/storage.rs
rename to crates/udp/src/swarm.rs
index 3b042eac..2d422f93 100644
--- a/crates/udp/src/workers/swarm/storage.rs
+++ b/crates/udp/src/swarm.rs
@@ -1,17 +1,24 @@
+use std::iter::repeat_with;
+use std::net::IpAddr;
+use std::ops::DerefMut;
+use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
-use aquatic_common::IndexMap;
 use aquatic_common::SecondsSinceServerStart;
+use aquatic_common::ServerStartInstant;
 use aquatic_common::{
     access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode},
     ValidUntil,
 };
+use aquatic_common::{CanonicalSocketAddr, IndexMap};
 use aquatic_udp_protocol::*;
 use arrayvec::ArrayVec;
 use crossbeam_channel::Sender;
+use hashbrown::HashMap;
 use hdrhistogram::Histogram;
+use parking_lot::RwLockUpgradableReadGuard;
 use rand::prelude::SmallRng;
 use rand::Rng;
 
@@ -20,51 +27,108 @@ use crate::config::Config;
 
 const SMALL_PEER_MAP_CAPACITY: usize = 2;
 
+use aquatic_udp_protocol::InfoHash;
+use parking_lot::RwLock;
+
+#[derive(Clone)]
 pub struct TorrentMaps {
-    pub ipv4: TorrentMap<Ipv4AddrBytes>,
-    pub ipv6: TorrentMap<Ipv6AddrBytes>,
+    ipv4: TorrentMapShards<Ipv4AddrBytes>,
+    ipv6: TorrentMapShards<Ipv6AddrBytes>,
 }
 
 impl Default for TorrentMaps {
     fn default() -> Self {
+        const NUM_SHARDS: usize = 16;
+
         Self {
-            ipv4: TorrentMap(Default::default()),
-            ipv6: TorrentMap(Default::default()),
+            ipv4: TorrentMapShards::new(NUM_SHARDS),
+            ipv6: TorrentMapShards::new(NUM_SHARDS),
         }
     }
 }
 
 impl TorrentMaps {
+    pub fn announce(
+        &self,
+        config: &Config,
+        statistics_sender: &Sender<StatisticsMessage>,
+        rng: &mut SmallRng,
+        request: &AnnounceRequest,
+        src: CanonicalSocketAddr,
+        valid_until: ValidUntil,
+    ) -> Response {
+        match src.get().ip() {
+            IpAddr::V4(ip_address) => Response::AnnounceIpv4(self.ipv4.announce(
+                config,
+                statistics_sender,
+                rng,
+                request,
+                ip_address.into(),
+                valid_until,
+            )),
+            IpAddr::V6(ip_address) => Response::AnnounceIpv6(self.ipv6.announce(
+                config,
+                statistics_sender,
+                rng,
+                request,
+                ip_address.into(),
+                valid_until,
+            )),
+        }
+    }
+
+    pub fn scrape(&self, request: ScrapeRequest, src: CanonicalSocketAddr) -> ScrapeResponse {
+        if src.is_ipv4() {
+            self.ipv4.scrape(request)
+        } else {
+            self.ipv6.scrape(request)
+        }
+    }
+
     /// Remove forbidden or inactive torrents, reclaim space and update statistics
     pub fn clean_and_update_statistics(
-        &mut self,
+        &self,
         config: &Config,
-        state: &State,
         statistics: &CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
         statistics_sender: &Sender<StatisticsMessage>,
         access_list: &Arc<AccessListArcSwap>,
+        server_start_instant: ServerStartInstant,
     ) {
         let mut cache = create_access_list_cache(access_list);
         let mode = config.access_list.mode;
-        let now = state.server_start_instant.seconds_elapsed();
-
-        let ipv4 =
-            self.ipv4
-                .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
-        let ipv6 =
-            self.ipv6
-                .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
+        let now = server_start_instant.seconds_elapsed();
+
+        let mut statistics_messages = Vec::new();
+
+        let ipv4 = self.ipv4.clean_and_get_statistics(
+            config,
+            &mut statistics_messages,
+            &mut cache,
+            mode,
+            now,
+        );
+        let ipv6 = self.ipv6.clean_and_get_statistics(
+            config,
+            &mut statistics_messages,
+            &mut cache,
+            mode,
+            now,
+        );
 
         if config.statistics.active() {
-            statistics.ipv4.peers.store(ipv4.0, Ordering::Relaxed);
-            statistics.ipv6.peers.store(ipv6.0, Ordering::Relaxed);
+            statistics.ipv4.torrents.store(ipv4.0, Ordering::Relaxed);
+            statistics.ipv6.torrents.store(ipv6.0, Ordering::Relaxed);
+            statistics.ipv4.peers.store(ipv4.1, Ordering::Relaxed);
+            statistics.ipv6.peers.store(ipv6.1, Ordering::Relaxed);
 
-            if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
-                if let Err(err) = statistics_sender.try_send(message) {
-                    ::log::error!("couldn't send statistics message: {:#}", err);
-                }
+            if let Some(message) = ipv4.2 {
+                statistics_messages.push(StatisticsMessage::Ipv4PeerHistogram(message));
             }
-            if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
+            if let Some(message) = ipv6.2 {
+                statistics_messages.push(StatisticsMessage::Ipv6PeerHistogram(message));
+            }
+
+            for message in statistics_messages {
                 if let Err(err) = statistics_sender.try_send(message) {
                     ::log::error!("couldn't send statistics message: {:#}", err);
                 }
@@ -73,116 +137,200 @@ impl TorrentMaps {
     }
 }
 
-#[derive(Default)]
-pub struct TorrentMap<I: Ip>(pub IndexMap<InfoHash, TorrentData<I>>);
-
-impl<I: Ip> TorrentMap<I> {
-    pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse {
-        let torrent_stats = request
-            .info_hashes
-            .into_iter()
-            .map(|(i, info_hash)| {
-                let stats = self
-                    .0
-                    .get(&info_hash)
-                    .map(|torrent_data| torrent_data.scrape_statistics())
-                    .unwrap_or_else(|| TorrentScrapeStatistics {
-                        seeders: NumberOfPeers::new(0),
-                        leechers: NumberOfPeers::new(0),
-                        completed: NumberOfDownloads::new(0),
-                    });
-
-                (i, stats)
-            })
-            .collect();
-
-        PendingScrapeResponse {
-            slab_key: request.slab_key,
-            torrent_stats,
-        }
-    }
-
+#[derive(Clone)]
+pub struct TorrentMapShards<I: Ip>(Arc<[RwLock<TorrentMapShard<I>>]>);
+
+impl<I: Ip> TorrentMapShards<I> {
+    fn new(num_shards: usize) -> Self {
+        Self(
+            repeat_with(Default::default)
+                .take(num_shards)
+                .collect::<Vec<_>>()
+                .into_boxed_slice()
+                .into(),
+        )
+    }
+
+    fn announce(
+        &self,
+        config: &Config,
+        statistics_sender: &Sender<StatisticsMessage>,
+        rng: &mut SmallRng,
+        request: &AnnounceRequest,
+        ip_address: I,
+        valid_until: ValidUntil,
+    ) -> AnnounceResponse<I> {
+        let torrent_data = {
+            let torrent_map_shard = self.get_shard(&request.info_hash).upgradable_read();
+
+            // Clone Arc here to avoid keeping lock on whole shard
+            if let Some(torrent_data) = torrent_map_shard.get(&request.info_hash) {
+                torrent_data.clone()
+            } else {
+                // Don't overwrite entry if created in the meantime
+                RwLockUpgradableReadGuard::upgrade(torrent_map_shard)
+                    .entry(request.info_hash)
+                    .or_default()
+                    .clone()
+            }
+        };
+
+        let mut peer_map = torrent_data.peer_map.write();
+
+        peer_map.announce(
+            config,
+            statistics_sender,
+            rng,
+            request,
+            ip_address,
+            valid_until,
+        )
+    }
+
+    fn scrape(&self, request: ScrapeRequest) -> ScrapeResponse {
+        let mut response = ScrapeResponse {
+            transaction_id: request.transaction_id,
+            torrent_stats: Vec::with_capacity(request.info_hashes.len()),
+        };
+
+        for info_hash in request.info_hashes {
+            let torrent_map_shard = self.get_shard(&info_hash);
+
+            let statistics = if let Some(torrent_data) = torrent_map_shard.read().get(&info_hash) {
+                torrent_data.peer_map.read().scrape_statistics()
+            } else {
+                TorrentScrapeStatistics {
+                    seeders: NumberOfPeers::new(0),
+                    leechers: NumberOfPeers::new(0),
+                    completed: NumberOfDownloads::new(0),
+                }
+            };
+
+            response.torrent_stats.push(statistics);
        }
+
+        response
     }
 
-    /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
+    fn clean_and_get_statistics(
-        &mut self,
+        &self,
         config: &Config,
-        statistics_sender: &Sender<StatisticsMessage>,
+        statistics_messages: &mut Vec<StatisticsMessage>,
         access_list_cache: &mut AccessListCache,
         access_list_mode: AccessListMode,
         now: SecondsSinceServerStart,
-    ) -> (usize, Option<Histogram<u64>>) {
+    ) -> (usize, usize, Option<Histogram<u64>>) {
+        let mut total_num_torrents = 0;
         let mut total_num_peers = 0;
 
-        let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
-        {
-            match Histogram::new(3) {
-                Ok(histogram) => Some(histogram),
-                Err(err) => {
-                    ::log::error!("Couldn't create peer histogram: {:#}", err);
-
-                    None
-                }
-            }
-        } else {
-            None
-        };
+        let mut opt_histogram: Option<Histogram<u64>> = config
+            .statistics
+            .torrent_peer_histograms
+            .then(|| Histogram::new(3).expect("create peer histogram"));
 
-        self.0.retain(|info_hash, torrent| {
-            if !access_list_cache
-                .load()
-                .allows(access_list_mode, &info_hash.0)
-            {
-                return false;
-            }
+        for torrent_map_shard in self.0.iter() {
+            for torrent_data in torrent_map_shard.read().values() {
+                let mut peer_map = torrent_data.peer_map.write();
 
-            let num_peers = match torrent {
-                TorrentData::Small(peer_map) => {
-                    peer_map.clean_and_get_num_peers(config, statistics_sender, now)
-                }
-                TorrentData::Large(peer_map) => {
-                    let num_peers =
-                        peer_map.clean_and_get_num_peers(config, statistics_sender, now);
+                let num_peers = match peer_map.deref_mut() {
+                    PeerMap::Small(small_peer_map) => {
+                        small_peer_map.clean_and_get_num_peers(config, statistics_messages, now)
+                    }
+                    PeerMap::Large(large_peer_map) => {
+                        let num_peers = large_peer_map.clean_and_get_num_peers(
+                            config,
+                            statistics_messages,
+                            now,
+                        );
+
+                        if let Some(small_peer_map) = large_peer_map.try_shrink() {
+                            *peer_map = PeerMap::Small(small_peer_map);
+                        }
+
+                        num_peers
+                    }
+                };
 
-                    if let Some(peer_map) = peer_map.try_shrink() {
-                        *torrent = TorrentData::Small(peer_map);
-                    }
+                drop(peer_map);
 
-                    num_peers
+                match opt_histogram.as_mut() {
+                    Some(histogram) if num_peers > 0 => {
+                        if let Err(err) = histogram.record(num_peers as u64) {
+                            ::log::error!("Couldn't record {} to histogram: {:#}", num_peers, err);
+                        }
+                    }
+                    _ => (),
                 }
-            };
 
-            total_num_peers += num_peers;
+                total_num_peers += num_peers;
+
+                torrent_data
+                    .pending_removal
+                    .store(num_peers == 0, Ordering::Release);
+            }
 
-            match opt_histogram {
-                Some(ref mut histogram) if num_peers > 0 => {
-                    let n = num_peers.try_into().expect("Couldn't fit usize into u64");
+            let mut torrent_map_shard = torrent_map_shard.write();
 
-                    if let Err(err) = histogram.record(n) {
-                        ::log::error!("Couldn't record {} to histogram: {:#}", n, err);
-                    }
+            torrent_map_shard.retain(|info_hash, torrent_data| {
+                if !access_list_cache
+                    .load()
+                    .allows(access_list_mode, &info_hash.0)
+                {
+                    return false;
                 }
-                _ => (),
-            }
 
-            num_peers > 0
-        });
+                // Check pending_removal flag set in previous cleaning step. This
+                // prevents us from removing TorrentData entries that were just
+                // added but do not yet contain any peers. Also double-check that
+                // no peers have been added since we last checked.
+                if torrent_data
+                    .pending_removal
+                    .fetch_and(false, Ordering::Acquire)
+                    && torrent_data.peer_map.read().is_empty()
+                {
+                    return false;
+                }
+
+                true
+            });
+
+            torrent_map_shard.shrink_to_fit();
 
-        self.0.shrink_to_fit();
+            total_num_torrents += torrent_map_shard.len();
+        }
 
-        (total_num_peers, opt_histogram)
+        (total_num_torrents, total_num_peers, opt_histogram)
     }
 
-    pub fn num_torrents(&self) -> usize {
-        self.0.len()
+    fn get_shard(&self, info_hash: &InfoHash) -> &RwLock<TorrentMapShard<I>> {
+        self.0.get(info_hash.0[0] as usize % self.0.len()).unwrap()
+    }
+}
+
+/// Use HashMap instead of IndexMap for better lookup performance
+type TorrentMapShard<I> = HashMap<InfoHash, Arc<TorrentData<I>>>;
+
+pub struct TorrentData<I: Ip> {
+    peer_map: RwLock<PeerMap<I>>,
+    pending_removal: AtomicBool,
+}
+
+impl<I: Ip> Default for TorrentData<I> {
+    fn default() -> Self {
+        Self {
+            peer_map: Default::default(),
+            pending_removal: Default::default(),
+        }
     }
 }
 
-pub enum TorrentData<I: Ip> {
+pub enum PeerMap<I: Ip> {
     Small(SmallPeerMap<I>),
     Large(LargePeerMap<I>),
 }
 
-impl<I: Ip> TorrentData<I> {
-    pub fn announce(
+impl<I: Ip> PeerMap<I> {
+    fn announce(
         &mut self,
         config: &Config,
         statistics_sender: &Sender<StatisticsMessage>,
@@ -298,7 +446,7 @@ impl<I: Ip> PeerMap<I> {
         response
     }
 
-    pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
+    fn scrape_statistics(&self) -> TorrentScrapeStatistics {
         let (seeders, leechers) = match self {
             Self::Small(peer_map) => peer_map.num_seeders_leechers(),
             Self::Large(peer_map) => peer_map.num_seeders_leechers(),
@@ -310,9 +458,16 @@ impl<I: Ip> PeerMap<I> {
             completed: NumberOfDownloads::new(0),
         }
     }
+
+    fn is_empty(&self) -> bool {
+        match self {
+            Self::Small(peer_map) => peer_map.0.is_empty(),
+            Self::Large(peer_map) => peer_map.peers.is_empty(),
+        }
+    }
 }
 
-impl Default for TorrentData {
+impl<I: Ip> Default for PeerMap<I> {
     fn default() -> Self {
         Self::Small(SmallPeerMap(ArrayVec::default()))
     }
@@ -357,20 +512,14 @@ impl<I: Ip> SmallPeerMap<I> {
     fn clean_and_get_num_peers(
         &mut self,
         config: &Config,
-        statistics_sender: &Sender<StatisticsMessage>,
+        statistics_messages: &mut Vec<StatisticsMessage>,
         now: SecondsSinceServerStart,
     ) -> usize {
         self.0.retain(|(_, peer)| {
             let keep = peer.valid_until.valid(now);
 
-            if !keep
-                && config.statistics.peer_clients
-                && statistics_sender
-                    .try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
-                    .is_err()
-            {
-                // Should never happen in practice
-                ::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
+            if !keep && config.statistics.peer_clients {
+                statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id));
             }
 
             keep
@@ -421,12 +570,11 @@ impl<I: Ip> LargePeerMap<I> {
     /// Extract response peers
     ///
-    /// If there are more peers in map than `max_num_peers_to_take`, do a random
-    /// selection of peers from first and second halves of map in order to avoid
-    /// returning too homogeneous peers.
-    ///
-    /// Does NOT filter out announcing peer.
-    pub fn extract_response_peers(
+    /// If there are more peers in map than `max_num_peers_to_take`, do a
+    /// random selection of peers from first and second halves of map in
+    /// order to avoid returning too homogeneous peers. This is a lot more
+    /// cache-friendly than doing a fully random selection.
+    fn extract_response_peers(
         &self,
         rng: &mut impl Rng,
         max_num_peers_to_take: usize,
@@ -456,10 +604,10 @@ impl<I: Ip> LargePeerMap<I> {
         let mut peers = Vec::with_capacity(max_num_peers_to_take);
 
         if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) {
-            peers.extend(slice.keys());
+            peers.extend(slice.keys().copied());
         }
         if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) {
-            peers.extend(slice.keys());
+            peers.extend(slice.keys().copied());
         }
 
         peers
@@ -469,7 +617,7 @@ impl<I: Ip> LargePeerMap<I> {
     fn clean_and_get_num_peers(
         &mut self,
         config: &Config,
-        statistics_sender: &Sender<StatisticsMessage>,
+        statistics_messages: &mut Vec<StatisticsMessage>,
         now: SecondsSinceServerStart,
     ) -> usize {
         self.peers.retain(|_, peer| {
             let keep = peer.valid_until.valid(now);
 
             if !keep {
                 if peer.is_seeder {
                     self.num_seeders -= 1;
                 }
-                if config.statistics.peer_clients
-                    && statistics_sender
-                        .try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
-                        .is_err()
-                {
-                    // Should never happen in practice
-                    ::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
+                if config.statistics.peer_clients {
+                    statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id));
                 }
             }
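The sharding and locking scheme above is the heart of the refactor: the first byte of the info hash selects one of 16 shards, and an upgradable read lock keeps the common case (torrent already exists) from blocking other announcers on the same shard. A standalone sketch of the pattern with simplified types (the real code stores `Arc<TorrentData<I>>` values and generic peer maps):

```rust
use std::sync::Arc;

use hashbrown::HashMap;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};

type InfoHashBytes = [u8; 20];

struct Shards<V>(Vec<RwLock<HashMap<InfoHashBytes, Arc<V>>>>);

impl<V: Default> Shards<V> {
    fn get_or_insert(&self, info_hash: InfoHashBytes) -> Arc<V> {
        // First byte of the info hash picks the shard
        let shard = &self.0[info_hash[0] as usize % self.0.len()];

        // Upgradable read: concurrent plain readers are still allowed, but
        // only one thread at a time may later upgrade to a write lock
        let guard = shard.upgradable_read();

        if let Some(value) = guard.get(&info_hash) {
            // Fast path: clone the Arc and release the shard lock early
            value.clone()
        } else {
            // Slow path: upgrade to a write lock; entry().or_default()
            // avoids clobbering an entry created in the meantime
            RwLockUpgradableReadGuard::upgrade(guard)
                .entry(info_hash)
                .or_default()
                .clone()
        }
    }
}
```

The cleaning thread pairs this with the two-phase `pending_removal` flag seen above: a torrent is only dropped on the cleaning pass after the one that found it empty, and only if its peer map is still empty, so freshly inserted torrents survive.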
diff --git a/crates/udp/src/workers/mod.rs b/crates/udp/src/workers/mod.rs
index 5446a1f6..02af829e 100644
--- a/crates/udp/src/workers/mod.rs
+++ b/crates/udp/src/workers/mod.rs
@@ -1,3 +1,2 @@
 pub mod socket;
 pub mod statistics;
-pub mod swarm;
diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs
index 26ab5be8..a73a35eb 100644
--- a/crates/udp/src/workers/socket/mio.rs
+++ b/crates/udp/src/workers/socket/mio.rs
@@ -1,9 +1,10 @@
 use std::io::{Cursor, ErrorKind};
 use std::sync::atomic::Ordering;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use anyhow::Context;
 use aquatic_common::access_list::AccessListCache;
+use crossbeam_channel::Sender;
 use mio::net::UdpSocket;
 use mio::{Events, Interest, Poll, Token};
 
@@ -12,40 +13,26 @@ use aquatic_common::{
     ValidUntil,
 };
 use aquatic_udp_protocol::*;
+use rand::rngs::SmallRng;
+use rand::SeedableRng;
 
 use crate::common::*;
 use crate::config::Config;
 
-use super::storage::PendingScrapeResponseSlab;
 use super::validator::ConnectionValidator;
 use super::{create_socket, EXTRA_PACKET_SIZE_IPV4, EXTRA_PACKET_SIZE_IPV6};
 
-enum HandleRequestError {
-    RequestChannelFull(Vec<(SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>),
-}
-
-#[derive(Clone, Copy, Debug)]
-enum PollMode {
-    Regular,
-    SkipPolling,
-    SkipReceiving,
-}
-
 pub struct SocketWorker {
     config: Config,
     shared_state: State,
     statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
-    request_sender: ConnectedRequestSender,
-    response_receiver: ConnectedResponseReceiver,
+    statistics_sender: Sender<StatisticsMessage>,
     access_list_cache: AccessListCache,
     validator: ConnectionValidator,
-    pending_scrape_responses: PendingScrapeResponseSlab,
     socket: UdpSocket,
-    opt_resend_buffer: Option<Vec<(CanonicalSocketAddr, Response)>>,
     buffer: [u8; BUFFER_SIZE],
-    polling_mode: PollMode,
-    /// Storage for requests that couldn't be sent to swarm worker because channel was full
-    pending_requests: Vec<(SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
+    rng: SmallRng,
+    peer_valid_until: ValidUntil,
 }
 
 impl SocketWorker {
@@ -53,35 +40,36 @@ impl SocketWorker {
         config: Config,
         shared_state: State,
         statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
+        statistics_sender: Sender<StatisticsMessage>,
         validator: ConnectionValidator,
-        request_sender: ConnectedRequestSender,
-        response_receiver: ConnectedResponseReceiver,
         priv_dropper: PrivilegeDropper,
     ) -> anyhow::Result<()> {
         let socket = UdpSocket::from_std(create_socket(&config, priv_dropper)?);
         let access_list_cache = create_access_list_cache(&shared_state.access_list);
-        let opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new());
+        let peer_valid_until = ValidUntil::new(
+            shared_state.server_start_instant,
+            config.cleaning.max_peer_age,
+        );
 
         let mut worker = Self {
             config,
             shared_state,
             statistics,
+            statistics_sender,
             validator,
-            request_sender,
-            response_receiver,
             access_list_cache,
-            pending_scrape_responses: Default::default(),
             socket,
-            opt_resend_buffer,
             buffer: [0; BUFFER_SIZE],
-            polling_mode: PollMode::Regular,
-            pending_requests: Default::default(),
+            rng: SmallRng::from_entropy(),
+            peer_valid_until,
         };
 
         worker.run_inner()
     }
 
     pub fn run_inner(&mut self) -> anyhow::Result<()> {
+        let mut opt_resend_buffer =
+            (self.config.network.resend_buffer_max_len > 0).then_some(Vec::new());
         let mut events = Events::with_capacity(1);
         let mut poll = Poll::new().context("create poll")?;
 
@@ -91,94 +79,41 @@ impl SocketWorker {
 
         let poll_timeout = Duration::from_millis(self.config.network.poll_timeout_ms);
 
-        let pending_scrape_cleaning_duration =
-            Duration::from_secs(self.config.cleaning.pending_scrape_cleaning_interval);
-
-        let mut pending_scrape_valid_until = ValidUntil::new(
-            self.shared_state.server_start_instant,
-            self.config.cleaning.max_pending_scrape_age,
-        );
-        let mut last_pending_scrape_cleaning = Instant::now();
-
-        let mut iter_counter = 0usize;
+        let mut iter_counter = 0u64;
 
         loop {
-            match self.polling_mode {
-                PollMode::Regular => {
-                    poll.poll(&mut events, Some(poll_timeout)).context("poll")?;
+            poll.poll(&mut events, Some(poll_timeout)).context("poll")?;
 
-                    for event in events.iter() {
-                        if event.is_readable() {
-                            self.read_and_handle_requests(pending_scrape_valid_until);
-                        }
-                    }
-                }
-                PollMode::SkipPolling => {
-                    self.polling_mode = PollMode::Regular;
-
-                    // Continue reading from socket without polling, since
-                    // reading was previously cancelled
-                    self.read_and_handle_requests(pending_scrape_valid_until);
-                }
-                PollMode::SkipReceiving => {
-                    ::log::debug!("Postponing receiving requests because swarm worker channel is full. This means that the OS will be relied on to buffer incoming packets. To prevent this, raise config.worker_channel_size.");
-
-                    self.polling_mode = PollMode::SkipPolling;
+            for event in events.iter() {
+                if event.is_readable() {
+                    self.read_and_handle_requests(&mut opt_resend_buffer);
                 }
             }
 
             // If resend buffer is enabled, send any responses in it
-            if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() {
+            if let Some(resend_buffer) = opt_resend_buffer.as_mut() {
                 for (addr, response) in resend_buffer.drain(..) {
-                    send_response(
-                        &self.config,
-                        &self.statistics,
-                        &mut self.socket,
-                        &mut self.buffer,
-                        &mut None,
-                        response,
-                        addr,
-                    );
-                }
-            }
-
-            // Check channel for any responses generated by swarm workers
-            self.handle_swarm_worker_responses();
-
-            // Try sending pending requests
-            while let Some((index, request, addr)) = self.pending_requests.pop() {
-                if let Err(r) = self.request_sender.try_send_to(index, request, addr) {
-                    self.pending_requests.push(r);
-
-                    self.polling_mode = PollMode::SkipReceiving;
-
-                    break;
+                    self.send_response(&mut None, addr, response);
                 }
             }
 
-            // Run periodic ValidUntil updates and state cleaning
             if iter_counter % 256 == 0 {
-                let seconds_since_start = self.shared_state.server_start_instant.seconds_elapsed();
+                self.validator.update_elapsed();
 
-                pending_scrape_valid_until = ValidUntil::new_with_now(
-                    seconds_since_start,
-                    self.config.cleaning.max_pending_scrape_age,
+                self.peer_valid_until = ValidUntil::new(
+                    self.shared_state.server_start_instant,
+                    self.config.cleaning.max_peer_age,
                 );
-
-                let now = Instant::now();
-
-                if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration {
-                    self.pending_scrape_responses.clean(seconds_since_start);
-
-                    last_pending_scrape_cleaning = now;
-                }
             }
 
             iter_counter = iter_counter.wrapping_add(1);
         }
     }
 
-    fn read_and_handle_requests(&mut self, pending_scrape_valid_until: ValidUntil) {
+    fn read_and_handle_requests(
+        &mut self,
+        opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
+    ) {
         let max_scrape_torrents = self.config.protocol.max_scrape_torrents;
 
         loop {
@@ -222,13 +157,8 @@ impl SocketWorker {
                         statistics.requests.fetch_add(1, Ordering::Relaxed);
                     }
 
-                    if let Err(HandleRequestError::RequestChannelFull(failed_requests)) =
-                        self.handle_request(pending_scrape_valid_until, request, src)
-                    {
-                        self.pending_requests.extend(failed_requests);
-                        self.polling_mode = PollMode::SkipReceiving;
-
-                        break;
+                    if let Some(response) = self.handle_request(request, src) {
+                        self.send_response(opt_resend_buffer, src, response);
                     }
                 }
                 Err(RequestParseError::Sendable {
@@ -241,15 +171,7 @@ impl SocketWorker {
                         message: err.into(),
                     };
 
-                    send_response(
-                        &self.config,
-                        &self.statistics,
-                        &mut self.socket,
-                        &mut self.buffer,
-                        &mut self.opt_resend_buffer,
-                        Response::Error(response),
-                        src,
-                    );
+                    self.send_response(opt_resend_buffer, src, Response::Error(response));
 
                     ::log::debug!("request parse error (sent error response): {:?}", err);
                 }
@@ -271,34 +193,15 @@ impl SocketWorker {
         }
     }
 
-    fn handle_request(
-        &mut self,
-        pending_scrape_valid_until: ValidUntil,
-        request: Request,
-        src: CanonicalSocketAddr,
-    ) -> Result<(), HandleRequestError> {
+    fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) -> Option<Response> {
         let access_list_mode = self.config.access_list.mode;
 
         match request {
             Request::Connect(request) => {
-                let connection_id = self.validator.create_connection_id(src);
-
-                let response = ConnectResponse {
-                    connection_id,
+                return Some(Response::Connect(ConnectResponse {
+                    connection_id: self.validator.create_connection_id(src),
                     transaction_id: request.transaction_id,
-                };
-
-                send_response(
-                    &self.config,
-                    &self.statistics,
-                    &mut self.socket,
-                    &mut self.buffer,
-                    &mut self.opt_resend_buffer,
-                    Response::Connect(response),
-                    src,
-                );
-
-                Ok(())
+                }));
             }
             Request::Announce(request) => {
                 if self
@@ -310,34 +213,22 @@ impl SocketWorker {
                         .load()
                         .allows(access_list_mode, &request.info_hash.0)
                     {
-                        let worker_index =
-                            SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
-
-                        self.request_sender
-                            .try_send_to(worker_index, ConnectedRequest::Announce(request), src)
-                            .map_err(|request| {
-                                HandleRequestError::RequestChannelFull(vec![request])
-                            })
-                    } else {
-                        let response = ErrorResponse {
-                            transaction_id: request.transaction_id,
-                            message: "Info hash not allowed".into(),
-                        };
-
-                        send_response(
+                        let response = self.shared_state.torrent_maps.announce(
                             &self.config,
-                            &self.statistics,
-                            &mut self.socket,
-                            &mut self.buffer,
-                            &mut self.opt_resend_buffer,
-                            Response::Error(response),
+                            &self.statistics_sender,
+                            &mut self.rng,
+                            &request,
                             src,
+                            self.peer_valid_until,
                         );
 
-                        Ok(())
+                        return Some(response);
+                    } else {
+                        return Some(Response::Error(ErrorResponse {
+                            transaction_id: request.transaction_id,
+                            message: "Info hash not allowed".into(),
+                        }));
                     }
-                } else {
-                    Ok(())
                 }
             }
            Request::Scrape(request) => {
@@ -345,136 +236,85 @@ impl SocketWorker {
                 if self
                     .validator
                     .connection_id_valid(src, request.connection_id)
                 {
-                    let split_requests = self.pending_scrape_responses.prepare_split_requests(
-                        &self.config,
-                        request,
-                        pending_scrape_valid_until,
-                    );
-
-                    let mut failed = Vec::new();
-
-                    for (swarm_worker_index, request) in split_requests {
-                        if let Err(request) = self.request_sender.try_send_to(
-                            swarm_worker_index,
-                            ConnectedRequest::Scrape(request),
-                            src,
-                        ) {
-                            failed.push(request);
-                        }
-                    }
-
-                    if failed.is_empty() {
-                        Ok(())
-                    } else {
-                        Err(HandleRequestError::RequestChannelFull(failed))
-                    }
-                } else {
-                    Ok(())
+                    return Some(Response::Scrape(
+                        self.shared_state.torrent_maps.scrape(request, src),
+                    ));
                 }
             }
         }
-    }
 
-    fn handle_swarm_worker_responses(&mut self) {
-        for (addr, response) in self.response_receiver.try_iter() {
-            let response = match response {
-                ConnectedResponse::Scrape(response) => {
-                    if let Some(r) = self
-                        .pending_scrape_responses
-                        .add_and_get_finished(&response)
-                    {
-                        Response::Scrape(r)
-                    } else {
-                        continue;
-                    }
-                }
-                ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
-                ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
-            };
-
-            send_response(
-                &self.config,
-                &self.statistics,
-                &mut self.socket,
-                &mut self.buffer,
-                &mut self.opt_resend_buffer,
-                response,
-                addr,
-            );
-        }
+        None
     }
-}
 
-fn send_response(
-    config: &Config,
-    statistics: &CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
-    socket: &mut UdpSocket,
-    buffer: &mut [u8],
-    opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
-    response: Response,
-    canonical_addr: CanonicalSocketAddr,
-) {
-    let mut buffer = Cursor::new(&mut buffer[..]);
-
-    if let Err(err) = response.write_bytes(&mut buffer) {
-        ::log::error!("failed writing response to buffer: {:#}", err);
-
-        return;
-    }
+    fn send_response(
+        &mut self,
+        opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
+        canonical_addr: CanonicalSocketAddr,
+        response: Response,
+    ) {
+        let mut buffer = Cursor::new(&mut self.buffer[..]);
 
-    let bytes_written = buffer.position() as usize;
+        if let Err(err) = response.write_bytes(&mut buffer) {
+            ::log::error!("failed writing response to buffer: {:#}", err);
 
-    let addr = if config.network.address.is_ipv4() {
-        canonical_addr
-            .get_ipv4()
-            .expect("found peer ipv6 address while running bound to ipv4 address")
-    } else {
-        canonical_addr.get_ipv6_mapped()
-    };
+            return;
+        }
 
-    match socket.send_to(&buffer.into_inner()[..bytes_written], addr) {
-        Ok(amt) if config.statistics.active() => {
-            let stats = if canonical_addr.is_ipv4() {
-                let stats = &statistics.ipv4;
+        let bytes_written = buffer.position() as usize;
 
-                stats
-                    .bytes_sent
-                    .fetch_add(amt + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
+        let addr = if self.config.network.address.is_ipv4() {
+            canonical_addr
+                .get_ipv4()
+                .expect("found peer ipv6 address while running bound to ipv4 address")
+        } else {
+            canonical_addr.get_ipv6_mapped()
+        };
 
-                stats
-            } else {
-                let stats = &statistics.ipv6;
+        match self
+            .socket
+            .send_to(&buffer.into_inner()[..bytes_written], addr)
+        {
+            Ok(bytes_sent) if self.config.statistics.active() => {
+                let stats = if canonical_addr.is_ipv4() {
+                    let stats = &self.statistics.ipv4;
 
-                stats
-                    .bytes_sent
-                    .fetch_add(amt + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
+                    stats
+                        .bytes_sent
+                        .fetch_add(bytes_sent + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
 
-                stats
-            };
+                    stats
+                } else {
+                    let stats = &self.statistics.ipv6;
 
-            match response {
-                Response::Connect(_) => {
-                    stats.responses_connect.fetch_add(1, Ordering::Relaxed);
-                }
-                Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
-                    stats.responses_announce.fetch_add(1, Ordering::Relaxed);
-                }
-                Response::Scrape(_) => {
-                    stats.responses_scrape.fetch_add(1, Ordering::Relaxed);
-                }
-                Response::Error(_) => {
-                    stats.responses_error.fetch_add(1, Ordering::Relaxed);
+                    stats
+                        .bytes_sent
+                        .fetch_add(bytes_sent + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
+
+                    stats
+                };
+
+                match response {
+                    Response::Connect(_) => {
+                        stats.responses_connect.fetch_add(1, Ordering::Relaxed);
+                    }
+                    Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
+                        stats.responses_announce.fetch_add(1, Ordering::Relaxed);
+                    }
+                    Response::Scrape(_) => {
+                        stats.responses_scrape.fetch_add(1, Ordering::Relaxed);
+                    }
+                    Response::Error(_) => {
+                        stats.responses_error.fetch_add(1, Ordering::Relaxed);
+                    }
                 }
             }
-        }
-        Ok(_) => (),
-        Err(err) => {
-            match opt_resend_buffer.as_mut() {
+            Ok(_) => (),
+            Err(err) => match opt_resend_buffer.as_mut() {
                 Some(resend_buffer)
                     if (err.raw_os_error() == Some(libc::ENOBUFS))
                        || (err.kind() == ErrorKind::WouldBlock) =>
                 {
-                    if resend_buffer.len() < config.network.resend_buffer_max_len {
+                    if resend_buffer.len() < self.config.network.resend_buffer_max_len {
                         ::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err);
 
                         resend_buffer.push((canonical_addr, response));
@@ -485,7 +325,9 @@ fn send_response(
                 _ => {
                     ::log::warn!("Sending response to {} failed: {:#}", addr, err);
                 }
-            }
+            },
         }
+
+        ::log::debug!("send response fn finished");
     }
 }
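The resend-buffer logic at the end of the mio backend is easy to miss: on `ENOBUFS` or `WouldBlock` the response is parked and retried exactly once on the next loop iteration (the retry passes `&mut None` as the buffer, so a second failure drops the response). A condensed sketch of that retry-once policy, with hypothetical helper names:

```rust
use std::io::ErrorKind;

// Sketch only: `send` stands in for the actual UDP send; `R` is whatever
// needs to be kept around to retry (address plus serialized response).
fn send_with_retry_queue<R>(
    send: &mut dyn FnMut(&R) -> std::io::Result<()>,
    resend_buffer: &mut Option<Vec<R>>,
    max_len: usize,
    item: R,
) {
    if let Err(err) = send(&item) {
        match resend_buffer.as_mut() {
            // Transient failure (no kernel buffer space): park for one retry
            Some(buf)
                if buf.len() < max_len
                    && (err.raw_os_error() == Some(libc::ENOBUFS)
                        || err.kind() == ErrorKind::WouldBlock) =>
            {
                buf.push(item);
            }
            // Buffer full, feature disabled, or retry failed again: drop it
            _ => log::warn!("sending response failed: {:#}", err),
        }
    }
}
```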
response_receiver, priv_dropper, ); } @@ -67,9 +64,8 @@ pub fn run_socket_worker( config, shared_state, statistics, + statistics_sender, validator, - request_sender, - response_receiver, priv_dropper, ) } diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs deleted file mode 100644 index 84c11a79..00000000 --- a/crates/udp/src/workers/socket/storage.rs +++ /dev/null @@ -1,218 +0,0 @@ -use std::collections::BTreeMap; - -use hashbrown::HashMap; -use slab::Slab; - -use aquatic_common::{SecondsSinceServerStart, ValidUntil}; -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -#[derive(Debug)] -pub struct PendingScrapeResponseSlabEntry { - num_pending: usize, - valid_until: ValidUntil, - torrent_stats: BTreeMap, - transaction_id: TransactionId, -} - -#[derive(Default)] -pub struct PendingScrapeResponseSlab(Slab); - -impl PendingScrapeResponseSlab { - pub fn prepare_split_requests( - &mut self, - config: &Config, - request: ScrapeRequest, - valid_until: ValidUntil, - ) -> impl IntoIterator { - let capacity = config.swarm_workers.min(request.info_hashes.len()); - let mut split_requests: HashMap = - HashMap::with_capacity(capacity); - - if request.info_hashes.is_empty() { - ::log::warn!( - "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" - ); - - return split_requests; - } - - let vacant_entry = self.0.vacant_entry(); - let slab_key = vacant_entry.key(); - - for (i, info_hash) in request.info_hashes.into_iter().enumerate() { - let split_request = split_requests - .entry(SwarmWorkerIndex::from_info_hash(config, info_hash)) - .or_insert_with(|| PendingScrapeRequest { - slab_key, - info_hashes: BTreeMap::new(), - }); - - split_request.info_hashes.insert(i, info_hash); - } - - vacant_entry.insert(PendingScrapeResponseSlabEntry { - num_pending: split_requests.len(), - valid_until, - torrent_stats: Default::default(), - transaction_id: request.transaction_id, - }); - - split_requests - } - - pub fn add_and_get_finished( - &mut self, - response: &PendingScrapeResponse, - ) -> Option { - let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { - entry.num_pending -= 1; - - entry.torrent_stats.extend(response.torrent_stats.iter()); - - entry.num_pending == 0 - } else { - ::log::warn!( - "PendingScrapeResponseSlab.add didn't find entry for key {:?}", - response.slab_key - ); - - false - }; - - if finished { - let entry = self.0.remove(response.slab_key); - - Some(ScrapeResponse { - transaction_id: entry.transaction_id, - torrent_stats: entry.torrent_stats.into_values().collect(), - }) - } else { - None - } - } - - pub fn clean(&mut self, now: SecondsSinceServerStart) { - self.0.retain(|k, v| { - if v.valid_until.valid(now) { - true - } else { - ::log::warn!( - "Unconsumed PendingScrapeResponseSlab entry. 
{:?}: {:?}", - k, - v - ); - - false - } - }); - - self.0.shrink_to_fit(); - } -} - -#[cfg(test)] -mod tests { - use aquatic_common::ServerStartInstant; - use quickcheck::TestResult; - use quickcheck_macros::quickcheck; - - use super::*; - - #[quickcheck] - fn test_pending_scrape_response_slab( - request_data: Vec<(i32, i64, u8)>, - swarm_workers: u8, - ) -> TestResult { - if swarm_workers == 0 { - return TestResult::discard(); - } - - let config = Config { - swarm_workers: swarm_workers as usize, - ..Default::default() - }; - - let valid_until = ValidUntil::new(ServerStartInstant::new(), 1); - - let mut map = PendingScrapeResponseSlab::default(); - - let mut requests = Vec::new(); - - for (t, c, b) in request_data { - if b == 0 { - return TestResult::discard(); - } - - let mut info_hashes = Vec::new(); - - for i in 0..b { - let info_hash = InfoHash([i; 20]); - - info_hashes.push(info_hash); - } - - let request = ScrapeRequest { - transaction_id: TransactionId::new(t), - connection_id: ConnectionId::new(c), - info_hashes, - }; - - requests.push(request); - } - - let mut all_split_requests = Vec::new(); - - for request in requests.iter() { - let split_requests = - map.prepare_split_requests(&config, request.to_owned(), valid_until); - - all_split_requests.push( - split_requests - .into_iter() - .collect::>(), - ); - } - - assert_eq!(map.0.len(), requests.len()); - - let mut responses = Vec::new(); - - for split_requests in all_split_requests { - for (worker_index, split_request) in split_requests { - assert!(worker_index.0 < swarm_workers as usize); - - let torrent_stats = split_request - .info_hashes - .into_iter() - .map(|(i, info_hash)| { - ( - i, - TorrentScrapeStatistics { - seeders: NumberOfPeers::new((info_hash.0[0]) as i32), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), - }, - ) - }) - .collect(); - - let response = PendingScrapeResponse { - slab_key: split_request.slab_key, - torrent_stats, - }; - - if let Some(response) = map.add_and_get_finished(&response) { - responses.push(response); - } - } - } - - assert!(map.0.is_empty()); - assert_eq!(responses.len(), requests.len()); - - TestResult::from_bool(true) - } -} diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index fe8b4907..ac572d4e 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -11,6 +11,7 @@ use std::sync::atomic::Ordering; use anyhow::Context; use aquatic_common::access_list::AccessListCache; +use crossbeam_channel::Sender; use io_uring::opcode::Timeout; use io_uring::types::{Fixed, Timespec}; use io_uring::{IoUring, Probe}; @@ -20,6 +21,8 @@ use aquatic_common::{ ValidUntil, }; use aquatic_udp_protocol::*; +use rand::rngs::SmallRng; +use rand::SeedableRng; use crate::common::*; use crate::config::Config; @@ -28,7 +31,6 @@ use self::buf_ring::BufRing; use self::recv_helper::RecvHelper; use self::send_buffers::{ResponseType, SendBuffers}; -use super::storage::PendingScrapeResponseSlab; use super::validator::ConnectionValidator; use super::{create_socket, EXTRA_PACKET_SIZE_IPV4, EXTRA_PACKET_SIZE_IPV6}; @@ -48,7 +50,6 @@ const RESPONSE_BUF_LEN: usize = 2048; const USER_DATA_RECV: u64 = u64::MAX; const USER_DATA_PULSE_TIMEOUT: u64 = u64::MAX - 1; -const USER_DATA_CLEANING_TIMEOUT: u64 = u64::MAX - 2; const SOCKET_IDENTIFIER: Fixed = Fixed(0); @@ -76,22 +77,20 @@ pub struct SocketWorker { config: Config, shared_state: State, statistics: CachePaddedArc>, - request_sender: 
ConnectedRequestSender, - response_receiver: ConnectedResponseReceiver, + statistics_sender: Sender, access_list_cache: AccessListCache, validator: ConnectionValidator, #[allow(dead_code)] socket: UdpSocket, - pending_scrape_responses: PendingScrapeResponseSlab, buf_ring: BufRing, send_buffers: SendBuffers, recv_helper: RecvHelper, - local_responses: VecDeque<(Response, CanonicalSocketAddr)>, + local_responses: VecDeque<(CanonicalSocketAddr, Response)>, resubmittable_sqe_buf: Vec, recv_sqe: io_uring::squeue::Entry, pulse_timeout_sqe: io_uring::squeue::Entry, - cleaning_timeout_sqe: io_uring::squeue::Entry, - pending_scrape_valid_until: ValidUntil, + peer_valid_until: ValidUntil, + rng: SmallRng, } impl SocketWorker { @@ -99,9 +98,8 @@ impl SocketWorker { config: Config, shared_state: State, statistics: CachePaddedArc>, + statistics_sender: Sender, validator: ConnectionValidator, - request_sender: ConnectedRequestSender, - response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, ) -> anyhow::Result<()> { let ring_entries = config.network.ring_size.next_power_of_two(); @@ -136,57 +134,40 @@ impl SocketWorker { let recv_sqe = recv_helper.create_entry(buf_ring.bgid()); - // This timeout enables regular updates of pending_scrape_valid_until - // and wakes the main loop to send any pending responses in the case - // of no incoming requests + // This timeout enables regular updates of ConnectionValidator and + // peer_valid_until let pulse_timeout_sqe = { - let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _; + let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(5))) as *const _; Timeout::new(timespec_ptr) .build() .user_data(USER_DATA_PULSE_TIMEOUT) }; - let cleaning_timeout_sqe = { - let timespec_ptr = Box::into_raw(Box::new( - Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval), - )) as *const _; + let resubmittable_sqe_buf = vec![recv_sqe.clone(), pulse_timeout_sqe.clone()]; - Timeout::new(timespec_ptr) - .build() - .user_data(USER_DATA_CLEANING_TIMEOUT) - }; - - let resubmittable_sqe_buf = vec![ - recv_sqe.clone(), - pulse_timeout_sqe.clone(), - cleaning_timeout_sqe.clone(), - ]; - - let pending_scrape_valid_until = ValidUntil::new( + let peer_valid_until = ValidUntil::new( shared_state.server_start_instant, - config.cleaning.max_pending_scrape_age, + config.cleaning.max_peer_age, ); let mut worker = Self { config, shared_state, statistics, + statistics_sender, validator, - request_sender, - response_receiver, access_list_cache, - pending_scrape_responses: Default::default(), send_buffers, recv_helper, local_responses: Default::default(), buf_ring, recv_sqe, pulse_timeout_sqe, - cleaning_timeout_sqe, resubmittable_sqe_buf, socket, - pending_scrape_valid_until, + peer_valid_until, + rng: SmallRng::from_entropy(), }; CurrentRing::with(|ring| worker.run_inner(ring)); @@ -210,7 +191,7 @@ impl SocketWorker { // Enqueue local responses for _ in 0..sq_space { - if let Some((response, addr)) = self.local_responses.pop_front() { + if let Some((addr, response)) = self.local_responses.pop_front() { match self.send_buffers.prepare_entry(response, addr) { Ok(entry) => { unsafe { ring.submission().push(&entry).unwrap() }; @@ -218,7 +199,7 @@ impl SocketWorker { num_send_added += 1; } Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses.push_front((response, addr)); + self.local_responses.push_front((addr, response)); break; } @@ -231,43 +212,6 @@ impl SocketWorker { } } - // Enqueue swarm worker responses - 
for _ in 0..(sq_space - num_send_added) { - let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() { - r - } else { - break; - }; - - let response = match response { - ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r), - ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), - ConnectedResponse::Scrape(r) => { - if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) { - Response::Scrape(r) - } else { - continue; - } - } - }; - - match self.send_buffers.prepare_entry(response, addr) { - Ok(entry) => { - unsafe { ring.submission().push(&entry).unwrap() }; - - num_send_added += 1; - } - Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses.push_back((response, addr)); - - break; - } - Err(send_buffers::Error::SerializationFailed(err)) => { - ::log::error!("Failed serializing response: {:#}", err); - } - } - } - // Wait for all sendmsg entries to complete. If none were added, // wait for at least one recvmsg or timeout in order to avoid // busy-polling if there is no incoming data. @@ -286,28 +230,25 @@ impl SocketWorker { fn handle_cqe(&mut self, cqe: io_uring::cqueue::Entry) { match cqe.user_data() { USER_DATA_RECV => { - self.handle_recv_cqe(&cqe); + if let Some((addr, response)) = self.handle_recv_cqe(&cqe) { + self.local_responses.push_back((addr, response)); + } if !io_uring::cqueue::more(cqe.flags()) { self.resubmittable_sqe_buf.push(self.recv_sqe.clone()); } } USER_DATA_PULSE_TIMEOUT => { - self.pending_scrape_valid_until = ValidUntil::new( + self.validator.update_elapsed(); + + self.peer_valid_until = ValidUntil::new( self.shared_state.server_start_instant, - self.config.cleaning.max_pending_scrape_age, + self.config.cleaning.max_peer_age, ); self.resubmittable_sqe_buf .push(self.pulse_timeout_sqe.clone()); } - USER_DATA_CLEANING_TIMEOUT => { - self.pending_scrape_responses - .clean(self.shared_state.server_start_instant.seconds_elapsed()); - - self.resubmittable_sqe_buf - .push(self.cleaning_timeout_sqe.clone()); - } send_buffer_index => { let result = cqe.result(); @@ -352,12 +293,15 @@ impl SocketWorker { } } - fn handle_recv_cqe(&mut self, cqe: &io_uring::cqueue::Entry) { + fn handle_recv_cqe( + &mut self, + cqe: &io_uring::cqueue::Entry, + ) -> Option<(CanonicalSocketAddr, Response)> { let result = cqe.result(); if result < 0 { if -result == libc::ENOBUFS { - ::log::info!("recv failed due to lack of buffers. 
If increasing ring size doesn't help, get faster hardware"); + ::log::info!("recv failed due to lack of buffers, try increasing ring size"); } else { ::log::warn!( "recv failed: {:#}", @@ -365,7 +309,7 @@ impl SocketWorker { ); } - return; + return None; } let buffer = unsafe { @@ -374,23 +318,48 @@ impl SocketWorker { Ok(None) => { ::log::error!("Couldn't get recv buffer"); - return; + return None; } Err(err) => { ::log::error!("Couldn't get recv buffer: {:#}", err); - return; + return None; } } }; - let addr = match self.recv_helper.parse(buffer.as_slice()) { + match self.recv_helper.parse(buffer.as_slice()) { Ok((request, addr)) => { - self.handle_request(request, addr); + if self.config.statistics.active() { + let (statistics, extra_bytes) = if addr.is_ipv4() { + (&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4) + } else { + (&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6) + }; - addr + statistics + .bytes_received + .fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed); + statistics.requests.fetch_add(1, Ordering::Relaxed); + } + + return self.handle_request(request, addr); } Err(self::recv_helper::Error::RequestParseError(err, addr)) => { + if self.config.statistics.active() { + if addr.is_ipv4() { + self.statistics + .ipv4 + .bytes_received + .fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed); + } else { + self.statistics + .ipv6 + .bytes_received + .fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed); + } + } + match err { RequestParseError::Sendable { connection_id, @@ -405,60 +374,43 @@ impl SocketWorker { message: err.into(), }; - self.local_responses.push_back((response.into(), addr)); + return Some((addr, Response::Error(response))); } } RequestParseError::Unsendable { err } => { ::log::debug!("Couldn't parse request from {:?}: {}", addr, err); } } - - addr } Err(self::recv_helper::Error::InvalidSocketAddress) => { ::log::debug!("Ignored request claiming to be from port 0"); - - return; } Err(self::recv_helper::Error::RecvMsgParseError) => { ::log::error!("RecvMsgOut::parse failed"); - - return; } Err(self::recv_helper::Error::RecvMsgTruncated) => { ::log::warn!("RecvMsgOut::parse failed: sockaddr or payload truncated"); - - return; } - }; - - if self.config.statistics.active() { - let (statistics, extra_bytes) = if addr.is_ipv4() { - (&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4) - } else { - (&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6) - }; - - statistics - .bytes_received - .fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed); - statistics.requests.fetch_add(1, Ordering::Relaxed); } + + None } - fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) { + fn handle_request( + &mut self, + request: Request, + src: CanonicalSocketAddr, + ) -> Option<(CanonicalSocketAddr, Response)> { let access_list_mode = self.config.access_list.mode; match request { Request::Connect(request) => { - let connection_id = self.validator.create_connection_id(src); - let response = Response::Connect(ConnectResponse { - connection_id, + connection_id: self.validator.create_connection_id(src), transaction_id: request.transaction_id, }); - self.local_responses.push_back((response, src)); + return Some((src, response)); } Request::Announce(request) => { if self @@ -470,23 +422,23 @@ impl SocketWorker { .load() .allows(access_list_mode, &request.info_hash.0) { - let worker_index = - SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash); - - if self - .request_sender - .try_send_to(worker_index, ConnectedRequest::Announce(request), 
src) - .is_err() - { - ::log::warn!("request sender full, dropping request"); - } + let response = self.shared_state.torrent_maps.announce( + &self.config, + &self.statistics_sender, + &mut self.rng, + &request, + src, + self.peer_valid_until, + ); + + return Some((src, response)); } else { let response = Response::Error(ErrorResponse { transaction_id: request.transaction_id, message: "Info hash not allowed".into(), }); - self.local_responses.push_back((response, src)) + return Some((src, response)); } } } @@ -495,24 +447,15 @@ impl SocketWorker { .validator .connection_id_valid(src, request.connection_id) { - let split_requests = self.pending_scrape_responses.prepare_split_requests( - &self.config, - request, - self.pending_scrape_valid_until, - ); + let response = + Response::Scrape(self.shared_state.torrent_maps.scrape(request, src)); - for (swarm_worker_index, request) in split_requests { - if self - .request_sender - .try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src) - .is_err() - { - ::log::warn!("request sender full, dropping request"); - } - } + return Some((src, response)); } } } + + None } } diff --git a/crates/udp/src/workers/socket/validator.rs b/crates/udp/src/workers/socket/validator.rs index c68d1efc..f96b9159 100644 --- a/crates/udp/src/workers/socket/validator.rs +++ b/crates/udp/src/workers/socket/validator.rs @@ -12,6 +12,8 @@ use crate::config::Config; /// HMAC (BLAKE3) based ConnectionId creator and validator /// +/// Method update_elapsed must be called at least once a minute. +/// /// The purpose of using ConnectionIds is to make IP spoofing costly, mainly to /// prevent the tracker from being used as an amplification vector for DDoS /// attacks. By including 32 bits of BLAKE3 keyed hash output in the Ids, an @@ -32,6 +34,7 @@ pub struct ConnectionValidator { start_time: Instant, max_connection_age: u64, keyed_hasher: blake3::Hasher, + seconds_since_start: u32, } impl ConnectionValidator { @@ -49,11 +52,12 @@ impl ConnectionValidator { keyed_hasher, start_time: Instant::now(), max_connection_age: config.cleaning.max_connection_age.into(), + seconds_since_start: 0, }) } pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId { - let elapsed = (self.start_time.elapsed().as_secs() as u32).to_ne_bytes(); + let elapsed = (self.seconds_since_start).to_ne_bytes(); let hash = self.hash(elapsed, source_addr.get().ip()); @@ -78,16 +82,23 @@ impl ConnectionValidator { return false; } - let tracker_elapsed = self.start_time.elapsed().as_secs(); + let seconds_since_start = self.seconds_since_start as u64; let client_elapsed = u64::from(u32::from_ne_bytes(elapsed)); let client_expiration_time = client_elapsed + self.max_connection_age; // In addition to checking if the client connection is expired, - // disallow client_elapsed values that are in future and thus could not - // have been sent by the tracker. This prevents brute forcing with - // `u32::MAX` as 'elapsed' part of ConnectionId to find a hash that + // disallow client_elapsed values that are too far in future and thus + // could not have been sent by the tracker. This prevents brute forcing + // with `u32::MAX` as 'elapsed' part of ConnectionId to find a hash that // works until the tracker is restarted. 
- (client_expiration_time > tracker_elapsed) & (client_elapsed <= tracker_elapsed) + let client_not_expired = client_expiration_time > seconds_since_start; + let client_elapsed_not_in_far_future = client_elapsed <= (seconds_since_start + 60); + + client_not_expired & client_elapsed_not_in_far_future + } + + pub fn update_elapsed(&mut self) { + self.seconds_since_start = self.start_time.elapsed().as_secs() as u32; } fn hash(&mut self, elapsed: [u8; 4], ip_addr: IpAddr) -> [u8; 4] { @@ -148,7 +159,6 @@ mod tests { if max_connection_age == 0 { quickcheck::TestResult::from_bool(!original_valid) } else { - // Note: depends on that running this test takes less than a second quickcheck::TestResult::from_bool(original_valid) } } diff --git a/crates/udp/src/workers/statistics/collector.rs b/crates/udp/src/workers/statistics/collector.rs index 5297853e..93fe11d8 100644 --- a/crates/udp/src/workers/statistics/collector.rs +++ b/crates/udp/src/workers/statistics/collector.rs @@ -25,7 +25,6 @@ pub struct StatisticsCollector { statistics: Statistics, ip_version: IpVersion, last_update: Instant, - pending_histograms: Vec>, last_complete_histogram: PeerHistogramStatistics, } @@ -34,19 +33,13 @@ impl StatisticsCollector { Self { statistics, last_update: Instant::now(), - pending_histograms: Vec::new(), last_complete_histogram: Default::default(), ip_version, } } - pub fn add_histogram(&mut self, config: &Config, histogram: Histogram) { - self.pending_histograms.push(histogram); - - if self.pending_histograms.len() == config.swarm_workers { - self.last_complete_histogram = - PeerHistogramStatistics::new(self.pending_histograms.drain(..).sum()); - } + pub fn add_histogram(&mut self, histogram: Histogram) { + self.last_complete_histogram = PeerHistogramStatistics::new(histogram); } pub fn collect_from_shared( @@ -60,8 +53,6 @@ impl StatisticsCollector { let mut responses_error: usize = 0; let mut bytes_received: usize = 0; let mut bytes_sent: usize = 0; - let mut num_torrents: usize = 0; - let mut num_peers: usize = 0; #[cfg(feature = "prometheus")] let ip_version_prometheus_str = self.ip_version.prometheus_str(); @@ -186,44 +177,37 @@ impl StatisticsCollector { } } - for (i, statistics) in self - .statistics - .swarm - .iter() - .map(|s| s.by_ip_version(self.ip_version)) - .enumerate() - { - { - let n = statistics.torrents.load(Ordering::Relaxed); + let swarm_statistics = &self.statistics.swarm.by_ip_version(self.ip_version); - num_torrents += n; + let num_torrents = { + let num_torrents = swarm_statistics.torrents.load(Ordering::Relaxed); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => ip_version_prometheus_str, - "worker_index" => i.to_string(), - ) - .set(n as f64); - } + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => ip_version_prometheus_str, + ) + .set(num_torrents as f64); } - { - let n = statistics.peers.load(Ordering::Relaxed); - num_peers += n; + num_torrents + }; - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - ::metrics::gauge!( - "aquatic_peers", - "ip_version" => ip_version_prometheus_str, - "worker_index" => i.to_string(), - ) - .set(n as f64); - } + let num_peers = { + let num_peers = swarm_statistics.peers.load(Ordering::Relaxed); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_peers", + "ip_version" => 
ip_version_prometheus_str, + ) + .set(num_peers as f64); } - } + + num_peers + }; let elapsed = { let now = Instant::now(); diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index beafa2d1..9ae5b91d 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -81,8 +81,8 @@ pub fn run_statistics_worker( for message in statistics_receiver.try_iter() { match message { - StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), - StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), + StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(h), + StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(h), StatisticsMessage::PeerAdded(peer_id) => { if process_peer_client_data { peers @@ -249,7 +249,10 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) { " error: {:>10}", statistics.responses_per_second_error ); - println!(" torrents: {:>10}", statistics.num_torrents); + println!( + " torrents: {:>10} (updated every {}s)", + statistics.num_torrents, config.cleaning.torrent_cleaning_interval + ); println!( " peers: {:>10} (updated every {}s)", statistics.num_peers, config.cleaning.torrent_cleaning_interval diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs deleted file mode 100644 index ccdab8ad..00000000 --- a/crates/udp/src/workers/swarm/mod.rs +++ /dev/null @@ -1,149 +0,0 @@ -mod storage; - -use std::net::IpAddr; -use std::sync::atomic::Ordering; -use std::time::Duration; -use std::time::Instant; - -use crossbeam_channel::Receiver; -use crossbeam_channel::Sender; -use rand::{rngs::SmallRng, SeedableRng}; - -use aquatic_common::{CanonicalSocketAddr, ValidUntil}; - -use crate::common::*; -use crate::config::Config; - -use storage::TorrentMaps; - -pub struct SwarmWorker { - pub config: Config, - pub state: State, - pub statistics: CachePaddedArc>, - pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - pub response_sender: ConnectedResponseSender, - pub statistics_sender: Sender, - pub worker_index: SwarmWorkerIndex, -} - -impl SwarmWorker { - pub fn run(&mut self) -> anyhow::Result<()> { - let mut torrents = TorrentMaps::default(); - let mut rng = SmallRng::from_entropy(); - - let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new( - self.state.server_start_instant, - self.config.cleaning.max_peer_age, - ); - - let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval); - let statistics_update_interval = Duration::from_secs(self.config.statistics.interval); - - let mut last_cleaning = Instant::now(); - let mut last_statistics_update = Instant::now(); - - let mut iter_counter = 0usize; - - loop { - if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't also do blocking - // sends in socket workers (doing both could cause a deadlock) - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - let response = torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &self.config, - &self.statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); - - // It doesn't matter which socket worker receives announce responses - self.response_sender - .send_to_any(src, 
ConnectedResponse::AnnounceIpv4(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - let response = torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &self.config, - &self.statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); - - // It doesn't matter which socket worker receives announce responses - self.response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - let response = torrents.ipv4.scrape(request); - - self.response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - let response = torrents.ipv6.scrape(request); - - self.response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - }; - } - - // Run periodic tasks - if iter_counter % 128 == 0 { - let now = Instant::now(); - - peer_valid_until = ValidUntil::new( - self.state.server_start_instant, - self.config.cleaning.max_peer_age, - ); - - if now > last_cleaning + cleaning_interval { - torrents.clean_and_update_statistics( - &self.config, - &self.state, - &self.statistics, - &self.statistics_sender, - &self.state.access_list, - ); - - last_cleaning = now; - } - if self.config.statistics.active() - && now > last_statistics_update + statistics_update_interval - { - self.statistics - .ipv4 - .torrents - .store(torrents.ipv4.num_torrents(), Ordering::Relaxed); - self.statistics - .ipv6 - .torrents - .store(torrents.ipv6.num_torrents(), Ordering::Relaxed); - - last_statistics_update = now; - } - } - - iter_counter = iter_counter.wrapping_add(1); - } - } -} diff --git a/crates/udp/templates/statistics.html b/crates/udp/templates/statistics.html index 0fe8930e..01bd37f4 100644 --- a/crates/udp/templates/statistics.html +++ b/crates/udp/templates/statistics.html @@ -25,10 +25,10 @@

                 BitTorrent tracker statistics
                 IPv4
-                * Peer count is updated every { peer_update_interval } seconds
+                * Torrent/peer count is updated every { peer_update_interval } seconds
                 Number of torrents
-                { ipv4.num_torrents }
+                { ipv4.num_torrents } *
                 Number of peers
@@ -141,10 +141,10 @@
                 Peers per torrent
                 IPv6
-                * Peer count is updated every { peer_update_interval } seconds
+                * Torrent/peer count is updated every { peer_update_interval } seconds
                 Number of torrents
-                { ipv6.num_torrents }
+                { ipv6.num_torrents } *
Number of peers
+UDP BitTorrent tracker throughput
+
+Average responses per second, best result.
+
+| CPU cores | aquatic_udp (mio) | aquatic_udp (io_uring) | opentracker | chihaya |
+|----------:|------------------:|-----------------------:|------------:|--------:|
+|         1 |           186,939 |                226,065 |     190,540 |  55,989 |
+|         2 |           371,478 |                444,353 |     379,623 | 111,226 |
+|         4 |           734,709 |                876,642 |     748,401 | 136,983 |
+|         6 |         1,034,804 |              1,267,006 |     901,600 | 131,827 |
+|         8 |         1,296,693 |              1,521,113 |   1,170,928 | 131,779 |
+|        12 |         1,871,353 |              1,837,223 |   1,675,059 | 130,942 |
+|        16 |         2,037,713 |              2,258,321 |   1,645,828 | 127,256 |
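+
+As a rough reading aid, per-core scaling can be derived from the figures
+above. The following is a minimal illustrative sketch (the io_uring column is
+hard-coded from the table; this is not part of aquatic_bencher):
+
+```rust
+fn main() {
+    // (CPU cores, responses per second) for aquatic_udp (io_uring),
+    // copied from the table above
+    let results = [
+        (1u32, 226_065u64),
+        (2, 444_353),
+        (4, 876_642),
+        (6, 1_267_006),
+        (8, 1_521_113),
+        (12, 1_837_223),
+        (16, 2_258_321),
+    ];
+
+    let (_, base_rps) = results[0];
+
+    for (cores, rps) in results {
+        // Efficiency: speedup over the single-core result, divided by core count
+        let efficiency = (rps as f64 / base_rps as f64) / f64::from(cores);
+
+        println!(
+            "{cores:>2} cores: {rps:>9} responses/s ({:.0}% scaling efficiency)",
+            efficiency * 100.0
+        );
+    }
+}
+```
+
+At 16 cores, aquatic_udp (io_uring) still delivers roughly ten times its
+single-core throughput.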
diff --git a/documents/aquatic-udp-load-test-2024-02-10.png b/documents/aquatic-udp-load-test-2024-02-10.png new file mode 100644 index 00000000..a91b862b Binary files /dev/null and b/documents/aquatic-udp-load-test-2024-02-10.png differ