test: add tests for new torrent repository using SkipMap
josecelano committed Apr 8, 2024
1 parent 0989285 commit 12f54e7
Showing 3 changed files with 193 additions and 82 deletions.
@@ -15,7 +15,7 @@ use crate::{EntryMutexStd, EntrySingle};

 #[derive(Default, Debug)]
 pub struct CrossbeamSkipList<T> {
-    torrents: SkipMap<InfoHash, T>,
+    pub torrents: SkipMap<InfoHash, T>,
 }
 
 impl Repository<EntryMutexStd> for CrossbeamSkipList<EntryMutexStd>
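A note on the hunk above: making the `torrents` field public lets the test suite drive the underlying `crossbeam_skiplist::SkipMap` directly, bypassing the `Repository` trait, which is what the new `insert` helper in `repo.rs` below does (`repo.torrents.insert(...)`). A minimal sketch of that kind of direct access, using only the public `crossbeam_skiplist` API, with a `u64` key standing in for `InfoHash`:

    use crossbeam_skiplist::SkipMap;

    fn main() {
        // Keys are kept sorted; inserts and reads need no external lock.
        let torrents: SkipMap<u64, String> = SkipMap::new();
        torrents.insert(2, "entry-b".to_string());
        torrents.insert(1, "entry-a".to_string());

        // `get` returns an entry guard; the value is read through it.
        let first = torrents.get(&1).map(|e| e.value().clone());
        assert_eq!(first, Some("entry-a".to_string()));
    }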
packages/torrent-repository/tests/common/repo.rs (165 changes: 98 additions & 67 deletions)
@@ -7,141 +7,172 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
 use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _};
 use torrust_tracker_torrent_repository::{
     EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
-    TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio,
+    TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
 };

 #[derive(Debug)]
 pub(crate) enum Repo {
-    Std(TorrentsRwLockStd),
-    StdMutexStd(TorrentsRwLockStdMutexStd),
-    StdMutexTokio(TorrentsRwLockStdMutexTokio),
-    Tokio(TorrentsRwLockTokio),
-    TokioMutexStd(TorrentsRwLockTokioMutexStd),
-    TokioMutexTokio(TorrentsRwLockTokioMutexTokio),
+    RwLockStd(TorrentsRwLockStd),
+    RwLockStdMutexStd(TorrentsRwLockStdMutexStd),
+    RwLockStdMutexTokio(TorrentsRwLockStdMutexTokio),
+    RwLockTokio(TorrentsRwLockTokio),
+    RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd),
+    RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio),
+    SkipMapMutexStd(TorrentsSkipMapMutexStd),
 }
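The renamed `RwLock*` variants keep their previous pairings; the new `SkipMapMutexStd` variant plugs the SkipMap-backed repository into the same harness. A hedged sketch of constructing it in a test, assuming `TorrentsSkipMapMutexStd` aliases the `CrossbeamSkipList` type from the first hunk (which derives `Default`):

    // Hypothetical: the alias and the `Default` impl are assumptions here.
    let repo = Repo::SkipMapMutexStd(TorrentsSkipMapMutexStd::default());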

 impl Repo {
     pub(crate) async fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
         match self {
-            Repo::Std(repo) => repo.get(key),
-            Repo::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
-            Repo::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
-            Repo::Tokio(repo) => repo.get(key).await,
-            Repo::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
-            Repo::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
+            Repo::RwLockStd(repo) => repo.get(key),
+            Repo::RwLockStdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
+            Repo::RwLockStdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
+            Repo::RwLockTokio(repo) => repo.get(key).await,
+            Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
+            Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
+            Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
         }
     }

     pub(crate) async fn get_metrics(&self) -> TorrentsMetrics {
         match self {
-            Repo::Std(repo) => repo.get_metrics(),
-            Repo::StdMutexStd(repo) => repo.get_metrics(),
-            Repo::StdMutexTokio(repo) => repo.get_metrics().await,
-            Repo::Tokio(repo) => repo.get_metrics().await,
-            Repo::TokioMutexStd(repo) => repo.get_metrics().await,
-            Repo::TokioMutexTokio(repo) => repo.get_metrics().await,
+            Repo::RwLockStd(repo) => repo.get_metrics(),
+            Repo::RwLockStdMutexStd(repo) => repo.get_metrics(),
+            Repo::RwLockStdMutexTokio(repo) => repo.get_metrics().await,
+            Repo::RwLockTokio(repo) => repo.get_metrics().await,
+            Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await,
+            Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await,
+            Repo::SkipMapMutexStd(repo) => repo.get_metrics(),
         }
     }

     pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> {
         match self {
-            Repo::Std(repo) => repo.get_paginated(pagination),
-            Repo::StdMutexStd(repo) => repo
+            Repo::RwLockStd(repo) => repo.get_paginated(pagination),
+            Repo::RwLockStdMutexStd(repo) => repo
                 .get_paginated(pagination)
                 .iter()
                 .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
                 .collect(),
-            Repo::StdMutexTokio(repo) => {
+            Repo::RwLockStdMutexTokio(repo) => {
                 let mut v: Vec<(InfoHash, EntrySingle)> = vec![];
 
                 for (i, t) in repo.get_paginated(pagination).await {
                     v.push((i, t.lock().await.clone()));
                 }
                 v
             }
-            Repo::Tokio(repo) => repo.get_paginated(pagination).await,
-            Repo::TokioMutexStd(repo) => repo
+            Repo::RwLockTokio(repo) => repo.get_paginated(pagination).await,
+            Repo::RwLockTokioMutexStd(repo) => repo
                 .get_paginated(pagination)
                 .await
                 .iter()
                 .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
                 .collect(),
-            Repo::TokioMutexTokio(repo) => {
+            Repo::RwLockTokioMutexTokio(repo) => {
                 let mut v: Vec<(InfoHash, EntrySingle)> = vec![];
 
                 for (i, t) in repo.get_paginated(pagination).await {
                     v.push((i, t.lock().await.clone()));
                 }
                 v
             }
+            Repo::SkipMapMutexStd(repo) => repo
+                .get_paginated(pagination)
+                .iter()
+                .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
+                .collect(),
         }
     }
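Two details in the hunk above are worth calling out: the new `SkipMapMutexStd` arm calls `get_paginated` without `.await` (iterating a `SkipMap` takes no async lock), and a skip list's ordered iteration maps naturally onto offset/limit paging. A rough sketch of the latter idea, with hypothetical `offset` and `limit` parameters standing in for the tracker's `Pagination` type:

    use crossbeam_skiplist::SkipMap;

    // Hypothetical helper: a page is just a window over the ordered entries.
    fn page_of(map: &SkipMap<u64, String>, offset: usize, limit: usize) -> Vec<(u64, String)> {
        map.iter() // yields entries in ascending key order
            .skip(offset)
            .take(limit)
            .map(|entry| (*entry.key(), entry.value().clone()))
            .collect()
    }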

     pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
         match self {
-            Repo::Std(repo) => repo.import_persistent(persistent_torrents),
-            Repo::StdMutexStd(repo) => repo.import_persistent(persistent_torrents),
-            Repo::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
-            Repo::Tokio(repo) => repo.import_persistent(persistent_torrents).await,
-            Repo::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
-            Repo::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
+            Repo::RwLockStd(repo) => repo.import_persistent(persistent_torrents),
+            Repo::RwLockStdMutexStd(repo) => repo.import_persistent(persistent_torrents),
+            Repo::RwLockStdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
+            Repo::RwLockTokio(repo) => repo.import_persistent(persistent_torrents).await,
+            Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
+            Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
+            Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
         }
     }

     pub(crate) async fn remove(&self, key: &InfoHash) -> Option<EntrySingle> {
         match self {
-            Repo::Std(repo) => repo.remove(key),
-            Repo::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
-            Repo::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
-            Repo::Tokio(repo) => repo.remove(key).await,
-            Repo::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
-            Repo::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
+            Repo::RwLockStd(repo) => repo.remove(key),
+            Repo::RwLockStdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
+            Repo::RwLockStdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
+            Repo::RwLockTokio(repo) => repo.remove(key).await,
+            Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
+            Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
+            Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
         }
     }

     pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
         match self {
-            Repo::Std(repo) => repo.remove_inactive_peers(current_cutoff),
-            Repo::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
-            Repo::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
-            Repo::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
-            Repo::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
-            Repo::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
+            Repo::RwLockStd(repo) => repo.remove_inactive_peers(current_cutoff),
+            Repo::RwLockStdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
+            Repo::RwLockStdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
+            Repo::RwLockTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
+            Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
+            Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
+            Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
         }
     }

     pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
         match self {
-            Repo::Std(repo) => repo.remove_peerless_torrents(policy),
-            Repo::StdMutexStd(repo) => repo.remove_peerless_torrents(policy),
-            Repo::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
-            Repo::Tokio(repo) => repo.remove_peerless_torrents(policy).await,
-            Repo::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
-            Repo::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
+            Repo::RwLockStd(repo) => repo.remove_peerless_torrents(policy),
+            Repo::RwLockStdMutexStd(repo) => repo.remove_peerless_torrents(policy),
+            Repo::RwLockStdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
+            Repo::RwLockTokio(repo) => repo.remove_peerless_torrents(policy).await,
+            Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
+            Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
+            Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
         }
     }

     pub(crate) async fn update_torrent_with_peer_and_get_stats(
         &self,
         info_hash: &InfoHash,
         peer: &peer::Peer,
     ) -> (bool, SwarmMetadata) {
         match self {
-            Repo::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
-            Repo::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
-            Repo::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
-            Repo::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
-            Repo::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
-            Repo::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
+            Repo::RwLockStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
+            Repo::RwLockStdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
+            Repo::RwLockStdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
+            Repo::RwLockTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
+            Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
+            Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
+            Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
         }
     }

     pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option<EntrySingle> {
         match self {
-            Repo::Std(repo) => repo.write().insert(*info_hash, torrent),
-            Repo::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()),
-            Repo::StdMutexTokio(repo) => {
-                let r = repo.write().insert(*info_hash, torrent.into());
-                match r {
-                    Some(t) => Some(t.lock().await.clone()),
-                    None => None,
-                }
-            }
-            Repo::Tokio(repo) => repo.write().await.insert(*info_hash, torrent),
-            Repo::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()),
-            Repo::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()),
-        }
+            Repo::RwLockStd(repo) => {
+                repo.write().insert(*info_hash, torrent);
+            }
+            Repo::RwLockStdMutexStd(repo) => {
+                repo.write().insert(*info_hash, torrent.into());
+            }
+            Repo::RwLockStdMutexTokio(repo) => {
+                repo.write().insert(*info_hash, torrent.into());
+            }
+            Repo::RwLockTokio(repo) => {
+                repo.write().await.insert(*info_hash, torrent);
+            }
+            Repo::RwLockTokioMutexStd(repo) => {
+                repo.write().await.insert(*info_hash, torrent.into());
+            }
+            Repo::RwLockTokioMutexTokio(repo) => {
+                repo.write().await.insert(*info_hash, torrent.into());
+            }
+            Repo::SkipMapMutexStd(repo) => {
+                repo.torrents.insert(*info_hash, torrent.into());
+            }
+        };
+        self.get(info_hash).await
     }
 }
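One consequence of the rewrite above: every arm now discards the value returned by the container's own `insert`, and the helper re-reads the entry via `self.get(info_hash).await`. That sidesteps an API mismatch, because `crossbeam_skiplist::SkipMap::insert`, unlike `std::collections::HashMap::insert`, replaces any existing entry and hands back an entry for the new value rather than the old one. A small sketch of that behaviour, assuming only the `crossbeam_skiplist` crate:

    use crossbeam_skiplist::SkipMap;

    fn main() {
        let map: SkipMap<u64, &str> = SkipMap::new();
        map.insert(1, "old");

        // The returned entry points at the *new* value, not the replaced one.
        let entry = map.insert(1, "new");
        assert_eq!(*entry.value(), "new");
        assert_eq!(map.get(&1).map(|e| *e.value()), Some("new"));
    }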
[The diff for the third changed file was not rendered on this page.]
