Skip to content

Commit

Permalink
feat(http): [torrust#160] scaffolding for HTTP tracker using Axum
Browse files Browse the repository at this point in the history
We are going to migrate the HTTP tracker from Warp to Axum.
This is the basic scaffolding for Axum. Tests have been duplicated to
test the new Axum implementation. The setup allows executing both
versions: the Warp version on production and both versions (Warp and
Axum) on testing env.
  • Loading branch information
josecelano committed Feb 8, 2023
1 parent da6f1a7 commit 0dc3050
Show file tree
Hide file tree
Showing 12 changed files with 1,507 additions and 67 deletions.
9 changes: 9 additions & 0 deletions src/http/axum/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use axum::response::Json;

use super::resources::ok::Ok;
use super::responses::ok_response;

#[allow(clippy::unused_async)]
pub async fn get_status_handler() -> Json<Ok> {
ok_response()
}
5 changes: 5 additions & 0 deletions src/http/axum/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod handlers;
pub mod resources;
pub mod responses;
pub mod routes;
pub mod server;
1 change: 1 addition & 0 deletions src/http/axum/resources/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod ok;
4 changes: 4 additions & 0 deletions src/http/axum/resources/ok.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct Ok {}
10 changes: 10 additions & 0 deletions src/http/axum/responses.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Resource responses

use axum::Json;

use super::resources::ok::Ok;

#[must_use]
pub fn ok_response() -> Json<Ok> {
Json(Ok {})
}
13 changes: 13 additions & 0 deletions src/http/axum/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::sync::Arc;

use axum::routing::get;
use axum::Router;

use super::handlers::get_status_handler;
use crate::tracker::Tracker;

pub fn router(_tracker: &Arc<Tracker>) -> Router {
Router::new()
// Status
.route("/status", get(get_status_handler))
}
43 changes: 43 additions & 0 deletions src/http/axum/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::net::SocketAddr;
use std::sync::Arc;

use axum_server::tls_rustls::RustlsConfig;
use axum_server::Handle;
use futures::Future;
use log::info;
use warp::hyper;

use super::routes::router;
use crate::tracker::Tracker;

pub fn start(socket_addr: SocketAddr, tracker: &Arc<Tracker>) -> impl Future<Output = hyper::Result<()>> {
let app = router(tracker);

let server = axum::Server::bind(&socket_addr).serve(app.into_make_service());

server.with_graceful_shutdown(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal.");
info!("Stopping Torrust HTTP tracker server on http://{} ...", socket_addr);
})
}

pub fn start_tls(
socket_addr: SocketAddr,
ssl_config: RustlsConfig,
tracker: &Arc<Tracker>,
) -> impl Future<Output = Result<(), std::io::Error>> {
let app = router(tracker);

let handle = Handle::new();
let shutdown_handle = handle.clone();

tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal.");
info!("Stopping Torrust HTTP tracker server on https://{} ...", socket_addr);
shutdown_handle.shutdown();
});

axum_server::bind_rustls(socket_addr, ssl_config)
.handle(handle)
.serve(app.into_make_service())
}
9 changes: 9 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
//! - <https://wiki.theory.org/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol>
//! - <https://wiki.theory.org/BitTorrent_Tracker_Protocol>
//!

use serde::{Deserialize, Serialize};
pub mod axum;
pub mod error;
pub mod filters;
pub mod handlers;
Expand All @@ -19,3 +22,9 @@ pub mod server;

pub type Bytes = u64;
pub type WebResult<T> = std::result::Result<T, warp::Rejection>;

#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Debug)]
pub enum Version {
Warp,
Axum,
}
68 changes: 66 additions & 2 deletions src/jobs/http_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
use std::net::SocketAddr;
use std::sync::Arc;

use axum_server::tls_rustls::RustlsConfig;
use log::{info, warn};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use crate::config::HttpTracker;
use crate::http::axum::server;
use crate::http::server::Http;
use crate::http::Version;
use crate::tracker;

#[derive(Debug)]
pub struct ServerJobStarted();

pub async fn start_job(config: &HttpTracker, tracker: Arc<tracker::Tracker>, version: Version) -> JoinHandle<()> {
match version {
Version::Warp => start_warp(config, tracker.clone()).await,
Version::Axum => start_axum(config, tracker.clone()).await,
}
}

/// # Panics
///
/// It would panic if the `config::HttpTracker` struct would contain an inappropriate values.
pub async fn start_job(config: &HttpTracker, tracker: Arc<tracker::Tracker>) -> JoinHandle<()> {
/// It would panic if the `config::HttpTracker` struct would contain inappropriate values.
async fn start_warp(config: &HttpTracker, tracker: Arc<tracker::Tracker>) -> JoinHandle<()> {
let bind_addr = config
.bind_address
.parse::<SocketAddr>()
Expand Down Expand Up @@ -68,3 +78,57 @@ pub async fn start_job(config: &HttpTracker, tracker: Arc<tracker::Tracker>) ->

join_handle
}

/// # Panics
///
/// It would panic if the `config::HttpTracker` struct would contain inappropriate values.
async fn start_axum(config: &HttpTracker, tracker: Arc<tracker::Tracker>) -> JoinHandle<()> {
let bind_addr = config
.bind_address
.parse::<std::net::SocketAddr>()
.expect("Tracker API bind_address invalid.");
let ssl_enabled = config.ssl_enabled;
let ssl_cert_path = config.ssl_cert_path.clone();
let ssl_key_path = config.ssl_key_path.clone();

let (tx, rx) = oneshot::channel::<ServerJobStarted>();

// Run the API server
let join_handle = tokio::spawn(async move {
if !ssl_enabled {
info!("Starting Torrust HTTP tracker server on: http://{}", bind_addr);

let handle = server::start(bind_addr, &tracker);

tx.send(ServerJobStarted())
.expect("the HTTP tracker server should not be dropped");

if let Ok(()) = handle.await {
info!("Torrust HTTP tracker server on http://{} stopped", bind_addr);
}
} else if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() {
info!("Starting Torrust HTTP tracker server on: https://{}", bind_addr);

let ssl_config = RustlsConfig::from_pem_file(ssl_cert_path.unwrap(), ssl_key_path.unwrap())
.await
.unwrap();

let handle = server::start_tls(bind_addr, ssl_config, &tracker);

tx.send(ServerJobStarted())
.expect("the HTTP tracker server should not be dropped");

if let Ok(()) = handle.await {
info!("Torrust HTTP tracker server on https://{} stopped", bind_addr);
}
}
});

// Wait until the HTTP tracker server job is running
match rx.await {
Ok(_msg) => info!("Torrust HTTP tracker server started"),
Err(e) => panic!("the HTTP tracker server was dropped: {e}"),
}

join_handle
}
3 changes: 2 additions & 1 deletion src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use log::warn;
use tokio::task::JoinHandle;

use crate::config::Configuration;
use crate::http::Version;
use crate::jobs::{http_tracker, torrent_cleanup, tracker_apis, udp_tracker};
use crate::tracker;

Expand Down Expand Up @@ -47,7 +48,7 @@ pub async fn setup(config: &Configuration, tracker: Arc<tracker::Tracker>) -> Ve
if !http_tracker_config.enabled {
continue;
}
jobs.push(http_tracker::start_job(http_tracker_config, tracker.clone()).await);
jobs.push(http_tracker::start_job(http_tracker_config, tracker.clone(), Version::Warp).await);
}

// Start HTTP API
Expand Down
33 changes: 17 additions & 16 deletions tests/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

use torrust_tracker::config::{ephemeral_configuration, Configuration};
use torrust_tracker::http::Version;
use torrust_tracker::jobs::http_tracker;
use torrust_tracker::protocol::info_hash::InfoHash;
use torrust_tracker::tracker::mode::Mode;
Expand All @@ -13,24 +14,24 @@ use torrust_tracker::{ephemeral_instance_keys, logging, static_time, tracker};
use super::connection_info::ConnectionInfo;

/// Starts a HTTP tracker with mode "public" in settings
pub async fn start_public_http_tracker() -> Server {
pub async fn start_public_http_tracker(version: Version) -> Server {
let mut configuration = ephemeral_configuration();
configuration.mode = Mode::Public;
start_custom_http_tracker(Arc::new(configuration)).await
start_custom_http_tracker(Arc::new(configuration), version).await
}

/// Starts a HTTP tracker with mode "listed" in settings
pub async fn start_whitelisted_http_tracker() -> Server {
pub async fn start_whitelisted_http_tracker(version: Version) -> Server {
let mut configuration = ephemeral_configuration();
configuration.mode = Mode::Listed;
start_custom_http_tracker(Arc::new(configuration)).await
start_custom_http_tracker(Arc::new(configuration), version).await
}

/// Starts a HTTP tracker with mode "private" in settings
pub async fn start_private_http_tracker() -> Server {
pub async fn start_private_http_tracker(version: Version) -> Server {
let mut configuration = ephemeral_configuration();
configuration.mode = Mode::Private;
start_custom_http_tracker(Arc::new(configuration)).await
start_custom_http_tracker(Arc::new(configuration), version).await
}

/// Starts a HTTP tracker with a wildcard IPV6 address.
Expand All @@ -40,7 +41,7 @@ pub async fn start_private_http_tracker() -> Server {
/// [[http_trackers]]
/// bind_address = "[::]:7070"
/// ```
pub async fn start_ipv6_http_tracker() -> Server {
pub async fn start_ipv6_http_tracker(version: Version) -> Server {
let mut configuration = ephemeral_configuration();

// Change socket address to "wildcard address" (unspecified address which means any IP address)
Expand All @@ -49,7 +50,7 @@ pub async fn start_ipv6_http_tracker() -> Server {
let new_ipv6_socket_address = format!("[::]:{}", socket_addr.port());
configuration.http_trackers[0].bind_address = new_ipv6_socket_address;

start_custom_http_tracker(Arc::new(configuration)).await
start_custom_http_tracker(Arc::new(configuration), version).await
}

/// Starts a HTTP tracker with an specific `external_ip`.
Expand All @@ -58,10 +59,10 @@ pub async fn start_ipv6_http_tracker() -> Server {
/// ```text
/// external_ip = "2.137.87.41"
/// ```
pub async fn start_http_tracker_with_external_ip(external_ip: &IpAddr) -> Server {
pub async fn start_http_tracker_with_external_ip(external_ip: &IpAddr, version: Version) -> Server {
let mut configuration = ephemeral_configuration();
configuration.external_ip = Some(external_ip.to_string());
start_custom_http_tracker(Arc::new(configuration)).await
start_custom_http_tracker(Arc::new(configuration), version).await
}

/// Starts a HTTP tracker `on_reverse_proxy`.
Expand All @@ -70,24 +71,24 @@ pub async fn start_http_tracker_with_external_ip(external_ip: &IpAddr) -> Server
/// ```text
/// on_reverse_proxy = true
/// ```
pub async fn start_http_tracker_on_reverse_proxy() -> Server {
pub async fn start_http_tracker_on_reverse_proxy(version: Version) -> Server {
let mut configuration = ephemeral_configuration();
configuration.on_reverse_proxy = true;
start_custom_http_tracker(Arc::new(configuration)).await
start_custom_http_tracker(Arc::new(configuration), version).await
}

pub async fn start_default_http_tracker() -> Server {
pub async fn start_default_http_tracker(version: Version) -> Server {
let configuration = tracker_configuration();
start_custom_http_tracker(configuration.clone()).await
start_custom_http_tracker(configuration.clone(), version).await
}

pub fn tracker_configuration() -> Arc<Configuration> {
Arc::new(ephemeral_configuration())
}

pub async fn start_custom_http_tracker(configuration: Arc<Configuration>) -> Server {
pub async fn start_custom_http_tracker(configuration: Arc<Configuration>, version: Version) -> Server {
let server = start(&configuration);
http_tracker::start_job(&configuration.http_trackers[0], server.tracker.clone()).await;
http_tracker::start_job(&configuration.http_trackers[0], server.tracker.clone(), version).await;
server
}

Expand Down
Loading

0 comments on commit 0dc3050

Please sign in to comment.