Merge #530: Improve tracker statistics importation
af7150b feat: [#469] import torrent stats using multiple torrents tracker API endpoint (Jose Celano)
16cbea8 feat: [#469] import torrent statistics in batches (Jose Celano)
feffd09 feat: [#469] add update datetime for tracker stats importation (Jose Celano)

Pull request description:

  Currently, the Index imports statistics for all torrents every hour (1 hour is the default interval in the configuration). We need to import stats for all torrents because we allow users to sort torrents by stats (number of seeders and leechers). This PR improves that process.

  - [x] Add a new field (`updated_at`) to the table `torrust_torrent_tracker_stats` with the datetime when the stats were imported from the tracker. This is mainly for logging purposes, but it also makes it possible to import torrents in batches. Regarding logging, it can help to confirm that the cronjob is running correctly.
  - [x] Currently we get all torrents (`get_all_torrents_compact`) from the database, which could be a big array of infohashes. Instead, we now obtain the 50 records that have not been updated for the longest time and run the importation every 100 milliseconds, requesting stats for those 50 torrents from the tracker API on each run (see the sketch after the pros/cons below). Those values can be adjusted in the future.
  - [x] A [new filter was added to the tracker API to get statistics for a list of torrents with one request](torrust/torrust-tracker#728). We use it instead of getting one torrent at a time.

  **Pros:**

  - With millions of torrents we don't need to load all of them into memory.
  - The new field `updated_at` helps to monitor the importation process.
  - We get stats for 50 torrents in one request instead of sending one request per torrent.

  **Cons:**

  - Every 100 milliseconds we run a query to check which torrent stats are pending an update.
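
  For a rough picture of the scheduling, here is a self-contained, hypothetical sketch (the real loop in `src/console/cronjobs/tracker_statistics_importer.rs`, shown in the diff below, also sends heartbeats and talks to the tracker API; `import_next_batch` is a placeholder name):

  ```rust
  use std::time::Duration;

  #[tokio::main]
  async fn main() {
      // One batch of (at most) 50 torrents every 100 milliseconds.
      let mut tick = tokio::time::interval(Duration::from_millis(100));

      loop {
          tick.tick().await; // the first tick fires immediately

          // Placeholder for: select the 50 torrents whose stats are the
          // most stale and import them from the tracker in one request.
          import_next_batch(50).await;
      }
  }

  async fn import_next_batch(limit: i64) {
      println!("importing stats for up to {limit} torrents ...");
  }
  ```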

ACKs for top commit:
  josecelano:
    ACK af7150b

Tree-SHA512: af1632282419457e20cc86e447b65d36c8e52dbff47e5c79cc1802fc6f67c759d572568f2846f65d4d5540049240ea82246df21d773ed1e6a285bde681fb423b
josecelano committed Mar 13, 2024
2 parents 1769bf1 + af7150b commit 1368045
Showing 10 changed files with 281 additions and 22 deletions.
@@ -0,0 +1,4 @@
-- New field to track when stats were last imported from the tracker (add as nullable, backfill existing rows, then make it NOT NULL).
ALTER TABLE torrust_torrent_tracker_stats ADD COLUMN updated_at DATETIME DEFAULT NULL;
UPDATE torrust_torrent_tracker_stats SET updated_at = '1000-01-01 00:00:00';
ALTER TABLE torrust_torrent_tracker_stats MODIFY COLUMN updated_at DATETIME NOT NULL;
@@ -0,0 +1,2 @@
-- New field to track when stats were last imported from the tracker. SQLite does not support MODIFY COLUMN, so the column is added with an inline default instead.
ALTER TABLE torrust_torrent_tracker_stats ADD COLUMN updated_at TEXT DEFAULT "1000-01-01 00:00:00";
55 changes: 44 additions & 11 deletions src/console/cronjobs/tracker_statistics_importer.rs
@@ -17,12 +17,14 @@ use axum::extract::State;
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::{DateTime, Utc};
use log::{error, info};
use log::{debug, error, info};
use serde_json::{json, Value};
use text_colorizer::Colorize;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

use crate::tracker::statistics_importer::StatisticsImporter;
use crate::utils::clock::seconds_ago_utc;

const IMPORTER_API_IP: &str = "127.0.0.1";

@@ -41,7 +43,7 @@ struct ImporterState {
#[must_use]
pub fn start(
importer_port: u16,
torrent_info_update_interval: u64,
torrent_stats_update_interval: u64,
tracker_statistics_importer: &Arc<StatisticsImporter>,
) -> JoinHandle<()> {
let weak_tracker_statistics_importer = Arc::downgrade(tracker_statistics_importer);
@@ -54,7 +56,7 @@ pub fn start(
let _importer_api_handle = tokio::spawn(async move {
let import_state = Arc::new(ImporterState {
last_heartbeat: Arc::new(Mutex::new(Utc::now())),
torrent_info_update_interval,
torrent_info_update_interval: torrent_stats_update_interval,
});

let app = Router::new()
@@ -81,25 +83,56 @@ pub fn start(

info!("Tracker statistics importer cronjob starting ...");

let interval = std::time::Duration::from_secs(torrent_info_update_interval);
let mut interval = tokio::time::interval(interval);
// code-review: we set an execution interval to avoid polling the
// database too aggressively. If we removed the interval we would be
// constantly querying whether there are torrent stats pending update,
// even when there are no torrents to update. Maybe we should only sleep
// for 100 milliseconds when the latest execution did not update any
// torrents. With the current limit we can only import stats for 50
// torrents every 100 milliseconds, which is 500 torrents per second
// (1,800,000 torrents per hour), assuming the tracker can handle a
// request in 100 milliseconds.

interval.tick().await; // first tick is immediate...
let execution_interval_in_milliseconds = 100;
let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds);
let mut execution_interval = tokio::time::interval(execution_interval_duration);

loop {
interval.tick().await;
execution_interval.tick().await; // first tick is immediate...

info!("Running tracker statistics importer ...");
info!("Running tracker statistics importer every {execution_interval_in_milliseconds} milliseconds ...");

loop {
if let Err(e) = send_heartbeat(importer_port).await {
error!("Failed to send heartbeat from importer cronjob: {}", e);
}

if let Some(tracker) = weak_tracker_statistics_importer.upgrade() {
drop(tracker.import_all_torrents_statistics().await);
if let Some(statistics_importer) = weak_tracker_statistics_importer.upgrade() {
let one_interval_ago = seconds_ago_utc(
torrent_stats_update_interval
.try_into()
.expect("update interval should be a positive integer"),
);
let limit = 50;

debug!(
"Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...",
one_interval_ago.to_string().yellow(),
limit.to_string().yellow()
);

match statistics_importer
.import_torrents_statistics_not_updated_since(one_interval_ago, limit)
.await
{
Ok(()) => {}
Err(e) => error!("Failed to import statistics: {:?}", e),
}

drop(statistics_importer);
} else {
break;
}

execution_interval.tick().await;
}
})
}
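The `seconds_ago_utc` helper imported above lives in `src/utils/clock`, which is not part of this diff. A minimal, hypothetical sketch of what it presumably does, with the signature inferred from the call site (an `i64` number of seconds, returning a `chrono` UTC datetime):

```rust
use chrono::{DateTime, Duration, Utc};

// Hypothetical sketch; the real helper is defined in src/utils/clock.rs.
// Returns the UTC datetime `seconds` seconds before now.
pub fn seconds_ago_utc(seconds: i64) -> DateTime<Utc> {
    Utc::now() - Duration::seconds(seconds)
}
```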
9 changes: 8 additions & 1 deletion src/databases/database.rs
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::databases::mysql::Mysql;
@@ -292,6 +292,13 @@ pub trait Database: Sync + Send {
/// Get all torrents as `Vec<TorrentCompact>`.
async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, Error>;

/// Get torrents whose stats have not been imported from the tracker since a given datetime.
async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, Error>;

/// Update a torrent's title with `torrent_id` and `title`.
async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), Error>;

28 changes: 25 additions & 3 deletions src/databases/mysql.rs
@@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};

@@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
use crate::utils::clock;
use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Mysql {
@@ -884,6 +884,27 @@ impl Database for Mysql {
.map_err(|_| database::Error::Error)
}

async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, database::Error> {
query_as::<_, TorrentCompact>(
"SELECT tt.torrent_id, tt.info_hash
FROM torrust_torrents tt
LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
WHERE tts.updated_at < ? OR tts.updated_at IS NULL
ORDER BY tts.updated_at ASC
LIMIT ?
",
)
.bind(datetime.format(DATETIME_FORMAT).to_string())
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|_| database::Error::Error)
}

async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
.bind(title)
@@ -1055,11 +1076,12 @@ impl Database for Mysql {
seeders: i64,
leechers: i64,
) -> Result<(), database::Error> {
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES (?, ?, ?, ?)")
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES (?, ?, ?, ?, ?)")
.bind(torrent_id)
.bind(tracker_url)
.bind(seeders)
.bind(leechers)
.bind(datetime_now())
.execute(&self.pool)
.await
.map(|_| ())
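Both drivers bind `updated_at` as a string produced by `datetime_now()` and format the cut-off datetime with `DATETIME_FORMAT`, both imported from `src/utils/clock`, which this diff does not show. A plausible sketch, assuming the format matches the `'1000-01-01 00:00:00'` placeholder used in the migrations:

```rust
use chrono::Utc;

// Hypothetical sketch; the real definitions live in src/utils/clock.rs
// and may differ.
pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

/// The current UTC time rendered with `DATETIME_FORMAT`, suitable for
/// binding to the `updated_at` column.
pub fn datetime_now() -> String {
    Utc::now().format(DATETIME_FORMAT).to_string()
}
```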
28 changes: 25 additions & 3 deletions src/databases/sqlite.rs
@@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};

@@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
use crate::utils::clock;
use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Sqlite {
@@ -876,6 +876,27 @@ impl Database for Sqlite {
.map_err(|_| database::Error::Error)
}

async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, database::Error> {
query_as::<_, TorrentCompact>(
"SELECT tt.torrent_id, tt.info_hash
FROM torrust_torrents tt
LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
WHERE tts.updated_at < ? OR tts.updated_at IS NULL
ORDER BY tts.updated_at ASC
LIMIT ?
",
)
.bind(datetime.format(DATETIME_FORMAT).to_string())
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|_| database::Error::Error)
}

async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
.bind(title)
@@ -1047,11 +1068,12 @@ impl Database for Sqlite {
seeders: i64,
leechers: i64,
) -> Result<(), database::Error> {
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES ($1, $2, $3, $4)")
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES ($1, $2, $3, $4, $5)")
.bind(torrent_id)
.bind(tracker_url)
.bind(seeders)
.bind(leechers)
.bind(datetime_now())
.execute(&self.pool)
.await
.map(|_| ())
25 changes: 23 additions & 2 deletions src/tracker/api.rs
@@ -15,6 +15,8 @@ impl ConnectionInfo {
}
}

const TOKEN_PARAM_NAME: &str = "token";

pub struct Client {
pub connection_info: ConnectionInfo,
api_base_url: String,
@@ -29,7 +31,7 @@ impl Client {
pub fn new(connection_info: ConnectionInfo) -> Result<Self, Error> {
let base_url = format!("{}/api/v1", connection_info.url);
let client = reqwest::Client::builder().timeout(Duration::from_secs(5)).build()?;
let token_param = [("token".to_string(), connection_info.token.to_string())];
let token_param = [(TOKEN_PARAM_NAME.to_string(), connection_info.token.to_string())];

Ok(Self {
connection_info,
@@ -72,7 +74,7 @@ impl Client {
self.client.post(request_url).query(&self.token_param).send().await
}

/// Retrieve the info for a torrent.
/// Retrieve the info for one torrent.
///
/// # Errors
///
@@ -82,4 +84,23 @@

self.client.get(request_url).query(&self.token_param).send().await
}

/// Retrieve the info for multiple torrents at the same time.
///
/// # Errors
///
/// Will return an error if the HTTP request fails.
pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Response, Error> {
let request_url = format!("{}/torrents", self.api_base_url);

let mut query_params: Vec<(String, String)> = Vec::with_capacity(info_hashes.len() + 1);

query_params.push((TOKEN_PARAM_NAME.to_string(), self.connection_info.token.clone()));

for info_hash in info_hashes {
query_params.push(("info_hash".to_string(), info_hash.clone()));
}

self.client.get(request_url).query(&query_params).send().await
}
}
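Since `reqwest` serializes the `Vec<(String, String)>` by repeating duplicate keys, the batch endpoint receives one `info_hash` parameter per torrent. A standalone sketch of the resulting URL (host, token, and infohashes are made-up placeholders):

```rust
use reqwest::Url;

fn main() {
    // Made-up connection values, for illustration only.
    let params = [
        ("token", "MyAccessToken"),
        ("info_hash", "443c7602b4fde83d1154d6d9da48808418b181b6"),
        ("info_hash", "6c02cfcd5f9b1b6b7e3f21a6a4e2f0de3e4b9c61"),
    ];

    let url = Url::parse_with_params("https://tracker.example.com/api/v1/torrents", &params)
        .expect("hard-coded URL should be valid");

    // https://tracker.example.com/api/v1/torrents?token=MyAccessToken&info_hash=443c...&info_hash=6c02...
    println!("{url}");
}
```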
56 changes: 56 additions & 0 deletions src/tracker/service.rs
@@ -48,6 +48,14 @@ pub struct TorrentInfo {
pub peers: Vec<Peer>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct TorrentBasicInfo {
pub info_hash: String,
pub seeders: i64,
pub completed: i64,
pub leechers: i64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct Peer {
pub peer_id: Option<PeerId>,
@@ -259,6 +267,54 @@ impl Service {
}
}

/// Get torrent info from tracker in batches.
///
/// # Errors
///
/// Will return an error if the HTTP request to get torrent info fails or
/// if the response cannot be parsed.
pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Vec<TorrentBasicInfo>, TrackerAPIError> {
debug!(target: "tracker-service", "get torrents info");

let maybe_response = self.api_client.get_torrents_info(info_hashes).await;

debug!(target: "tracker-service", "get torrents info response result: {:?}", maybe_response);

match maybe_response {
Ok(response) => {
let status: StatusCode = map_status_code(response.status());

let body = response.text().await.map_err(|_| {
error!(target: "tracker-service", "response without body");
TrackerAPIError::MissingResponseBody
})?;

match status {
StatusCode::OK => serde_json::from_str(&body).map_err(|e| {
error!(
target: "tracker-service", "Failed to parse torrents info from tracker response. Body: {}, Error: {}",
body, e
);
TrackerAPIError::FailedToParseTrackerResponse { body }
}),
StatusCode::INTERNAL_SERVER_ERROR => {
if body == Self::invalid_token_body() {
Err(TrackerAPIError::InvalidToken)
} else {
error!(target: "tracker-service", "get torrents info 500 response: status {status}, body: {body}");
Err(TrackerAPIError::InternalServerError)
}
}
_ => {
error!(target: "tracker-service", "get torrents info unhandled response: status {status}, body: {body}");
Err(TrackerAPIError::UnexpectedResponseStatus)
}
}
}
Err(_) => Err(TrackerAPIError::TrackerOffline),
}
}

/// Issue a new tracker key from tracker.
async fn retrieve_new_tracker_key(&self, user_id: i64) -> Result<TrackerKey, TrackerAPIError> {
debug!(target: "tracker-service", "retrieve key: {user_id}");
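On a `200 OK` the body is deserialized directly into `Vec<TorrentBasicInfo>`. A self-contained sketch of the expected shape, with invented values (the struct mirrors the one added in this file):

```rust
use serde::Deserialize;

// Mirrors the TorrentBasicInfo struct added in src/tracker/service.rs.
#[derive(Debug, Deserialize, PartialEq)]
struct TorrentBasicInfo {
    info_hash: String,
    seeders: i64,
    completed: i64,
    leechers: i64,
}

fn main() {
    // Invented example body; real values come from the tracker.
    let body = r#"[
        {
            "info_hash": "443c7602b4fde83d1154d6d9da48808418b181b6",
            "seeders": 5,
            "completed": 10,
            "leechers": 2
        }
    ]"#;

    let stats: Vec<TorrentBasicInfo> =
        serde_json::from_str(body).expect("example body should parse");

    assert_eq!(stats[0].seeders, 5);
}
```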