Skip to content

Commit

Permalink
refactor: segregate command and query for announce request
Browse files Browse the repository at this point in the history
This changes the API of the torrent repository. The method:

```
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata);
```

is replaced with:

```
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer);
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata>;
```

The performance is not affected. Benchmaring is still using both methods
in order to simulate `announce` requests.

1. The interface is simpler (command/query segregation.
2. In the long-term:
    - Returning swarm metadata in the announce request could be
      optional. The announce request process would be faster if the
tracker does not have to mantain the swarm data. This is not likely to
happen becuase the scrape request needs this metadata.
    - New repository performance improvements could be implemented. This allow
      decoupling peer lists from swarm metadata. The repository
internally can have two data strcutures one for the peer list and
another for the swarm metatada. Both using different locks.
  • Loading branch information
josecelano committed Apr 12, 2024
1 parent b5fb03b commit aa4bfba
Show file tree
Hide file tree
Showing 25 changed files with 259 additions and 226 deletions.
34 changes: 16 additions & 18 deletions packages/torrent-repository/benches/helpers/asyn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ where

let info_hash = InfoHash([0; 20]);

torrent_repository
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
.await;
torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await;

torrent_repository.get_swarm_metadata(&info_hash).await;
}

start.elapsed()
Expand All @@ -37,19 +37,19 @@ where
let handles = FuturesUnordered::new();

// Add the torrent/peer to the torrent repository
torrent_repository
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
.await;
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await;

torrent_repository.get_swarm_metadata(info_hash).await;

let start = Instant::now();

for _ in 0..samples {
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
.await;
torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER).await;

torrent_repository_clone.get_swarm_metadata(info_hash).await;

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -87,9 +87,9 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
.await;
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await;

torrent_repository_clone.get_swarm_metadata(&info_hash).await;

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -123,9 +123,8 @@ where

// Add the torrents/peers to the torrent repository
for info_hash in &info_hashes {
torrent_repository
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
.await;
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await;
torrent_repository.get_swarm_metadata(info_hash).await;
}

let start = Instant::now();
Expand All @@ -134,9 +133,8 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
.await;
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await;
torrent_repository_clone.get_swarm_metadata(&info_hash).await;

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down
22 changes: 16 additions & 6 deletions packages/torrent-repository/benches/helpers/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ where

let info_hash = InfoHash([0; 20]);

torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER);

torrent_repository.get_swarm_metadata(&info_hash);
}

start.elapsed()
Expand All @@ -37,15 +39,19 @@ where
let handles = FuturesUnordered::new();

// Add the torrent/peer to the torrent repository
torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER);

torrent_repository.get_swarm_metadata(info_hash);

let start = Instant::now();

for _ in 0..samples {
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER);

torrent_repository_clone.get_swarm_metadata(info_hash);

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -83,7 +89,9 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER);

torrent_repository_clone.get_swarm_metadata(&info_hash);

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -117,7 +125,8 @@ where

// Add the torrents/peers to the torrent repository
for info_hash in &info_hashes {
torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER);
torrent_repository.get_swarm_metadata(info_hash);
}

let start = Instant::now();
Expand All @@ -126,7 +135,8 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER);
torrent_repository_clone.get_swarm_metadata(&info_hash);

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down
20 changes: 6 additions & 14 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub trait Entry {
/// It returns the swarm metadata (statistics) as a struct:
///
/// `(seeders, completed, leechers)`
fn get_stats(&self) -> SwarmMetadata;
fn get_swarm_metadata(&self) -> SwarmMetadata;

/// Returns True if Still a Valid Entry according to the Tracker Policy
fn is_good(&self, policy: &TrackerPolicy) -> bool;
Expand All @@ -40,31 +40,27 @@ pub trait Entry {
///
/// The number of peers that have complete downloading is synchronously updated when peers are updated.
/// That's the total torrent downloads counter.
fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool;

// It preforms a combined operation of `insert_or_update_peer` and `get_stats`.
fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn upsert_peer(&mut self, peer: &peer::Peer) -> bool;

/// It removes peer from the swarm that have not been updated for more than `current_cutoff` seconds
fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch);
}

#[allow(clippy::module_name_repetitions)]
pub trait EntrySync {
fn get_stats(&self) -> SwarmMetadata;
fn get_swarm_metadata(&self) -> SwarmMetadata;
fn is_good(&self, policy: &TrackerPolicy) -> bool;
fn peers_is_empty(&self) -> bool;
fn get_peers_len(&self) -> usize;
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool;
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn upsert_peer(&self, peer: &peer::Peer) -> bool;
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
}

#[allow(clippy::module_name_repetitions)]
pub trait EntryAsync {
fn get_stats(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
fn get_swarm_metadata(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
Expand All @@ -74,11 +70,7 @@ pub trait EntryAsync {
client: &SocketAddr,
limit: Option<usize>,
) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
fn insert_or_update_peer_and_get_stats(
self,
peer: &peer::Peer,
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + std::marker::Send;
fn upsert_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future<Output = ()> + Send;
}

Expand Down
14 changes: 4 additions & 10 deletions packages/torrent-repository/src/entry/mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use super::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

impl EntrySync for EntryMutexStd {
fn get_stats(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_stats()
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_swarm_metadata()
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
Expand All @@ -33,14 +33,8 @@ impl EntrySync for EntryMutexStd {
self.lock().expect("it should get lock").get_peers_for_client(client, limit)
}

fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool {
self.lock().expect("it should lock the entry").insert_or_update_peer(peer)
}

fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
self.lock()
.expect("it should lock the entry")
.insert_or_update_peer_and_get_stats(peer)
fn upsert_peer(&self, peer: &peer::Peer) -> bool {
self.lock().expect("it should lock the entry").upsert_peer(peer)
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
Expand Down
12 changes: 4 additions & 8 deletions packages/torrent-repository/src/entry/mutex_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use super::{Entry, EntryAsync};
use crate::{EntryMutexTokio, EntrySingle};

impl EntryAsync for EntryMutexTokio {
async fn get_stats(&self) -> SwarmMetadata {
self.lock().await.get_stats()
async fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().await.get_swarm_metadata()
}

async fn check_good(self, policy: &TrackerPolicy) -> bool {
Expand All @@ -33,12 +33,8 @@ impl EntryAsync for EntryMutexTokio {
self.lock().await.get_peers_for_client(client, limit)
}

async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool {
self.lock().await.insert_or_update_peer(peer)
}

async fn insert_or_update_peer_and_get_stats(self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
self.lock().await.insert_or_update_peer_and_get_stats(peer)
async fn upsert_peer(self, peer: &peer::Peer) -> bool {
self.lock().await.upsert_peer(peer)
}

async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) {
Expand Down
10 changes: 2 additions & 8 deletions packages/torrent-repository/src/entry/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::EntrySingle;

impl Entry for EntrySingle {
#[allow(clippy::cast_possible_truncation)]
fn get_stats(&self) -> SwarmMetadata {
fn get_swarm_metadata(&self) -> SwarmMetadata {
let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32;
let incomplete: u32 = self.peers.len() as u32 - complete;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Entry for EntrySingle {
}
}

fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool {
fn upsert_peer(&mut self, peer: &peer::Peer) -> bool {
let mut downloaded_stats_updated: bool = false;

match peer::ReadInfo::get_event(peer) {
Expand All @@ -93,12 +93,6 @@ impl Entry for EntrySingle {
downloaded_stats_updated
}

fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
let changed = self.insert_or_update_peer(peer);
let stats = self.get_stats();
(changed, stats)
}

fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
self.peers
.retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff);
Expand Down
16 changes: 9 additions & 7 deletions packages/torrent-repository/src/repository/dash_map_mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
if let Some(entry) = self.torrents.get(info_hash) {
entry.insert_or_update_peer_and_get_stats(peer)
entry.upsert_peer(peer);
} else {
let _unused = self.torrents.insert(*info_hash, Arc::default());

match self.torrents.get(info_hash) {
Some(entry) => entry.insert_or_update_peer_and_get_stats(peer),
None => (false, SwarmMetadata::zeroed()),
if let Some(entry) = self.torrents.get(info_hash) {
entry.upsert_peer(peer);
}
}
}

fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.clone())
Expand All @@ -45,7 +47,7 @@ where
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().expect("it should get a lock").get_stats();
let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
Expand Down
10 changes: 4 additions & 6 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ pub trait Repository<T>: Debug + Default + Sized + 'static {
fn remove(&self, key: &InfoHash) -> Option<T>;
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
fn remove_peerless_torrents(&self, policy: &TrackerPolicy);
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer);
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata>;
}

#[allow(clippy::module_name_repetitions)]
Expand All @@ -36,9 +37,6 @@ pub trait RepositoryAsync<T>: Debug + Default + Sized + 'static {
fn remove(&self, key: &InfoHash) -> impl std::future::Future<Output = Option<T>> + Send;
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future<Output = ()> + Send;
fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future<Output = ()> + Send;
fn update_torrent_with_peer_and_get_stats(
&self,
info_hash: &InfoHash,
peer: &peer::Peer,
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> impl std::future::Future<Output = ()> + Send;
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> impl std::future::Future<Output = Option<SwarmMetadata>> + Send;
}
10 changes: 7 additions & 3 deletions packages/torrent-repository/src/repository/rw_lock_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ impl Repository<EntrySingle> for TorrentsRwLockStd
where
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
let mut db = self.get_torrents_mut();

let entry = db.entry(*info_hash).or_insert(EntrySingle::default());

entry.insert_or_update_peer_and_get_stats(peer)
entry.upsert_peer(peer);
}

fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
self.get(info_hash).map(|entry| entry.get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
Expand All @@ -64,7 +68,7 @@ where
let mut metrics = TorrentsMetrics::default();

for entry in self.get_torrents().values() {
let stats = entry.get_stats();
let stats = entry.get_swarm_metadata();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
Expand Down
Loading

0 comments on commit aa4bfba

Please sign in to comment.