Skip to content

Commit

Permalink
dev: refactor torrent repository
Browse files Browse the repository at this point in the history
extracted async and sync implementations
  • Loading branch information
da2ce7 committed Mar 25, 2024
1 parent 3bd2a9c commit 5c0047a
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 364 deletions.
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"alekitto",
"appuser",
"Arvid",
"asyn",
"autoclean",
"AUTOINCREMENT",
"automock",
Expand Down
41 changes: 25 additions & 16 deletions packages/torrent-repository-benchmarks/src/benches/asyn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ use std::time::Duration;

use clap::Parser;
use futures::stream::FuturesUnordered;
use torrust_tracker::core::torrent::repository::TRepositoryAsync;
use torrust_tracker::core::torrent::repository_asyn::{RepositoryAsync, RepositoryTokioRwLock};
use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;

use crate::args::Args;
use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjusted_average_from_results, DEFAULT_PEER};

pub async fn async_add_one_torrent<T: TRepositoryAsync + Send + Sync + 'static>(samples: usize) -> (Duration, Duration) {
pub async fn async_add_one_torrent<T>(samples: usize) -> (Duration, Duration)
where
RepositoryTokioRwLock<T>: RepositoryAsync<T>,
{
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryTokioRwLock::<T>::default());

let info_hash = InfoHash([0; 20]);

Expand All @@ -32,15 +35,16 @@ pub async fn async_add_one_torrent<T: TRepositoryAsync + Send + Sync + 'static>(
}

// Update one torrent ten thousand times in parallel (depending on the set worker threads)
pub async fn async_update_one_torrent_in_parallel<T: TRepositoryAsync + Send + Sync + 'static>(
runtime: &tokio::runtime::Runtime,
samples: usize,
) -> (Duration, Duration) {
pub async fn async_update_one_torrent_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
where
T: Send + Sync + 'static,
RepositoryTokioRwLock<T>: RepositoryAsync<T>,
{
let args = Args::parse();
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryTokioRwLock::<T>::default());
let info_hash: &'static InfoHash = &InfoHash([0; 20]);
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -81,15 +85,16 @@ pub async fn async_update_one_torrent_in_parallel<T: TRepositoryAsync + Send + S
}

// Add ten thousand torrents in parallel (depending on the set worker threads)
pub async fn async_add_multiple_torrents_in_parallel<T: TRepositoryAsync + Send + Sync + 'static>(
runtime: &tokio::runtime::Runtime,
samples: usize,
) -> (Duration, Duration) {
pub async fn async_add_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
where
T: Send + Sync + 'static,
RepositoryTokioRwLock<T>: RepositoryAsync<T>,
{
let args = Args::parse();
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryTokioRwLock::<T>::default());
let info_hashes = generate_unique_info_hashes(10_000);
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -125,15 +130,19 @@ pub async fn async_add_multiple_torrents_in_parallel<T: TRepositoryAsync + Send
}

// Async update ten thousand torrents in parallel (depending on the set worker threads)
pub async fn async_update_multiple_torrents_in_parallel<T: TRepositoryAsync + Send + Sync + 'static>(
pub async fn async_update_multiple_torrents_in_parallel<T>(
runtime: &tokio::runtime::Runtime,
samples: usize,
) -> (Duration, Duration) {
) -> (Duration, Duration)
where
T: Send + Sync + 'static,
RepositoryTokioRwLock<T>: RepositoryAsync<T>,
{
let args = Args::parse();
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryTokioRwLock::<T>::default());
let info_hashes = generate_unique_info_hashes(10_000);
let handles = FuturesUnordered::new();

Expand Down
42 changes: 24 additions & 18 deletions packages/torrent-repository-benchmarks/src/benches/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ use std::time::Duration;

use clap::Parser;
use futures::stream::FuturesUnordered;
use torrust_tracker::core::torrent::repository::Repository;
use torrust_tracker::core::torrent::repository_sync::{RepositoryStdRwLock, RepositorySync};
use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;

use crate::args::Args;
use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjusted_average_from_results, DEFAULT_PEER};

// Simply add one torrent
#[must_use]
pub fn add_one_torrent<T: Repository + Send + Sync + 'static>(samples: usize) -> (Duration, Duration) {
pub fn add_one_torrent<T>(samples: usize) -> (Duration, Duration)
where
RepositoryStdRwLock<T>: RepositorySync<T>,
{
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryStdRwLock::<T>::default());

let info_hash = InfoHash([0; 20]);

Expand All @@ -32,15 +35,16 @@ pub fn add_one_torrent<T: Repository + Send + Sync + 'static>(samples: usize) ->
}

// Update one torrent ten thousand times in parallel (depending on the set worker threads)
pub async fn update_one_torrent_in_parallel<T: Repository + Send + Sync + 'static>(
runtime: &tokio::runtime::Runtime,
samples: usize,
) -> (Duration, Duration) {
pub async fn update_one_torrent_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
where
T: Send + Sync + 'static,
RepositoryStdRwLock<T>: RepositorySync<T>,
{
let args = Args::parse();
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryStdRwLock::<T>::default());
let info_hash: &'static InfoHash = &InfoHash([0; 20]);
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -77,15 +81,16 @@ pub async fn update_one_torrent_in_parallel<T: Repository + Send + Sync + 'stati
}

// Add ten thousand torrents in parallel (depending on the set worker threads)
pub async fn add_multiple_torrents_in_parallel<T: Repository + Send + Sync + 'static>(
runtime: &tokio::runtime::Runtime,
samples: usize,
) -> (Duration, Duration) {
pub async fn add_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
where
T: Send + Sync + 'static,
RepositoryStdRwLock<T>: RepositorySync<T>,
{
let args = Args::parse();
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryStdRwLock::<T>::default());
let info_hashes = generate_unique_info_hashes(10_000);
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -119,15 +124,16 @@ pub async fn add_multiple_torrents_in_parallel<T: Repository + Send + Sync + 'st
}

// Update ten thousand torrents in parallel (depending on the set worker threads)
pub async fn update_multiple_torrents_in_parallel<T: Repository + Send + Sync + 'static>(
runtime: &tokio::runtime::Runtime,
samples: usize,
) -> (Duration, Duration) {
pub async fn update_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
where
T: Send + Sync + 'static,
RepositoryStdRwLock<T>: RepositorySync<T>,
{
let args = Args::parse();
let mut results: Vec<Duration> = Vec::with_capacity(samples);

for _ in 0..samples {
let torrent_repository = Arc::new(T::new());
let torrent_repository = Arc::new(RepositoryStdRwLock::<T>::default());
let info_hashes = generate_unique_info_hashes(10_000);
let handles = FuturesUnordered::new();

Expand Down
50 changes: 25 additions & 25 deletions packages/torrent-repository-benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use torrust_torrent_repository_benchmarks::benches::asyn::{
use torrust_torrent_repository_benchmarks::benches::sync::{
add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, update_one_torrent_in_parallel,
};
use torrust_tracker::core::torrent::repository::{AsyncSync, RepositoryAsync, RepositoryAsyncSingle, Sync, SyncSingle};
use torrust_tracker::core::torrent::{Entry, EntryMutexStd, EntryMutexTokio};

#[allow(clippy::too_many_lines)]
#[allow(clippy::print_literal)]
Expand All @@ -25,67 +25,67 @@ fn main() {
println!(
"{}: Avg/AdjAvg: {:?}",
"add_one_torrent",
rt.block_on(async_add_one_torrent::<RepositoryAsyncSingle>(1_000_000))
rt.block_on(async_add_one_torrent::<Entry>(1_000_000))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_one_torrent_in_parallel",
rt.block_on(async_update_one_torrent_in_parallel::<RepositoryAsyncSingle>(&rt, 10))
rt.block_on(async_update_one_torrent_in_parallel::<Entry>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"add_multiple_torrents_in_parallel",
rt.block_on(async_add_multiple_torrents_in_parallel::<RepositoryAsyncSingle>(&rt, 10))
rt.block_on(async_add_multiple_torrents_in_parallel::<Entry>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_multiple_torrents_in_parallel",
rt.block_on(async_update_multiple_torrents_in_parallel::<RepositoryAsyncSingle>(&rt, 10))
rt.block_on(async_update_multiple_torrents_in_parallel::<Entry>(&rt, 10))
);

if let Some(true) = args.compare {
println!();

println!("std::sync::RwLock<std::collections::BTreeMap<InfoHash, Entry>>");
println!(
"{}: Avg/AdjAvg: {:?}",
"add_one_torrent",
add_one_torrent::<SyncSingle>(1_000_000)
);
println!("{}: Avg/AdjAvg: {:?}", "add_one_torrent", add_one_torrent::<Entry>(1_000_000));
println!(
"{}: Avg/AdjAvg: {:?}",
"update_one_torrent_in_parallel",
rt.block_on(update_one_torrent_in_parallel::<SyncSingle>(&rt, 10))
rt.block_on(update_one_torrent_in_parallel::<Entry>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"add_multiple_torrents_in_parallel",
rt.block_on(add_multiple_torrents_in_parallel::<SyncSingle>(&rt, 10))
rt.block_on(add_multiple_torrents_in_parallel::<Entry>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_multiple_torrents_in_parallel",
rt.block_on(update_multiple_torrents_in_parallel::<SyncSingle>(&rt, 10))
rt.block_on(update_multiple_torrents_in_parallel::<Entry>(&rt, 10))
);

println!();

println!("std::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>>");
println!("{}: Avg/AdjAvg: {:?}", "add_one_torrent", add_one_torrent::<Sync>(1_000_000));
println!(
"{}: Avg/AdjAvg: {:?}",
"add_one_torrent",
add_one_torrent::<EntryMutexStd>(1_000_000)
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_one_torrent_in_parallel",
rt.block_on(update_one_torrent_in_parallel::<Sync>(&rt, 10))
rt.block_on(update_one_torrent_in_parallel::<EntryMutexStd>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"add_multiple_torrents_in_parallel",
rt.block_on(add_multiple_torrents_in_parallel::<Sync>(&rt, 10))
rt.block_on(add_multiple_torrents_in_parallel::<EntryMutexStd>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_multiple_torrents_in_parallel",
rt.block_on(update_multiple_torrents_in_parallel::<Sync>(&rt, 10))
rt.block_on(update_multiple_torrents_in_parallel::<EntryMutexStd>(&rt, 10))
);

println!();
Expand All @@ -94,22 +94,22 @@ fn main() {
println!(
"{}: Avg/AdjAvg: {:?}",
"add_one_torrent",
rt.block_on(async_add_one_torrent::<AsyncSync>(1_000_000))
rt.block_on(async_add_one_torrent::<EntryMutexStd>(1_000_000))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_one_torrent_in_parallel",
rt.block_on(async_update_one_torrent_in_parallel::<AsyncSync>(&rt, 10))
rt.block_on(async_update_one_torrent_in_parallel::<EntryMutexStd>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"add_multiple_torrents_in_parallel",
rt.block_on(async_add_multiple_torrents_in_parallel::<AsyncSync>(&rt, 10))
rt.block_on(async_add_multiple_torrents_in_parallel::<EntryMutexStd>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_multiple_torrents_in_parallel",
rt.block_on(async_update_multiple_torrents_in_parallel::<AsyncSync>(&rt, 10))
rt.block_on(async_update_multiple_torrents_in_parallel::<EntryMutexStd>(&rt, 10))
);

println!();
Expand All @@ -118,22 +118,22 @@ fn main() {
println!(
"{}: Avg/AdjAvg: {:?}",
"add_one_torrent",
rt.block_on(async_add_one_torrent::<RepositoryAsync>(1_000_000))
rt.block_on(async_add_one_torrent::<EntryMutexTokio>(1_000_000))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_one_torrent_in_parallel",
rt.block_on(async_update_one_torrent_in_parallel::<RepositoryAsync>(&rt, 10))
rt.block_on(async_update_one_torrent_in_parallel::<EntryMutexTokio>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"add_multiple_torrents_in_parallel",
rt.block_on(async_add_multiple_torrents_in_parallel::<RepositoryAsync>(&rt, 10))
rt.block_on(async_add_multiple_torrents_in_parallel::<EntryMutexTokio>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_multiple_torrents_in_parallel",
rt.block_on(async_update_multiple_torrents_in_parallel::<RepositoryAsync>(&rt, 10))
rt.block_on(async_update_multiple_torrents_in_parallel::<EntryMutexTokio>(&rt, 10))
);
}
}
8 changes: 5 additions & 3 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ use torrust_tracker_primitives::TrackerMode;
use self::auth::Key;
use self::error::Error;
use self::peer::Peer;
use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync};
use self::torrent::repository_asyn::{RepositoryAsync, RepositoryTokioRwLock};
use self::torrent::Entry;
use crate::core::databases::Database;
use crate::core::torrent::{SwarmMetadata, SwarmStats};
use crate::shared::bit_torrent::info_hash::InfoHash;
Expand All @@ -481,7 +482,7 @@ pub struct Tracker {
policy: TrackerPolicy,
keys: tokio::sync::RwLock<std::collections::HashMap<Key, auth::ExpiringKey>>,
whitelist: tokio::sync::RwLock<std::collections::HashSet<InfoHash>>,
pub torrents: Arc<RepositoryAsyncSingle>,
pub torrents: Arc<RepositoryTokioRwLock<Entry>>,
stats_event_sender: Option<Box<dyn statistics::EventSender>>,
stats_repository: statistics::Repo,
external_ip: Option<IpAddr>,
Expand Down Expand Up @@ -579,7 +580,7 @@ impl Tracker {
mode,
keys: tokio::sync::RwLock::new(std::collections::HashMap::new()),
whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()),
torrents: Arc::new(RepositoryAsyncSingle::new()),
torrents: Arc::new(RepositoryTokioRwLock::<Entry>::default()),
stats_event_sender,
stats_repository,
database,
Expand Down Expand Up @@ -1754,6 +1755,7 @@ mod tests {
use aquatic_udp_protocol::AnnounceEvent;

use crate::core::tests::the_tracker::{sample_info_hash, sample_peer, tracker_persisting_torrents_in_database};
use crate::core::torrent::repository_asyn::RepositoryAsync;

#[tokio::test]
async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database() {
Expand Down
1 change: 1 addition & 0 deletions src/core/services/torrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use serde::Deserialize;

use crate::core::peer::Peer;
use crate::core::torrent::repository_asyn::RepositoryAsync;
use crate::core::Tracker;
use crate::shared::bit_torrent::info_hash::InfoHash;

Expand Down
Loading

0 comments on commit 5c0047a

Please sign in to comment.