Skip to content

Commit

Permalink
Merge #183: Remove duplicate code from announce request handlers
Browse files Browse the repository at this point in the history
7fcc19d refactor: remove unneeded method (Jose Celano)
3b20795 refactor: function does not need to be pub anymore (Jose Celano)
cecbc17 refactor: extract duplicate code from announce request handlers (Jose Celano)
156ac4d refactor: clean announce request handlers (Jose Celano)
05ea741 refactor: move code from domain to delivery layer (Jose Celano)
4b8fbfb refactor: the tracker is responsible for assigning the IP to peers (Jose Celano)

Pull request description:

  Before implementing the [new](#182) `announce` handler for the Axum HTTP tracker, I needed to clean up the current handlers (UDP and HTTP). The code was duplicated, and I did not want to add a third copy.

  I planned to do it after migrating to Axum, but I changed my mind. I think the migration to Axum is going to be easier after this refactor.

Top commit has no ACKs.

Tree-SHA512: 21ed9132f083058b8e789184a665ab86344688ca482411abf2193bbd6f04d3b3d03ba53300361a9b54444ae12b1d0c4df65272f5da45d48364f4a419a675b007
  • Loading branch information
josecelano committed Feb 12, 2023
2 parents 8e85f18 + 7fcc19d commit 20f5751
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 354 deletions.
31 changes: 12 additions & 19 deletions src/http/warp_implementation/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use warp::{reject, Rejection, Reply};

use super::error::Error;
use super::{request, response, WebResult};
use crate::http::warp_implementation::peer_builder;
use crate::protocol::info_hash::InfoHash;
use crate::tracker::{self, auth, peer, statistics, torrent};

Expand All @@ -31,32 +32,26 @@ pub async fn authenticate(
})
}

/// Handle announce request
///
/// # Errors
///
/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_scrape_response`.
/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_announce_response`.
pub async fn handle_announce(
announce_request: request::Announce,
auth_key: Option<auth::Key>,
tracker: Arc<tracker::Tracker>,
) -> WebResult<impl Reply> {
authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await?;
debug!("http announce request: {:#?}", announce_request);

debug!("{:?}", announce_request);
let info_hash = announce_request.info_hash;
let remote_client_ip = announce_request.peer_addr;

let peer = peer::Peer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip());
let torrent_stats = tracker
.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer)
.await;
authenticate(&info_hash, &auth_key, tracker.clone()).await?;

// get all torrent peers excluding the peer_addr
let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await;
let mut peer = peer_builder::from_request(&announce_request, &remote_client_ip);

let announce_interval = tracker.config.announce_interval;
let response = tracker.announce(&info_hash, &mut peer, &remote_client_ip).await;

// send stats event
match announce_request.peer_addr {
match remote_client_ip {
IpAddr::V4(_) => {
tracker.send_stats_event(statistics::Event::Tcp4Announce).await;
}
Expand All @@ -67,15 +62,13 @@ pub async fn handle_announce(

send_announce_response(
&announce_request,
&torrent_stats,
&peers,
announce_interval,
&response.swam_stats,
&response.peers,
tracker.config.announce_interval,
tracker.config.min_announce_interval,
)
}

/// Handle scrape request
///
/// # Errors
///
/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_scrape_response`.
Expand Down
1 change: 1 addition & 0 deletions src/http/warp_implementation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use warp::Rejection;
pub mod error;
pub mod filters;
pub mod handlers;
pub mod peer_builder;
pub mod request;
pub mod response;
pub mod routes;
Expand Down
32 changes: 32 additions & 0 deletions src/http/warp_implementation/peer_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::net::{IpAddr, SocketAddr};

use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};

use super::request::Announce;
use crate::protocol::clock::{Current, Time};
use crate::tracker::peer::Peer;

#[must_use]
pub fn from_request(announce_request: &Announce, peer_ip: &IpAddr) -> Peer {
let event: AnnounceEvent = if let Some(event) = &announce_request.event {
match event.as_ref() {
"started" => AnnounceEvent::Started,
"stopped" => AnnounceEvent::Stopped,
"completed" => AnnounceEvent::Completed,
_ => AnnounceEvent::None,
}
} else {
AnnounceEvent::None
};

#[allow(clippy::cast_possible_truncation)]
Peer {
peer_id: announce_request.peer_id,
peer_addr: SocketAddr::new(*peer_ip, announce_request.port),
updated: Current::now(),
uploaded: NumberOfBytes(i128::from(announce_request.uploaded) as i64),
downloaded: NumberOfBytes(i128::from(announce_request.downloaded) as i64),
left: NumberOfBytes(i128::from(announce_request.left) as i64),
event,
}
}
135 changes: 131 additions & 4 deletions src/tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod torrent;

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::panic::Location;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -17,6 +17,8 @@ use tokio::sync::mpsc::error::SendError;
use tokio::sync::{RwLock, RwLockReadGuard};

use self::error::Error;
use self::peer::Peer;
use self::torrent::SwamStats;
use crate::config::Configuration;
use crate::databases::driver::Driver;
use crate::databases::{self, Database};
Expand All @@ -41,6 +43,11 @@ pub struct TorrentsMetrics {
pub torrents: u64,
}

pub struct AnnounceResponse {
pub peers: Vec<Peer>,
pub swam_stats: SwamStats,
}

impl Tracker {
/// # Errors
///
Expand Down Expand Up @@ -76,6 +83,18 @@ impl Tracker {
self.mode == mode::Mode::Listed || self.mode == mode::Mode::PrivateListed
}

/// It handles an announce request
pub async fn announce(&self, info_hash: &InfoHash, peer: &mut Peer, remote_client_ip: &IpAddr) -> AnnounceResponse {
peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.config.get_ext_ip()));

let swam_stats = self.update_torrent_with_peer_and_get_stats(info_hash, peer).await;

// todo: remove peer by using its `Id` instead of its socket address: `get_peers_excluding_peer(peer_id: peer::Id)`
let peers = self.get_peers_excluding_peers_with_address(info_hash, &peer.peer_addr).await;

AnnounceResponse { peers, swam_stats }
}

/// # Errors
///
/// Will return a `database::Error` if unable to add the `auth_key` to the database.
Expand Down Expand Up @@ -272,13 +291,16 @@ impl Tracker {
Ok(())
}

/// Get all torrent peers for a given torrent filtering out the peer with the client address
pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec<peer::Peer> {
async fn get_peers_excluding_peers_with_address(
&self,
info_hash: &InfoHash,
excluded_address: &SocketAddr,
) -> Vec<peer::Peer> {
let read_lock = self.torrents.read().await;

match read_lock.get(info_hash) {
None => vec![],
Some(entry) => entry.get_peers(Some(client_addr)).into_iter().copied().collect(),
Some(entry) => entry.get_peers(Some(excluded_address)).into_iter().copied().collect(),
}
}

Expand Down Expand Up @@ -378,6 +400,15 @@ impl Tracker {
}
}

#[must_use]
fn assign_ip_address_to_peer(remote_client_ip: &IpAddr, tracker_external_ip: Option<IpAddr>) -> IpAddr {
if let Some(host_ip) = tracker_external_ip.filter(|_| remote_client_ip.is_loopback()) {
host_ip
} else {
*remote_client_ip
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -424,4 +455,100 @@ mod tests {
}
);
}

mod the_tracker_assigning_the_ip_to_the_peer {

use std::net::{IpAddr, Ipv4Addr};

use crate::tracker::assign_ip_address_to_peer;

#[test]
fn should_use_the_source_ip_instead_of_the_ip_in_the_announce_request() {
let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2));

let peer_ip = assign_ip_address_to_peer(&remote_ip, None);

assert_eq!(peer_ip, remote_ip);
}

mod when_the_client_ip_is_a_ipv4_loopback_ip {

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use crate::tracker::assign_ip_address_to_peer;

#[test]
fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

let peer_ip = assign_ip_address_to_peer(&remote_ip, None);

assert_eq!(peer_ip, remote_ip);
}

#[test]
fn it_should_use_the_external_tracker_ip_in_tracker_configuration_if_it_is_defined() {
let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}

#[test]
fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv6_ip()
{
let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}
}

mod when_client_ip_is_a_ipv6_loopback_ip {

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use crate::tracker::assign_ip_address_to_peer;

#[test]
fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);

let peer_ip = assign_ip_address_to_peer(&remote_ip, None);

assert_eq!(peer_ip, remote_ip);
}

#[test]
fn it_should_use_the_external_ip_in_tracker_configuration_if_it_is_defined() {
let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}

#[test]
fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv4_ip()
{
let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}
}
}
}
Loading

0 comments on commit 20f5751

Please sign in to comment.