Skip to content

Commit

Permalink
Merge #784: Performance optimization: create a new torrent repository…
Browse files Browse the repository at this point in the history
… using `DashMap`

4030fd1 fix: torrent repository tests. DashMap is not ordered (Jose Celano)
1e76c17 chore: add dashmap cargo dep to cargp machete (Jose Celano)
00ee9db feat: [#565] new torrent repository implementation usind DashMap (Jose Celano)
78b46c4 chore(deps): add cargo dependency: dashmap (Jose Celano)

Pull request description:

  Relates to:

  - #778
  - #567 (comment)

  This PR adds a new torrent repository implementation where the outer collection for torrents uses a [DashMap](https://docs.rs/dashmap/latest/dashmap/).

  This is something @mickvandijke was working on this [PR](#645).

  There have been many changes. @da2ce7 has extracted a [package for the repositories](https://github.com/torrust/torrust-tracker/tree/develop/packages/torrent-repository).

  This PR adds a new repo using DashMap to the new package. However, it does not implement some of the extra features @mickvandijke added to the other [PR](#645). For example, it does not limit memory consumption. None of the other repos have that feature, so I suggest merging this PR and implementing that feature in the future for all repos.

  ### Why

  The current repository used in production is the one using a [SkipMap](https://docs.rs/crossbeam-skiplist/latest/crossbeam_skiplist/struct.SkipMap.html) data structure. DashMap was the first type we considered to allow adding new torrents in parallels.

  The implementation with DashMap has not been merged because it does not guarantee the order when you iterate over the torrents. The tracker API returns torrents ordered by InfoHash. To avoid breaking the API that PR would need to add a new data structure (kind of Index) to keep the torrent list ordered.

  This implementation helps us run performance tests with this option without spending much time fixing its limitations. It looks like the performance is similar to the SkiMap, so we don't need to use it in production now. We only need to keep it for benchmarking in the future if other versions are better.

  ### Benchmarking

  Running the Aquatic UDP load test, the DashMap looks slightly better.

  SkipMap:

  ```output
  Requests out: 396788.68/second
  Responses in: 357105.27/second
    - Connect responses:  176662.91
    - Announce responses: 176863.44
    - Scrape responses:   3578.91
    - Error responses:    0.00
  Peers per announce response: 0.00
  Announce responses per info hash:
    - p10: 1
    - p25: 1
    - p50: 1
    - p75: 1
    - p90: 2
    - p95: 3
    - p99: 105
    - p99.9: 287
    - p100: 351
  ```

  DashMap with initial capacity 0 (best result. On average is lower, similar to SkipMap):

  ```output
  Requests out: 410658.38/second
  Responses in: 365892.86/second
    - Connect responses:  181258.91
    - Announce responses: 181005.95
    - Scrape responses:   3628.00
    - Error responses:    0.00
  Peers per announce response: 0.00
  Announce responses per info hash:
    - p10: 1
    - p25: 1
    - p50: 1
    - p75: 1
    - p90: 2
    - p95: 3
    - p99: 104
    - p99.9: 295
    - p100: 363
  ```

  With Criterion:

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/1e1fca2d-821d-4e2c-adf8-49e055758bd0)

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/fcb9a32f-c4f1-4ffd-9797-4a74fc000336)

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/efe2d788-31ac-4997-82f6-d022aa8f79a0)

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/f64a4e5a-a363-48cd-87d9-78162cda11d4)

  ### Conclusion

  From my point of view, other [performance optimisations](#774) have more potential than this, considering this implementation is not finished. The changes needed to finish this implementation will probably decrease the performance obtained in this benchmarking. In the future, we can review this option if we change the [tracker API behaviour to getting all torrents](#775).

ACKs for top commit:
  josecelano:
    ACK 4030fd1

Tree-SHA512: 1a6c56f3aecb34fc40c401597efe1663c3cc66903f2d27bf8f2bbc6b058080d487a89b705c224657aaa6059f1c2a8597583e636e30727f9293bb43f460441415
  • Loading branch information
josecelano committed Apr 9, 2024
2 parents af52045 + 4030fd1 commit b5fb03b
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 11 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ clap = { version = "4", features = ["derive", "env"] }
colored = "2"
config = "0"
crossbeam-skiplist = "0.1"
dashmap = "5.5.3"
derive_more = "0"
fern = "0"
futures = "0"
Expand Down Expand Up @@ -77,7 +78,7 @@ url = "2"
uuid = { version = "1", features = ["v4"] }

[package.metadata.cargo-machete]
ignored = ["serde_bytes", "crossbeam-skiplist"]
ignored = ["serde_bytes", "crossbeam-skiplist", "dashmap"]

[dev-dependencies]
local-ip-address = "0"
Expand Down
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
"Weidendorfer",
"Werror",
"whitespaces",
"Xacrimon",
"XBTT",
"Xdebug",
"Xeon",
Expand Down
1 change: 1 addition & 0 deletions packages/torrent-repository/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ version.workspace = true

[dependencies]
crossbeam-skiplist = "0.1"
dashmap = "5.5.3"
futures = "0.3.29"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" }
Expand Down
23 changes: 21 additions & 2 deletions packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ mod helpers;

use criterion::{criterion_group, criterion_main, Criterion};
use torrust_tracker_torrent_repository::{
TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd,
TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
};

use crate::helpers::{asyn, sync};
Expand Down Expand Up @@ -49,6 +49,10 @@ fn add_one_torrent(c: &mut Criterion) {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
});

group.bench_function("DashMapMutexStd", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsDashMapMutexStd, _>);
});

group.finish();
}

Expand Down Expand Up @@ -98,6 +102,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("DashMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -147,6 +156,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("DashMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -197,6 +211,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("DashMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down
2 changes: 2 additions & 0 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use repository::dash_map_mutex_std::XacrimonDashMap;
use repository::rw_lock_std::RwLockStd;
use repository::rw_lock_tokio::RwLockTokio;
use repository::skip_map_mutex_std::CrossbeamSkipList;
Expand All @@ -20,6 +21,7 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;
pub type TorrentsDashMapMutexStd = XacrimonDashMap<EntryMutexStd>;

/// This code needs to be copied into each crate.
/// Working version, for production.
Expand Down
106 changes: 106 additions & 0 deletions packages/torrent-repository/src/repository/dash_map_mutex_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use dashmap::DashMap;
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::pagination::Pagination;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};

use super::Repository;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

#[derive(Default, Debug)]
pub struct XacrimonDashMap<T> {
pub torrents: DashMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for XacrimonDashMap<EntryMutexStd>
where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
if let Some(entry) = self.torrents.get(info_hash) {
entry.insert_or_update_peer_and_get_stats(peer)
} else {
let _unused = self.torrents.insert(*info_hash, Arc::default());

match self.torrents.get(info_hash) {
Some(entry) => entry.insert_or_update_peer_and_get_stats(peer),
None => (false, SwarmMetadata::zeroed()),
}
}
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.clone())
}

fn get_metrics(&self) -> TorrentsMetrics {
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().expect("it should get a lock").get_stats();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
match pagination {
Some(pagination) => self
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
None => self
.torrents
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
}
}

fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
for (info_hash, completed) in persistent_torrents {
if self.torrents.contains_key(info_hash) {
continue;
}

let entry = EntryMutexStd::new(
EntrySingle {
peers: BTreeMap::default(),
downloaded: *completed,
}
.into(),
);

self.torrents.insert(*info_hash, entry);
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
self.torrents.remove(key).map(|(_key, value)| value.clone())
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.torrents {
entry.value().remove_inactive_peers(current_cutoff);
}
}

fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
self.torrents.retain(|_, entry| entry.is_good(policy));
}
}
1 change: 1 addition & 0 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};

pub mod dash_map_mutex_std;
pub mod rw_lock_std;
pub mod rw_lock_std_mutex_std;
pub mod rw_lock_std_mutex_tokio;
Expand Down
20 changes: 18 additions & 2 deletions packages/torrent-repository/tests/common/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _};
use torrust_tracker_torrent_repository::{
EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio,
TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
};

#[derive(Debug)]
Expand All @@ -19,6 +19,7 @@ pub(crate) enum Repo {
RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd),
RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio),
SkipMapMutexStd(TorrentsSkipMapMutexStd),
DashMapMutexStd(TorrentsDashMapMutexStd),
}

impl Repo {
Expand All @@ -31,6 +32,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
}
}

Expand All @@ -43,6 +45,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await,
Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await,
Repo::SkipMapMutexStd(repo) => repo.get_metrics(),
Repo::DashMapMutexStd(repo) => repo.get_metrics(),
}
}

Expand Down Expand Up @@ -82,6 +85,11 @@ impl Repo {
.iter()
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
.collect(),
Repo::DashMapMutexStd(repo) => repo
.get_paginated(pagination)
.iter()
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
.collect(),
}
}

Expand All @@ -94,6 +102,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
}
}

Expand All @@ -106,6 +115,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
}
}

Expand All @@ -118,6 +128,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
}
}

Expand All @@ -130,6 +141,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
}
}

Expand All @@ -146,6 +158,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
Repo::DashMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
}
}

Expand All @@ -172,6 +185,9 @@ impl Repo {
Repo::SkipMapMutexStd(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Repo::DashMapMutexStd(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
};
self.get(info_hash).await
}
Expand Down
Loading

0 comments on commit b5fb03b

Please sign in to comment.