Skip to content

Commit

Permalink
feat: new torrent repository using crossbeam_skiplist::SkipMap
Browse files Browse the repository at this point in the history
SkipMap is an ordered map based on a lock-free skip list.
It's al alternative to BTreeMap which supports concurrent access across
multiple threads.

One of the performance problems with the current solution is we can only
add one torrent at the time because threads need to lock the whole
BTreeMap. The SkipMap should avoid that problem.

More info about SkiMap:

https://docs.rs/crossbeam-skiplist/latest/crossbeam_skiplist/struct.SkipMap.html#method.remove

The aquatic UDP load test was executed with the current implementation
and the new one:

Current Implementation:

Requests out: 397287.37/second
Responses in: 357549.15/second
  - Connect responses:  177073.94
  - Announce responses: 176905.36
  - Scrape responses:   3569.85
  - Error responses:    0.00
Peers per announce response: 0.00
Announce responses per info hash:
  - p10: 1
  - p25: 1
  - p50: 1
  - p75: 1
  - p90: 2
  - p95: 3
  - p99: 104
  - p99.9: 287
  - p100: 371

New Implementation:

Requests out: 396788.68/second
Responses in: 357105.27/second
  - Connect responses:  176662.91
  - Announce responses: 176863.44
  - Scrape responses:   3578.91
  - Error responses:    0.00
Peers per announce response: 0.00
Announce responses per info hash:
  - p10: 1
  - p25: 1
  - p50: 1
  - p75: 1
  - p90: 2
  - p95: 3
  - p99: 105
  - p99.9: 287
  - p100: 351

The result is pretty similar but the benchmarking for the repository
using criterios shows that this implementations is a litle bit better
than the current one.
  • Loading branch information
josecelano committed Apr 5, 2024
1 parent 608585e commit 642d6be
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 5 deletions.
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"Shareaza",
"sharktorrent",
"SHLVL",
"skiplist",
"socketaddr",
"sqllite",
"subsec",
Expand Down
5 changes: 3 additions & 2 deletions packages/torrent-repository/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ rust-version.workspace = true
version.workspace = true

[dependencies]
crossbeam-skiplist = "0.1"
futures = "0.3.29"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }

[dev-dependencies]
criterion = { version = "0", features = ["async_tokio"] }
Expand Down
21 changes: 20 additions & 1 deletion packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod helpers;
use criterion::{criterion_group, criterion_main, Criterion};
use torrust_tracker_torrent_repository::{
TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd,
TorrentsRwLockTokioMutexTokio,
TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
};

use crate::helpers::{asyn, sync};
Expand Down Expand Up @@ -45,6 +45,10 @@ fn add_one_torrent(c: &mut Criterion) {
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokioMutexTokio, _>);
});

group.bench_function("SkipMapMutexStd", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
});

group.finish();
}

Expand Down Expand Up @@ -89,6 +93,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -133,6 +142,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -178,6 +192,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
});
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down
3 changes: 3 additions & 0 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use repository::skip_map_mutex_std::CrossbeamSkipList;
use torrust_tracker_clock::clock;

pub mod entry;
Expand All @@ -16,6 +17,8 @@ pub type TorrentsRwLockTokio = repository::RwLockTokio<EntrySingle>;
pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio<EntryMutexTokio>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;

/// This code needs to be copied into each crate.
/// Working version, for production.
#[cfg(not(test))]
Expand Down
1 change: 1 addition & 0 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod rw_lock_std_mutex_tokio;
pub mod rw_lock_tokio;
pub mod rw_lock_tokio_mutex_std;
pub mod rw_lock_tokio_mutex_tokio;
pub mod skip_map_mutex_std;

use std::fmt::Debug;

Expand Down
106 changes: 106 additions & 0 deletions packages/torrent-repository/src/repository/skip_map_mutex_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use crossbeam_skiplist::SkipMap;
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::pagination::Pagination;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};

use super::Repository;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

#[derive(Default, Debug)]
pub struct CrossbeamSkipList<T> {
torrents: SkipMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for CrossbeamSkipList<EntryMutexStd>
where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
let entry = self.torrents.get_or_insert(*info_hash, Arc::default());
entry.value().insert_or_update_peer_and_get_stats(peer)
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.value().clone())
}

fn get_metrics(&self) -> TorrentsMetrics {
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().expect("it should get a lock").get_stats();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
match pagination {
Some(pagination) => self
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
None => self

Check warning on line 59 in packages/torrent-repository/src/repository/skip_map_mutex_std.rs

View check run for this annotation

Codecov / codecov/patch

packages/torrent-repository/src/repository/skip_map_mutex_std.rs#L59

Added line #L59 was not covered by tests
.torrents
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))

Check warning on line 62 in packages/torrent-repository/src/repository/skip_map_mutex_std.rs

View check run for this annotation

Codecov / codecov/patch

packages/torrent-repository/src/repository/skip_map_mutex_std.rs#L62

Added line #L62 was not covered by tests
.collect(),
}
}

fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
for (info_hash, completed) in persistent_torrents {
if self.torrents.contains_key(info_hash) {
continue;
}

let entry = EntryMutexStd::new(
EntrySingle {
peers: BTreeMap::default(),
downloaded: *completed,
}
.into(),
);

// Since SkipMap is lock-free the torrent could have been inserted
// after checking if it exists.
self.torrents.get_or_insert(*info_hash, entry);
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
self.torrents.remove(key).map(|entry| entry.value().clone())
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.torrents {
entry.value().remove_inactive_peers(current_cutoff);
}
}

Check warning on line 95 in packages/torrent-repository/src/repository/skip_map_mutex_std.rs

View check run for this annotation

Codecov / codecov/patch

packages/torrent-repository/src/repository/skip_map_mutex_std.rs#L91-L95

Added lines #L91 - L95 were not covered by tests

fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
for entry in &self.torrents {
if entry.value().is_good(policy) {

Check warning on line 99 in packages/torrent-repository/src/repository/skip_map_mutex_std.rs

View check run for this annotation

Codecov / codecov/patch

packages/torrent-repository/src/repository/skip_map_mutex_std.rs#L97-L99

Added lines #L97 - L99 were not covered by tests
continue;
}

entry.remove();
}
}

Check warning on line 105 in packages/torrent-repository/src/repository/skip_map_mutex_std.rs

View check run for this annotation

Codecov / codecov/patch

packages/torrent-repository/src/repository/skip_map_mutex_std.rs#L103-L105

Added lines #L103 - L105 were not covered by tests
}
5 changes: 3 additions & 2 deletions src/core/torrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
//! Peer that don not have a full copy of the torrent data are called "leechers".
//!

use torrust_tracker_torrent_repository::TorrentsRwLockStdMutexStd;
use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd;

pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used
//pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used
pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used

0 comments on commit 642d6be

Please sign in to comment.