Skip to content

Commit

Permalink
Merge pull request #182 from greatest-ape/work-2024-02-01
Browse files Browse the repository at this point in the history
udp: fix prometheus issue, improve statistics code, other fixes
  • Loading branch information
greatest-ape authored Feb 3, 2024
2 parents a86eb68 + 3513b71 commit 53af594
Show file tree
Hide file tree
Showing 18 changed files with 552 additions and 428 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
* Remove config key `network.poll_event_capacity` (always use 1)
* Speed up parsing and serialization of requests and responses by using
[zerocopy](https://crates.io/crates/zerocopy)
* Report socket worker related prometheus stats per worker

#### Fixed

* Quit whole application if any worker thread quits
* Disallow announce requests with port value of 0

### aquatic_http

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ cfg-if = "1"
compact_str = "0.7"
constant_time_eq = "0.3"
crossbeam-channel = "0.5"
crossbeam-utils = "0.8"
getrandom = "0.2"
hashbrown = { version = "0.14", default-features = false }
hdrhistogram = "7"
Expand Down
177 changes: 82 additions & 95 deletions crates/udp/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
use std::collections::BTreeMap;
use std::hash::Hash;
use std::iter::repeat_with;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use crossbeam_channel::{Receiver, SendError, Sender, TrySendError};

use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
use aquatic_udp_protocol::*;
use crossbeam_utils::CachePadded;
use hdrhistogram::Histogram;

use crate::config::Config;

pub const BUFFER_SIZE: usize = 8192;

#[derive(Clone, Copy, Debug)]
pub enum IpVersion {
V4,
V6,
}

#[cfg(feature = "prometheus")]
impl IpVersion {
pub fn prometheus_str(&self) -> &'static str {
match self {
Self::V4 => "4",
Self::V6 => "6",
}
}
}

#[derive(Clone, Copy, Debug)]
pub struct SocketWorkerIndex(pub usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct SwarmWorkerIndex(pub usize);

impl SwarmWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
Self(info_hash.0[0] as usize % config.swarm_workers)
}
}

#[derive(Debug)]
pub struct PendingScrapeRequest {
pub slab_key: usize,
Expand All @@ -39,18 +69,6 @@ pub enum ConnectedResponse {
Scrape(PendingScrapeResponse),
}

#[derive(Clone, Copy, Debug)]
pub struct SocketWorkerIndex(pub usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct SwarmWorkerIndex(pub usize);

impl SwarmWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
Self(info_hash.0[0] as usize % config.swarm_workers)
}
}

pub struct ConnectedRequestSender {
index: SocketWorkerIndex,
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
Expand All @@ -64,10 +82,6 @@ impl ConnectedRequestSender {
Self { index, senders }
}

pub fn any_full(&self) -> bool {
self.senders.iter().any(|sender| sender.is_full())
}

pub fn try_send_to(
&self,
index: SwarmWorkerIndex,
Expand Down Expand Up @@ -153,116 +167,89 @@ impl ConnectedResponseSender {

pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>;

#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum PeerStatus {
Seeding,
Leeching,
Stopped,
#[derive(Clone)]
pub struct Statistics {
pub socket: Vec<CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>>,
pub swarm: Vec<CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>>,
}

impl PeerStatus {
/// Determine peer status from announce event and number of bytes left.
///
/// Likely, the last branch will be taken most of the time.
#[inline]
pub fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: NumberOfBytes) -> Self {
if event == AnnounceEvent::Stopped {
Self::Stopped
} else if bytes_left.0.get() == 0 {
Self::Seeding
} else {
Self::Leeching
impl Statistics {
pub fn new(config: &Config) -> Self {
Self {
socket: repeat_with(Default::default)
.take(config.socket_workers)
.collect(),
swarm: repeat_with(Default::default)
.take(config.swarm_workers)
.collect(),
}
}
}

pub enum StatisticsMessage {
Ipv4PeerHistogram(Histogram<u64>),
Ipv6PeerHistogram(Histogram<u64>),
PeerAdded(PeerId),
PeerRemoved(PeerId),
#[derive(Default)]
pub struct IpVersionStatistics<T> {
pub ipv4: T,
pub ipv6: T,
}

pub struct Statistics {
pub requests_received: AtomicUsize,
pub responses_sent_connect: AtomicUsize,
pub responses_sent_announce: AtomicUsize,
pub responses_sent_scrape: AtomicUsize,
pub responses_sent_error: AtomicUsize,
impl<T> IpVersionStatistics<T> {
pub fn by_ip_version(&self, ip_version: IpVersion) -> &T {
match ip_version {
IpVersion::V4 => &self.ipv4,
IpVersion::V6 => &self.ipv6,
}
}
}

#[derive(Default)]
pub struct SocketWorkerStatistics {
pub requests: AtomicUsize,
pub responses_connect: AtomicUsize,
pub responses_announce: AtomicUsize,
pub responses_scrape: AtomicUsize,
pub responses_error: AtomicUsize,
pub bytes_received: AtomicUsize,
pub bytes_sent: AtomicUsize,
pub torrents: Vec<AtomicUsize>,
pub peers: Vec<AtomicUsize>,
}

impl Statistics {
pub fn new(num_swarm_workers: usize) -> Self {
Self {
requests_received: Default::default(),
responses_sent_connect: Default::default(),
responses_sent_announce: Default::default(),
responses_sent_scrape: Default::default(),
responses_sent_error: Default::default(),
bytes_received: Default::default(),
bytes_sent: Default::default(),
torrents: Self::create_atomic_usize_vec(num_swarm_workers),
peers: Self::create_atomic_usize_vec(num_swarm_workers),
}
}
pub type CachePaddedArc<T> = CachePadded<Arc<CachePadded<T>>>;

fn create_atomic_usize_vec(len: usize) -> Vec<AtomicUsize> {
::std::iter::repeat_with(AtomicUsize::default)
.take(len)
.collect()
}
#[derive(Default)]
pub struct SwarmWorkerStatistics {
pub torrents: AtomicUsize,
pub peers: AtomicUsize,
}

pub enum StatisticsMessage {
Ipv4PeerHistogram(Histogram<u64>),
Ipv6PeerHistogram(Histogram<u64>),
PeerAdded(PeerId),
PeerRemoved(PeerId),
}

#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub statistics_ipv4: Arc<Statistics>,
pub statistics_ipv6: Arc<Statistics>,
pub server_start_instant: ServerStartInstant,
}

impl State {
pub fn new(num_swarm_workers: usize) -> Self {
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
statistics_ipv4: Arc::new(Statistics::new(num_swarm_workers)),
statistics_ipv6: Arc::new(Statistics::new(num_swarm_workers)),
server_start_instant: ServerStartInstant::new(),
}
}
}

#[cfg(test)]
mod tests {
use std::net::Ipv6Addr;
use std::{net::Ipv6Addr, num::NonZeroU16};

use crate::config::Config;

use super::*;

#[test]
fn test_peer_status_from_event_and_bytes_left() {
use crate::common::*;

use PeerStatus::*;

let f = PeerStatus::from_event_and_bytes_left;

assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(0)));
assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(1)));

assert_eq!(Seeding, f(AnnounceEvent::Started, NumberOfBytes::new(0)));
assert_eq!(Leeching, f(AnnounceEvent::Started, NumberOfBytes::new(1)));

assert_eq!(Seeding, f(AnnounceEvent::Completed, NumberOfBytes::new(0)));
assert_eq!(Leeching, f(AnnounceEvent::Completed, NumberOfBytes::new(1)));

assert_eq!(Seeding, f(AnnounceEvent::None, NumberOfBytes::new(0)));
assert_eq!(Leeching, f(AnnounceEvent::None, NumberOfBytes::new(1)));
}

// Assumes that announce response with maximum amount of ipv6 peers will
// be the longest
#[test]
Expand All @@ -273,7 +260,7 @@ mod tests {

let peers = ::std::iter::repeat(ResponsePeer {
ip_address: Ipv6AddrBytes(Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1).octets()),
port: Port::new(1),
port: Port::new(NonZeroU16::new(1).unwrap()),
})
.take(config.protocol.max_response_peers)
.collect();
Expand Down
31 changes: 21 additions & 10 deletions crates/udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use aquatic_common::access_list::update_access_list;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::ServerStartInstant;

use common::{
ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex,
ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, Statistics,
SwarmWorkerIndex,
};
use config::Config;
use workers::socket::ConnectionValidator;
Expand All @@ -31,7 +31,8 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1])?;

let state = State::new(config.swarm_workers);
let state = State::default();
let statistics = Statistics::new(&config);
let connection_validator = ConnectionValidator::new(&config)?;
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let mut join_handles = Vec::new();
Expand All @@ -46,8 +47,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {

let (statistics_sender, statistics_receiver) = unbounded();

let server_start_instant = ServerStartInstant::new();

for i in 0..config.swarm_workers {
let (request_sender, request_receiver) = bounded(config.worker_channel_size);

Expand All @@ -68,6 +67,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let request_receiver = request_receivers.remove(&i).unwrap().clone();
let response_sender = ConnectedResponseSender::new(response_senders.clone());
let statistics_sender = statistics_sender.clone();
let statistics = statistics.swarm[i].clone();

let handle = Builder::new()
.name(format!("swarm-{:02}", i + 1))
Expand All @@ -83,7 +83,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut worker = SwarmWorker {
config,
state,
server_start_instant,
statistics,
request_receiver,
response_sender,
statistics_sender,
Expand All @@ -105,6 +105,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
let response_receiver = response_receivers.remove(&i).unwrap();
let priv_dropper = priv_dropper.clone();
let statistics = statistics.socket[i].clone();

let handle = Builder::new()
.name(format!("socket-{:02}", i + 1))
Expand All @@ -118,10 +119,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
);

workers::socket::run_socket_worker(
state,
config,
state,
statistics,
connection_validator,
server_start_instant,
request_sender,
response_receiver,
priv_dropper,
Expand All @@ -147,7 +148,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::Util,
);

workers::statistics::run_statistics_worker(config, state, statistics_receiver)
workers::statistics::run_statistics_worker(
config,
state,
statistics,
statistics_receiver,
)
})
.with_context(|| "spawn statistics worker")?;

Expand Down Expand Up @@ -187,6 +193,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
.build()
.context("build prometheus recorder and exporter")?;

let recorder_handle = recorder.handle();

::metrics::set_global_recorder(recorder)
.context("set global metrics recorder")?;

::tokio::spawn(async move {
let mut interval = ::tokio::time::interval(Duration::from_secs(5));

Expand All @@ -195,7 +206,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {

// Periodically render metrics to make sure
// idles are cleaned up
recorder.handle().render();
recorder_handle.render();
}
});

Expand Down
Loading

0 comments on commit 53af594

Please sign in to comment.